[VOL-4290] Voltha go library updates for gRPC migration
Change-Id: I1aa2774beb6b7ed7419bc45aeb53fcae8a8ecda0
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 79827aa..185f6ec 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -27,10 +27,8 @@
scc "github.com/bsm/sarama-cluster"
"github.com/eapache/go-resiliency/breaker"
"github.com/golang/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
"github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
- ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
)
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
@@ -38,7 +36,7 @@
//consumer or a group consumer
type consumerChannels struct {
consumers []interface{}
- channels []chan *ic.InterContainerMessage
+ channels []chan proto.Message
}
// static check to ensure SaramaClient implements Client
@@ -378,7 +376,7 @@
// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic
-func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error) {
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
@@ -388,13 +386,13 @@
if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
// Create a channel specific for that consumers and add it to the consumers channel map
- ch := make(chan *ic.InterContainerMessage)
+ ch := make(chan proto.Message)
sc.addChannelToConsumerChannelMap(ctx, topic, ch)
return ch, nil
}
// Register for the topic and set it up
- var consumerListeningChannel chan *ic.InterContainerMessage
+ var consumerListeningChannel chan proto.Message
var err error
// Use the consumerType option to figure out the type of consumer to launch
@@ -441,7 +439,7 @@
}
//UnSubscribe unsubscribe a consumer from a given topic
-func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ic.InterContainerMessage) error {
+func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error {
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
@@ -609,7 +607,7 @@
// without blocking others. The monitor shouldn't really fall
// behind...
sc.liveness = make(chan bool, 10)
- // post intial state to the channel
+ // post initial state to the channel
sc.liveness <- sc.alive
}
} else {
@@ -635,7 +633,7 @@
// without blocking others. The monitor shouldn't really fall
// behind...
sc.healthiness = make(chan bool, 10)
- // post intial state to the channel
+ // post initial state to the channel
sc.healthiness <- sc.healthy
}
} else {
@@ -749,7 +747,7 @@
return nil
}
-func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan *ic.InterContainerMessage) {
+func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan proto.Message) {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -788,7 +786,7 @@
return err
}
-func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan *ic.InterContainerMessage) error {
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan proto.Message) error {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -908,19 +906,18 @@
// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
// topic via the unique channel each subscriber received during subscription
-func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
+func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage proto.Message, fromTopic string, ts time.Time) {
// Need to go over all channels and publish messages to them - do we need to copy msg?
sc.lockTopicToConsumerChannelMap.RLock()
for _, ch := range consumerCh.channels {
- go func(c chan *ic.InterContainerMessage) {
+ go func(c chan proto.Message) {
c <- protoMessage
}(ch)
}
sc.lockTopicToConsumerChannelMap.RUnlock()
if callback := sc.metadataCallback; callback != nil {
- ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
- callback(protoMessage.Header.FromTopic, ts)
+ callback(fromTopic, ts)
}
}
@@ -948,12 +945,12 @@
msgBody := msg.Value
sc.updateLiveness(ctx, true)
logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
- icm := &ic.InterContainerMessage{}
- if err := proto.Unmarshal(msgBody, icm); err != nil {
+ var protoMsg proto.Message
+ if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
continue
}
- go sc.dispatchToConsumers(consumerChnls, icm)
+ go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
case <-sc.doneCh:
logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
break startloop
@@ -989,12 +986,12 @@
sc.updateLiveness(ctx, true)
logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
msgBody := msg.Value
- icm := &ic.InterContainerMessage{}
- if err := proto.Unmarshal(msgBody, icm); err != nil {
+ var protoMsg proto.Message
+ if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
continue
}
- go sc.dispatchToConsumers(consumerChnls, icm)
+ go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
consumer.MarkOffset(msg, "")
case ntf := <-consumer.Notifications():
logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
@@ -1030,7 +1027,7 @@
//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
//// for that topic. It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan proto.Message, error) {
var pConsumers []sarama.PartitionConsumer
var err error
@@ -1046,10 +1043,10 @@
// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
// unbuffered to verify race conditions.
- consumerListeningChannel := make(chan *ic.InterContainerMessage)
+ consumerListeningChannel := make(chan proto.Message)
cc := &consumerChannels{
consumers: consumersIf,
- channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
+ channels: []chan proto.Message{consumerListeningChannel},
}
// Add the consumers channel to the map
@@ -1069,7 +1066,7 @@
// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan proto.Message, error) {
// TODO: Replace this development partition consumers with a group consumers
var pConsumer *scc.Consumer
var err error
@@ -1079,10 +1076,10 @@
}
// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
// unbuffered to verify race conditions.
- consumerListeningChannel := make(chan *ic.InterContainerMessage)
+ consumerListeningChannel := make(chan proto.Message)
cc := &consumerChannels{
consumers: []interface{}{pConsumer},
- channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
+ channels: []chan proto.Message{consumerListeningChannel},
}
// Add the consumers channel to the map
@@ -1120,9 +1117,9 @@
return pConsumers, nil
}
-func removeChannel(ctx context.Context, channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
+func removeChannel(ctx context.Context, channels []chan proto.Message, ch <-chan proto.Message) []chan proto.Message {
var i int
- var channel chan *ic.InterContainerMessage
+ var channel chan proto.Message
for i, channel = range channels {
if channel == ch {
channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]