Added SubscribeForMetadata() to the sarama client, to access the header field of each received message.
Also replaced TestKafkaClientImplementsKafkaClientIf() with static checks.
For VOL-2207. Please consider these related patchsets together:
https://gerrit.opencord.org/#/q/VOL-2207
Change-Id: I77f03fbb0ebcdf60cb5f8d0c8c3276325510e874
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 9d4ab52..c0c16f9 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -42,6 +42,9 @@
channels []chan *ic.InterContainerMessage
}
+// static check to ensure SaramaClient implements Client
+var _ Client = &SaramaClient{}
+
// SaramaClient represents the messaging proxy
type SaramaClient struct {
cAdmin sarama.ClusterAdmin
@@ -68,6 +71,7 @@
numReplicas int
autoCreateTopic bool
doneCh chan int
+ metadataCallback func(fromTopic string, timestamp int64)
topicToConsumerChannelMap map[string]*consumerChannels
lockTopicToConsumerChannelMap sync.RWMutex
topicLockMap map[string]*sync.RWMutex
@@ -460,6 +464,10 @@
return err
}
+func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp int64)) {
+ sc.metadataCallback = callback
+}
+
func (sc *SaramaClient) updateLiveness(alive bool) {
// Post a consistent stream of liveness data to the channel,
// so that in a live state, the core does not timeout and
@@ -930,12 +938,16 @@
func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
// Need to go over all channels and publish messages to them - do we need to copy msg?
sc.lockTopicToConsumerChannelMap.RLock()
- defer sc.lockTopicToConsumerChannelMap.RUnlock()
for _, ch := range consumerCh.channels {
go func(c chan *ic.InterContainerMessage) {
c <- protoMessage
}(ch)
}
+ sc.lockTopicToConsumerChannelMap.RUnlock()
+
+ if callback := sc.metadataCallback; callback != nil {
+ callback(protoMessage.Header.FromTopic, protoMessage.Header.Timestamp)
+ }
}
func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {