[VOL-1346] This commit addresses device discovery notifications,
which will be used principally by the affinity router. In doing so,
this commit also renames core_adapter.proto to inter_container.proto.
Change-Id: Ib2a7b84efa50367d0ffbc482fba6096a225f3150
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 8468e42..e330b85 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -22,7 +22,7 @@
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"gopkg.in/Shopify/sarama.v1"
"strings"
"sync"
@@ -40,7 +40,7 @@
//consumer or a group consumer
type consumerChannels struct {
consumers []interface{}
- channels []chan *ca.InterContainerMessage
+ channels []chan *ic.InterContainerMessage
}
// SaramaClient represents the messaging proxy
@@ -307,20 +307,20 @@
// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
log.Debugw("subscribe", log.Fields{"topic": topic.Name})
// If a consumers already exist for that topic then resuse it
if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
// Create a channel specific for that consumers and add it to the consumers channel map
- ch := make(chan *ca.InterContainerMessage)
+ ch := make(chan *ic.InterContainerMessage)
sc.addChannelToConsumerChannelMap(topic, ch)
return ch, nil
}
// Register for the topic and set it up
- var consumerListeningChannel chan *ca.InterContainerMessage
+ var consumerListeningChannel chan *ic.InterContainerMessage
var err error
// Use the consumerType option to figure out the type of consumer to launch
@@ -351,7 +351,7 @@
}
//UnSubscribe unsubscribe a consumer from a given topic
-func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
+func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
return err
@@ -393,9 +393,9 @@
// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
select {
case ok := <-sc.producer.Successes():
- log.Debugw("message-sent", log.Fields{"status":ok})
+ log.Debugw("message-sent", log.Fields{"status": ok})
case notOk := <-sc.producer.Errors():
- log.Debugw("error-sending", log.Fields{"status":notOk})
+ log.Debugw("error-sending", log.Fields{"status": notOk})
return notOk
}
return nil
@@ -443,7 +443,7 @@
return nil
}
-func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ca.InterContainerMessage) {
+func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -482,7 +482,7 @@
return err
}
-func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -620,12 +620,12 @@
// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
// topic via the unique channel each subsciber received during subscription
-func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
+func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
// Need to go over all channels and publish messages to them - do we need to copy msg?
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
for _, ch := range consumerCh.channels {
- go func(c chan *ca.InterContainerMessage) {
+ go func(c chan *ic.InterContainerMessage) {
c <- protoMessage
}(ch)
}
@@ -652,7 +652,7 @@
break startloop
}
msgBody := msg.Value
- icm := &ca.InterContainerMessage{}
+ icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
log.Warnw("partition-invalid-message", log.Fields{"error": err})
continue
@@ -688,7 +688,7 @@
break startloop
}
msgBody := msg.Value
- icm := &ca.InterContainerMessage{}
+ icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
log.Warnw("invalid-message", log.Fields{"error": err})
continue
@@ -728,7 +728,7 @@
//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
//// for that topic. It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
var pConsumers []sarama.PartitionConsumer
var err error
@@ -744,10 +744,10 @@
// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
// unbuffered to verify race conditions.
- consumerListeningChannel := make(chan *ca.InterContainerMessage)
+ consumerListeningChannel := make(chan *ic.InterContainerMessage)
cc := &consumerChannels{
consumers: consumersIf,
- channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
+ channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
}
// Add the consumers channel to the map
@@ -761,7 +761,7 @@
// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ic.InterContainerMessage, error) {
// TODO: Replace this development partition consumers with a group consumers
var pConsumer *scc.Consumer
var err error
@@ -771,10 +771,10 @@
}
// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
// unbuffered to verify race conditions.
- consumerListeningChannel := make(chan *ca.InterContainerMessage)
+ consumerListeningChannel := make(chan *ic.InterContainerMessage)
cc := &consumerChannels{
consumers: []interface{}{pConsumer},
- channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
+ channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
}
// Add the consumers channel to the map
@@ -806,9 +806,9 @@
return pConsumers, nil
}
-func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
+func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
var i int
- var channel chan *ca.InterContainerMessage
+ var channel chan *ic.InterContainerMessage
for i, channel = range channels {
if channel == ch {
channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]