[VOL-1346] This commit adds device discovery notifications, which
will be used principally by the affinity router. As part of this change
it also renames core_adapter.proto to inter_container.proto.
Change-Id: Ib2a7b84efa50367d0ffbc482fba6096a225f3150
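
Notes for reviewers: below is a minimal sketch of how the new API is meant to
be used end to end, with rw_core publishing a discovery notification and the
affinity router consuming it. The proxy options, message type and
DeviceDiscovered body come from the diff that follows; the host, port, topic
names and device identifiers are illustrative placeholders, and in a real
deployment the producer and consumer run in separate containers (they are
combined here only to keep the sketch self-contained).

    package main

    import (
        "github.com/golang/protobuf/ptypes"
        "github.com/opencord/voltha-go/common/log"
        kk "github.com/opencord/voltha-go/kafka"
        ic "github.com/opencord/voltha-go/protos/inter_container"
    )

    func main() {
        log.AddPackage(log.JSON, log.DebugLevel, nil)

        // Wire up the sarama client and the inter-container proxy, pointing
        // the new DeviceDiscoveryTopic option at the affinity router topic.
        kafkaClient := kk.NewSaramaClient(kk.Host("127.0.0.1"), kk.Port(9092))
        proxy, err := kk.NewInterContainerProxy(
            kk.InterContainerHost("127.0.0.1"),
            kk.InterContainerPort(9092),
            kk.MsgClient(kafkaClient),
            kk.DefaultTopic(&kk.Topic{Name: "rwcore"}),
            kk.DeviceDiscoveryTopic(&kk.Topic{Name: "affinityRouter"}))
        if err != nil {
            log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
            return
        }
        proxy.Start()

        // Producer side (rw_core): publish a DEVICE_DISCOVERED notification.
        if err := proxy.DeviceDiscovered("onu-1234", "ponsim_onu", "olt-5678"); err != nil {
            log.Errorw("device-discovery-failed", log.Fields{"error": err})
        }

        // Consumer side (affinity router): subscribe to the discovery topic
        // and decode the DeviceDiscovered body carried in the message.
        msgCh, err := kafkaClient.Subscribe(&kk.Topic{Name: "affinityRouter"})
        if err != nil {
            log.Errorw("fail-to-subscribe", log.Fields{"error": err})
            return
        }
        for msg := range msgCh {
            if msg.Header.Type != ic.MessageType_DEVICE_DISCOVERED {
                continue
            }
            device := &ic.DeviceDiscovered{}
            if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
                log.Warnw("cannot-unmarshal-device-discovered", log.Fields{"error": err})
                continue
            }
            log.Infow("device-discovered", log.Fields{
                "id": device.Id, "type": device.DeviceType, "parent": device.ParentId})
        }
    }
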
diff --git a/kafka/client.go b/kafka/client.go
index b93ad86..1df700e 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -16,7 +16,7 @@
package kafka
import (
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ca "github.com/opencord/voltha-go/protos/inter_container"
"time"
)
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index ff3584f..4a3ec92 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -24,7 +24,7 @@
"github.com/golang/protobuf/ptypes/any"
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"reflect"
"sync"
"time"
@@ -45,14 +45,14 @@
// async requests into the Core via the kafka messaging bus
type requestHandlerChannel struct {
requesthandlerInterface interface{}
- ch <-chan *ca.InterContainerMessage
+ ch <-chan *ic.InterContainerMessage
}
// transactionChannel represents a combination of a topic and a channel onto which a response received
// on the kafka bus will be sent to
type transactionChannel struct {
topic *Topic
- ch chan *ca.InterContainerMessage
+ ch chan *ic.InterContainerMessage
}
// InterContainerProxy represents the messaging proxy
@@ -61,6 +61,7 @@
kafkaPort int
DefaultTopic *Topic
defaultRequestHandlerInterface interface{}
+ deviceDiscoveryTopic *Topic
kafkaClient Client
doneCh chan int
@@ -72,7 +73,7 @@
// This map is used to map a channel to a response topic. This channel handles all responses on that
// channel for that topic and forward them to the appropriate consumers channel, using the
// transactionIdToChannelMap.
- topicToResponseChannelMap map[string]<-chan *ca.InterContainerMessage
+ topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
lockTopicResponseChannelMap sync.RWMutex
// This map is used to map a transaction to a consumers channel. This is used whenever a request has been
@@ -101,6 +102,12 @@
}
}
+func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
+ return func(args *InterContainerProxy) {
+ args.deviceDiscoveryTopic = topic
+ }
+}
+
func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
return func(args *InterContainerProxy) {
args.defaultRequestHandlerInterface = handler
@@ -149,7 +156,7 @@
}
// Create the topic to response channel map
- kp.topicToResponseChannelMap = make(map[string]<-chan *ca.InterContainerMessage)
+ kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
//
// Create the transactionId to Channel Map
kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
@@ -170,6 +177,47 @@
//kp.deleteAllTransactionIdToChannelMap()
}
+// DeviceDiscovered publishes the discovered device onto the kafka messaging bus
+func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string) error {
+ log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
+ // Simple validation
+ if deviceId == "" || deviceType == "" {
+ log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
+ return errors.New("invalid-parameters")
+ }
+ // Create the device discovery message
+ header := &ic.Header{
+ Id: uuid.New().String(),
+ Type: ic.MessageType_DEVICE_DISCOVERED,
+ FromTopic: kp.DefaultTopic.Name,
+ ToTopic: kp.deviceDiscoveryTopic.Name,
+ Timestamp: time.Now().UnixNano(),
+ }
+ body := &ic.DeviceDiscovered{
+ Id: deviceId,
+ DeviceType: deviceType,
+ ParentId: parentId,
+ }
+
+ var marshalledData *any.Any
+ var err error
+ if marshalledData, err = ptypes.MarshalAny(body); err != nil {
+ log.Errorw("cannot-marshal-request", log.Fields{"error": err})
+ return err
+ }
+ msg := &ic.InterContainerMessage{
+ Header: header,
+ Body: marshalledData,
+ }
+
+ // Send the message
+ if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
+ log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
+ return err
+ }
+ return nil
+}
+
// InvokeRPC is used to send a request to a given topic
func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
@@ -189,7 +237,7 @@
}
// Subscribe for response, if needed, before sending request
- var ch <-chan *ca.InterContainerMessage
+ var ch <-chan *ic.InterContainerMessage
if waitForResponse {
var err error
if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
@@ -221,7 +269,7 @@
select {
case msg := <-ch:
log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
- var responseBody *ca.InterContainerResponseBody
+ var responseBody *ic.InterContainerResponseBody
var err error
if responseBody, err = decodeResponse(msg); err != nil {
log.Errorw("decode-response-error", log.Fields{"error": err})
@@ -230,7 +278,7 @@
case <-ctx.Done():
log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
// pack the error as proto any type
- protoError := &ca.Error{Reason: ctx.Err().Error()}
+ protoError := &ic.Error{Reason: ctx.Err().Error()}
var marshalledArg *any.Any
if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
return false, nil // Should never happen
@@ -239,7 +287,7 @@
case <-childCtx.Done():
log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
// pack the error as proto any type
- protoError := &ca.Error{Reason: childCtx.Err().Error()}
+ protoError := &ic.Error{Reason: childCtx.Err().Error()}
var marshalledArg *any.Any
if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
return false, nil // Should never happen
@@ -258,7 +306,7 @@
func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
// Subscribe to receive messages for that topic
- var ch <-chan *ca.InterContainerMessage
+ var ch <-chan *ic.InterContainerMessage
var err error
if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
//if ch, err = kp.Subscribe(topic); err != nil {
@@ -277,7 +325,7 @@
// when a message is received on a given topic. So far there is only 1 target registered per microservice
func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic) error {
// Subscribe to receive messages for that topic
- var ch <-chan *ca.InterContainerMessage
+ var ch <-chan *ic.InterContainerMessage
var err error
if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
@@ -296,7 +344,7 @@
// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
// responses from that topic.
-func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
@@ -376,7 +424,7 @@
return err
}
-func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
if _, exist := kp.transactionIdToChannelMap[id]; !exist {
@@ -444,15 +492,15 @@
return marshalledReturnedVal, nil
}
-func encodeDefaultFailedResponse(request *ca.InterContainerMessage) *ca.InterContainerMessage {
- responseHeader := &ca.Header{
+func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
+ responseHeader := &ic.Header{
Id: request.Header.Id,
- Type: ca.MessageType_RESPONSE,
+ Type: ic.MessageType_RESPONSE,
FromTopic: request.Header.ToTopic,
ToTopic: request.Header.FromTopic,
Timestamp: time.Now().Unix(),
}
- responseBody := &ca.InterContainerResponseBody{
+ responseBody := &ic.InterContainerResponseBody{
Success: false,
Result: nil,
}
@@ -463,7 +511,7 @@
log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
}
- return &ca.InterContainerMessage{
+ return &ic.InterContainerMessage{
Header: responseHeader,
Body: marshalledResponseBody,
}
@@ -472,11 +520,11 @@
//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
//or an error on failure
-func encodeResponse(request *ca.InterContainerMessage, success bool, returnedValues ...interface{}) (*ca.InterContainerMessage, error) {
+func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
//log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
- responseHeader := &ca.Header{
+ responseHeader := &ic.Header{
Id: request.Header.Id,
- Type: ca.MessageType_RESPONSE,
+ Type: ic.MessageType_RESPONSE,
FromTopic: request.Header.ToTopic,
ToTopic: request.Header.FromTopic,
Timestamp: time.Now().Unix(),
@@ -492,7 +540,7 @@
break // for now we support only 1 returned value - (excluding the error)
}
- responseBody := &ca.InterContainerResponseBody{
+ responseBody := &ic.InterContainerResponseBody{
Success: success,
Result: marshalledReturnedVal,
}
@@ -504,7 +552,7 @@
return nil, err
}
- return &ca.InterContainerMessage{
+ return &ic.InterContainerMessage{
Header: responseHeader,
Body: marshalledResponseBody,
}, nil
@@ -524,16 +572,16 @@
return
}
-func (kp *InterContainerProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {
+func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
// First extract the header to know whether this is a request - responses are handled by a different handler
- if msg.Header.Type == ca.MessageType_REQUEST {
+ if msg.Header.Type == ic.MessageType_REQUEST {
var out []reflect.Value
var err error
// Get the request body
- requestBody := &ca.InterContainerRequestBody{}
+ requestBody := &ic.InterContainerRequestBody{}
if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
} else {
@@ -547,11 +595,11 @@
// Response required?
if requestBody.ResponseRequired {
// If we already have an error before then just return that
- var returnError *ca.Error
+ var returnError *ic.Error
var returnedValues []interface{}
var success bool
if err != nil {
- returnError = &ca.Error{Reason: err.Error()}
+ returnError = &ic.Error{Reason: err.Error()}
returnedValues = make([]interface{}, 1)
returnedValues[0] = returnError
} else {
@@ -561,10 +609,10 @@
lastIndex := len(out) - 1
if out[lastIndex].Interface() != nil { // Error
if goError, ok := out[lastIndex].Interface().(error); ok {
- returnError = &ca.Error{Reason: goError.Error()}
+ returnError = &ic.Error{Reason: goError.Error()}
returnedValues = append(returnedValues, returnError)
} else { // Should never happen
- returnError = &ca.Error{Reason: "incorrect-error-returns"}
+ returnError = &ic.Error{Reason: "incorrect-error-returns"}
returnedValues = append(returnedValues, returnError)
}
} else { // Non-error case
@@ -578,7 +626,7 @@
}
}
- var icm *ca.InterContainerMessage
+ var icm *ic.InterContainerMessage
if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
icm = encodeDefaultFailedResponse(msg)
@@ -597,7 +645,7 @@
}
}
-func (kp *InterContainerProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *InterContainerProxy) waitForRequest(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
// Wait for messages
for msg := range ch {
//log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
@@ -605,7 +653,7 @@
}
}
-func (kp *InterContainerProxy) dispatchResponse(msg *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
@@ -617,14 +665,14 @@
// waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
// and then dispatches to the consumers
-func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ca.InterContainerMessage, topic *Topic) {
+func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ic.InterContainerMessage, topic *Topic) {
log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
startloop:
for {
select {
case msg := <-subscribedCh:
//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
- if msg.Header.Type == ca.MessageType_RESPONSE {
+ if msg.Header.Type == ic.MessageType_RESPONSE {
go kp.dispatchResponse(msg)
}
case <-kp.doneCh:
@@ -641,13 +689,13 @@
// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
// API. There is one response channel waiting for kafka messages before dispatching the message to the
// corresponding waiting channel
-func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
+func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
// First check whether we already have a channel listening for response on that topic. If there is
// already one then it will be reused. If not, it will be created.
if !kp.isTopicSubscribedForResponse(topic.Name) {
- var subscribedCh <-chan *ca.InterContainerMessage
+ var subscribedCh <-chan *ic.InterContainerMessage
var err error
if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
@@ -659,7 +707,7 @@
// Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
// broadcast any message for this topic to all channels waiting on it.
- ch := make(chan *ca.InterContainerMessage)
+ ch := make(chan *ic.InterContainerMessage)
kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
return ch, nil
@@ -676,15 +724,15 @@
//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
//or an error on failure
-func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ca.InterContainerMessage, error) {
- requestHeader := &ca.Header{
+func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+ requestHeader := &ic.Header{
Id: uuid.New().String(),
- Type: ca.MessageType_REQUEST,
+ Type: ic.MessageType_REQUEST,
FromTopic: replyTopic.Name,
ToTopic: toTopic.Name,
Timestamp: time.Now().Unix(),
}
- requestBody := &ca.InterContainerRequestBody{
+ requestBody := &ic.InterContainerRequestBody{
Rpc: rpc,
ResponseRequired: true,
ReplyToTopic: replyTopic.Name,
@@ -708,7 +756,7 @@
log.Warnw("cannot-marshal-request", log.Fields{"error": err})
return nil, err
}
- protoArg := &ca.Argument{
+ protoArg := &ic.Argument{
Key: arg.Key,
Value: marshalledArg,
}
@@ -721,16 +769,16 @@
log.Warnw("cannot-marshal-request", log.Fields{"error": err})
return nil, err
}
- request := &ca.InterContainerMessage{
+ request := &ic.InterContainerMessage{
Header: requestHeader,
Body: marshalledData,
}
return request, nil
}
-func decodeResponse(response *ca.InterContainerMessage) (*ca.InterContainerResponseBody, error) {
+func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
// Extract the message body
- responseBody := ca.InterContainerResponseBody{}
+ responseBody := ic.InterContainerResponseBody{}
if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
return nil, err
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 8468e42..e330b85 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -22,7 +22,7 @@
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"gopkg.in/Shopify/sarama.v1"
"strings"
"sync"
@@ -40,7 +40,7 @@
//consumer or a group consumer
type consumerChannels struct {
consumers []interface{}
- channels []chan *ca.InterContainerMessage
+ channels []chan *ic.InterContainerMessage
}
// SaramaClient represents the messaging proxy
@@ -307,20 +307,20 @@
// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
log.Debugw("subscribe", log.Fields{"topic": topic.Name})
// If a consumer already exists for that topic then reuse it
if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
// Create a channel specific for that consumers and add it to the consumers channel map
- ch := make(chan *ca.InterContainerMessage)
+ ch := make(chan *ic.InterContainerMessage)
sc.addChannelToConsumerChannelMap(topic, ch)
return ch, nil
}
// Register for the topic and set it up
- var consumerListeningChannel chan *ca.InterContainerMessage
+ var consumerListeningChannel chan *ic.InterContainerMessage
var err error
// Use the consumerType option to figure out the type of consumer to launch
@@ -351,7 +351,7 @@
}
//UnSubscribe unsubscribe a consumer from a given topic
-func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
+func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
return err
@@ -393,9 +393,9 @@
// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
select {
case ok := <-sc.producer.Successes():
- log.Debugw("message-sent", log.Fields{"status":ok})
+ log.Debugw("message-sent", log.Fields{"status": ok})
case notOk := <-sc.producer.Errors():
- log.Debugw("error-sending", log.Fields{"status":notOk})
+ log.Debugw("error-sending", log.Fields{"status": notOk})
return notOk
}
return nil
@@ -443,7 +443,7 @@
return nil
}
-func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ca.InterContainerMessage) {
+func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -482,7 +482,7 @@
return err
}
-func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -620,12 +620,12 @@
// dispatchToConsumers sends the InterContainerMessage received on a given topic to all subscribers for that
// topic via the unique channel each subscriber received during subscription
-func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
+func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
// Need to go over all channels and publish messages to them - do we need to copy msg?
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
for _, ch := range consumerCh.channels {
- go func(c chan *ca.InterContainerMessage) {
+ go func(c chan *ic.InterContainerMessage) {
c <- protoMessage
}(ch)
}
@@ -652,7 +652,7 @@
break startloop
}
msgBody := msg.Value
- icm := &ca.InterContainerMessage{}
+ icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
log.Warnw("partition-invalid-message", log.Fields{"error": err})
continue
@@ -688,7 +688,7 @@
break startloop
}
msgBody := msg.Value
- icm := &ca.InterContainerMessage{}
+ icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
log.Warnw("invalid-message", log.Fields{"error": err})
continue
@@ -728,7 +728,7 @@
//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
//// for that topic. It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
var pConsumers []sarama.PartitionConsumer
var err error
@@ -744,10 +744,10 @@
// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
// unbuffered to verify race conditions.
- consumerListeningChannel := make(chan *ca.InterContainerMessage)
+ consumerListeningChannel := make(chan *ic.InterContainerMessage)
cc := &consumerChannels{
consumers: consumersIf,
- channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
+ channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
}
// Add the consumers channel to the map
@@ -761,7 +761,7 @@
// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ic.InterContainerMessage, error) {
// TODO: Replace this development partition consumers with a group consumers
var pConsumer *scc.Consumer
var err error
@@ -771,10 +771,10 @@
}
// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
// unbuffered to verify race conditions.
- consumerListeningChannel := make(chan *ca.InterContainerMessage)
+ consumerListeningChannel := make(chan *ic.InterContainerMessage)
cc := &consumerChannels{
consumers: []interface{}{pConsumer},
- channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
+ channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
}
// Add the consumers channel to the map
@@ -806,9 +806,9 @@
return pConsumers, nil
}
-func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
+func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
var i int
- var channel chan *ca.InterContainerMessage
+ var channel chan *ic.InterContainerMessage
for i, channel = range channels {
if channel == ch {
channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
diff --git a/protos/core_adapter.proto b/protos/inter_container.proto
similarity index 90%
rename from protos/core_adapter.proto
rename to protos/inter_container.proto
index 24390f5..931ddb9 100644
--- a/protos/core_adapter.proto
+++ b/protos/inter_container.proto
@@ -5,7 +5,7 @@
import public "logical_device.proto";
-option go_package = "github.com/opencord/voltha-go/protos/core_adapter";
+option go_package = "github.com/opencord/voltha-go/protos/inter_container";
package voltha;
@@ -40,6 +40,7 @@
enum MessageType {
REQUEST = 0;
RESPONSE = 1;
+    DEVICE_DISCOVERED = 2;
}
message Header {
@@ -81,6 +82,12 @@
LogicalPort port = 1;
}
+message DeviceDiscovered {
+ string id = 1;
+ string parent_id = 2;
+ string device_type = 3;
+}
+
message InterAdapterMessageType {
enum Types {
FLOW_REQUEST = 0;
diff --git a/protos/scripts/build_protos.sh b/protos/scripts/build_protos.sh
index 0b2e17a..3602c23 100755
--- a/protos/scripts/build_protos.sh
+++ b/protos/scripts/build_protos.sh
@@ -37,7 +37,7 @@
$SRC_DIR/meta.proto \
$SRC_DIR/yang_options.proto"
-export CORE_ADAPTER_PB="$SRC_DIR/core_adapter.proto"
+export INTER_CONTAINER_PB="$SRC_DIR/inter_container.proto"
export SCHEMA_PB="$SRC_DIR/schema.proto"
export IETF_PB="$SRC_DIR/ietf_interfaces.proto"
export OF_PB="$SRC_DIR/openflow_13.proto"
@@ -46,7 +46,7 @@
export PB_VARS="\
VOLTHA_PB \
COMMON_PB \
- CORE_ADAPTER_PB \
+ INTER_CONTAINER_PB \
SCHEMA_PB \
IETF_PB \
OF_PB \
diff --git a/python/Makefile b/python/Makefile
index b19472f..c2fd010 100644
--- a/python/Makefile
+++ b/python/Makefile
@@ -189,7 +189,11 @@
make -C voltha/protos install-protoc
clean:
- find voltha -name '*.pyc' | xargs rm -f
+ find . -name '*.pyc' | xargs rm -f
+ rm -f ./protos/*_pb2.py
+ rm -f ./protos/*_pb2_grpc.py
+ rm -f ./protos/*.desc
+ rm -f ./protos/*.proto
distclean: clean
rm -rf ${VENVDIR}
diff --git a/python/adapters/kafka/adapter_proxy.py b/python/adapters/kafka/adapter_proxy.py
index 769de80..657a681 100644
--- a/python/adapters/kafka/adapter_proxy.py
+++ b/python/adapters/kafka/adapter_proxy.py
@@ -23,7 +23,7 @@
from twisted.internet.defer import inlineCallbacks, returnValue
from container_proxy import ContainerProxy
from python.protos import third_party
-from python.protos.core_adapter_pb2 import InterAdapterHeader, \
+from python.protos.inter_container_pb2 import InterAdapterHeader, \
InterAdapterMessage
import time
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index cf7afd8..0efb811 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -24,7 +24,7 @@
from twisted.internet import reactor
from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
from python.adapters.interface import IAdapterInterface
-from python.protos.core_adapter_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
+from python.protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
from python.protos.device_pb2 import Device
from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
FlowGroupChanges, ofp_packet_out
diff --git a/python/adapters/kafka/core_proxy.py b/python/adapters/kafka/core_proxy.py
index fa9544b..b897188 100644
--- a/python/adapters/kafka/core_proxy.py
+++ b/python/adapters/kafka/core_proxy.py
@@ -23,7 +23,7 @@
from container_proxy import ContainerProxy
from python.protos.common_pb2 import ID, ConnectStatus, OperStatus
-from python.protos.core_adapter_pb2 import StrType, BoolType, IntType, Packet
+from python.protos.inter_container_pb2 import StrType, BoolType, IntType, Packet
from python.protos.device_pb2 import Device, Ports
from python.protos.voltha_pb2 import CoreInstance
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index a3de4f1..fbb0834 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -28,7 +28,7 @@
from python.common.utils import asleep
from python.common.utils.registry import IComponent
from kafka_proxy import KafkaProxy, get_kafka_proxy
-from python.protos.core_adapter_pb2 import MessageType, Argument, \
+from python.protos.inter_container_pb2 import MessageType, Argument, \
InterContainerRequestBody, InterContainerMessage, Header, \
InterContainerResponseBody
diff --git a/python/adapters/ponsim_olt/ponsim_olt.py b/python/adapters/ponsim_olt/ponsim_olt.py
index 95d6590..e3157e7 100644
--- a/python/adapters/ponsim_olt/ponsim_olt.py
+++ b/python/adapters/ponsim_olt/ponsim_olt.py
@@ -41,7 +41,7 @@
from python.protos import ponsim_pb2
from python.protos import third_party
from python.protos.common_pb2 import OperStatus, ConnectStatus
-from python.protos.core_adapter_pb2 import SwitchCapability, PortCapability, \
+from python.protos.inter_container_pb2 import SwitchCapability, PortCapability, \
InterAdapterMessageType, InterAdapterResponseBody
from python.protos.device_pb2 import Port, PmConfig, PmConfigs
from python.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
diff --git a/python/adapters/ponsim_onu/ponsim_onu.py b/python/adapters/ponsim_onu/ponsim_onu.py
index eb4d716..9ad0799 100644
--- a/python/adapters/ponsim_onu/ponsim_onu.py
+++ b/python/adapters/ponsim_onu/ponsim_onu.py
@@ -34,7 +34,7 @@
from python.adapters.kafka.kafka_proxy import get_kafka_proxy
from python.protos import third_party
from python.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
-from python.protos.core_adapter_pb2 import PortCapability, \
+from python.protos.inter_container_pb2 import PortCapability, \
InterAdapterMessageType, InterAdapterResponseBody
from python.protos.device_pb2 import Port, PmConfig, PmConfigs
from python.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
diff --git a/python/requirements.txt b/python/requirements.txt
index a0641b2..56cb356 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -3,6 +3,7 @@
bitstring==3.1.5
cmd2==0.7.0
colorama==0.3.9
+confluent-kafka==0.11.5
cython==0.24.1
decorator==4.1.2
docker-py==1.10.6
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index e0d0fe6..f7ac794 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,51 +24,53 @@
// RW Core service default constants
const (
- ConsulStoreName = "consul"
- EtcdStoreName = "etcd"
- default_InstanceID = "rwcore001"
- default_GrpcPort = 50057
- default_GrpcHost = ""
- default_KafkaAdapterHost = "127.0.0.1"
- default_KafkaAdapterPort = 9092
- default_KafkaClusterHost = "127.0.0.1"
- default_KafkaClusterPort = 9094
- default_KVStoreType = EtcdStoreName
- default_KVStoreTimeout = 5 //in seconds
- default_KVStoreHost = "127.0.0.1"
- default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
- default_KVTxnKeyDelTime = 60
- default_LogLevel = 0
- default_Banner = false
- default_CoreTopic = "rwcore"
- default_RWCoreEndpoint = "rwcore"
- default_RWCoreKey = "pki/voltha.key"
- default_RWCoreCert = "pki/voltha.crt"
- default_RWCoreCA = "pki/voltha-CA.pem"
+ ConsulStoreName = "consul"
+ EtcdStoreName = "etcd"
+ default_InstanceID = "rwcore001"
+ default_GrpcPort = 50057
+ default_GrpcHost = ""
+ default_KafkaAdapterHost = "127.0.0.1"
+ default_KafkaAdapterPort = 9092
+ default_KafkaClusterHost = "127.0.0.1"
+ default_KafkaClusterPort = 9094
+ default_KVStoreType = EtcdStoreName
+ default_KVStoreTimeout = 5 //in seconds
+ default_KVStoreHost = "127.0.0.1"
+ default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
+ default_KVTxnKeyDelTime = 60
+ default_LogLevel = 0
+ default_Banner = false
+ default_CoreTopic = "rwcore"
+ default_RWCoreEndpoint = "rwcore"
+ default_RWCoreKey = "pki/voltha.key"
+ default_RWCoreCert = "pki/voltha.crt"
+ default_RWCoreCA = "pki/voltha-CA.pem"
+ default_Affinity_Router_Topic = "affinityRouter"
)
// RWCoreFlags represents the set of configurations used by the read-write core service
type RWCoreFlags struct {
// Command line parameters
- InstanceID string
- RWCoreEndpoint string
- GrpcHost string
- GrpcPort int
- KafkaAdapterHost string
- KafkaAdapterPort int
- KafkaClusterHost string
- KafkaClusterPort int
- KVStoreType string
- KVStoreTimeout int // in seconds
- KVStoreHost string
- KVStorePort int
- KVTxnKeyDelTime int
- CoreTopic string
- LogLevel int
- Banner bool
- RWCoreKey string
- RWCoreCert string
- RWCoreCA string
+ InstanceID string
+ RWCoreEndpoint string
+ GrpcHost string
+ GrpcPort int
+ KafkaAdapterHost string
+ KafkaAdapterPort int
+ KafkaClusterHost string
+ KafkaClusterPort int
+ KVStoreType string
+ KVStoreTimeout int // in seconds
+ KVStoreHost string
+ KVStorePort int
+ KVTxnKeyDelTime int
+ CoreTopic string
+ LogLevel int
+ Banner bool
+ RWCoreKey string
+ RWCoreCert string
+ RWCoreCA string
+ AffinityRouterTopic string
}
func init() {
@@ -78,25 +80,26 @@
// NewRWCoreFlags returns a new RWCore config
func NewRWCoreFlags() *RWCoreFlags {
var rwCoreFlag = RWCoreFlags{ // Default values
- InstanceID: default_InstanceID,
- RWCoreEndpoint: default_RWCoreEndpoint,
- GrpcHost: default_GrpcHost,
- GrpcPort: default_GrpcPort,
- KafkaAdapterHost: default_KafkaAdapterHost,
- KafkaAdapterPort: default_KafkaAdapterPort,
- KafkaClusterHost: default_KafkaClusterHost,
- KafkaClusterPort: default_KafkaClusterPort,
- KVStoreType: default_KVStoreType,
- KVStoreTimeout: default_KVStoreTimeout,
- KVStoreHost: default_KVStoreHost,
- KVStorePort: default_KVStorePort,
- KVTxnKeyDelTime: default_KVTxnKeyDelTime,
- CoreTopic: default_CoreTopic,
- LogLevel: default_LogLevel,
- Banner: default_Banner,
- RWCoreKey: default_RWCoreKey,
- RWCoreCert: default_RWCoreCert,
- RWCoreCA: default_RWCoreCA,
+ InstanceID: default_InstanceID,
+ RWCoreEndpoint: default_RWCoreEndpoint,
+ GrpcHost: default_GrpcHost,
+ GrpcPort: default_GrpcPort,
+ KafkaAdapterHost: default_KafkaAdapterHost,
+ KafkaAdapterPort: default_KafkaAdapterPort,
+ KafkaClusterHost: default_KafkaClusterHost,
+ KafkaClusterPort: default_KafkaClusterPort,
+ KVStoreType: default_KVStoreType,
+ KVStoreTimeout: default_KVStoreTimeout,
+ KVStoreHost: default_KVStoreHost,
+ KVStorePort: default_KVStorePort,
+ KVTxnKeyDelTime: default_KVTxnKeyDelTime,
+ CoreTopic: default_CoreTopic,
+ LogLevel: default_LogLevel,
+ Banner: default_Banner,
+ RWCoreKey: default_RWCoreKey,
+ RWCoreCert: default_RWCoreCert,
+ RWCoreCA: default_RWCoreCA,
+ AffinityRouterTopic: default_Affinity_Router_Topic,
}
return &rwCoreFlag
}
@@ -130,6 +133,9 @@
help = fmt.Sprintf("RW Core topic")
flag.StringVar(&(cf.CoreTopic), "rw_core_topic", default_CoreTopic, help)
+ help = fmt.Sprintf("Affinity Router topic")
+ flag.StringVar(&(cf.AffinityRouterTopic), "affinity_router_topic", default_Affinity_Router_Topic, help)
+
help = fmt.Sprintf("KV store type")
flag.StringVar(&(cf.KVStoreType), "kv_store_type", default_KVStoreType, help)
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 3ce59a7..c287c10 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -21,7 +21,7 @@
a "github.com/golang/protobuf/ptypes/any"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/kafka"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
@@ -43,7 +43,7 @@
if success {
return nil
} else {
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
@@ -170,7 +170,7 @@
return unPackResponse(rpc, device.Id, success, result)
}
-func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
+func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
toTopic := kafka.CreateSubTopic(device.Type, device.Id)
args := make([]*kafka.KVArg, 1)
@@ -183,14 +183,14 @@
success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
if success {
- unpackResult := &ca.SwitchCapability{}
+ unpackResult := &ic.SwitchCapability{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
return unpackResult, nil
} else {
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
@@ -201,7 +201,7 @@
}
}
-func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
+func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
toTopic := kafka.CreateSubTopic(device.Type, device.Id)
args := make([]*kafka.KVArg, 2)
@@ -209,7 +209,7 @@
Key: "device",
Value: device,
}
- pNo := &ca.IntType{Val: int64(portNo)}
+ pNo := &ic.IntType{Val: int64(portNo)}
args[1] = &kafka.KVArg{
Key: "port_no",
Value: pNo,
@@ -219,14 +219,14 @@
success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
if success {
- unpackResult := &ca.PortCapability{}
+ unpackResult := &ic.PortCapability{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
return unpackResult, nil
} else {
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
@@ -303,13 +303,13 @@
log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
toTopic := kafka.CreateSubTopic(deviceType, deviceId)
rpc := "receive_packet_out"
- dId := &ca.StrType{Val: deviceId}
+ dId := &ic.StrType{Val: deviceId}
args := make([]*kafka.KVArg, 3)
args[0] = &kafka.KVArg{
Key: "deviceId",
Value: dId,
}
- op := &ca.IntType{Val: int64(outPort)}
+ op := &ic.IntType{Val: int64(outPort)}
args[1] = &kafka.KVArg{
Key: "outPort",
Value: op,
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 40563d4..d7e1b0a 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -22,7 +22,7 @@
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -47,7 +47,7 @@
return &proxy
}
-func (rhp *AdapterRequestHandlerProxy) Register(args []*ca.Argument) (*voltha.CoreInstance, error) {
+func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -78,7 +78,7 @@
return &voltha.CoreInstance{InstanceId: rhp.coreInstanceId}, nil
}
-func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ca.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
if len(args) != 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -99,6 +99,7 @@
if device, err := rhp.deviceMgr.GetDevice(pID.Id); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
} else {
+ log.Debugw("GetDevice-response", log.Fields{"deviceId": pID.Id})
return device, nil
}
}
@@ -122,7 +123,7 @@
return cloned, nil
}
-func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) != 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -151,7 +152,7 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ca.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ic.Argument) (*voltha.Device, error) {
if len(args) < 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -170,14 +171,14 @@
return nil, nil
}
-func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ca.Argument) (*voltha.Ports, error) {
+func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ic.Argument) (*voltha.Ports, error) {
if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- pt := &ca.IntType{}
+ pt := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -202,7 +203,7 @@
return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
}
-func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ca.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Device, error) {
if len(args) != 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -225,7 +226,7 @@
// ChildDeviceDetected is invoked when a child device is detected. The following
// parameters are expected:
// {parent_device_id, parent_port_no, child_device_type, proxy_address, admin_state, **kw)
-func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 4 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -233,9 +234,9 @@
}
pID := &voltha.ID{}
- portNo := &ca.IntType{}
- dt := &ca.StrType{}
- chnlId := &ca.IntType{}
+ portNo := &ic.IntType{}
+ dt := &ic.StrType{}
+ chnlId := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "parent_device_id":
@@ -272,15 +273,15 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- operStatus := &ca.IntType{}
- connStatus := &ca.IntType{}
+ operStatus := &ic.IntType{}
+ connStatus := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -309,15 +310,15 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- operStatus := &ca.IntType{}
- connStatus := &ca.IntType{}
+ operStatus := &ic.IntType{}
+ connStatus := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -347,16 +348,16 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- portType := &ca.IntType{}
- portNo := &ca.IntType{}
- operStatus := &ca.IntType{}
+ portType := &ic.IntType{}
+ portNo := &ic.IntType{}
+ operStatus := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -390,7 +391,7 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ic.Argument) (*empty.Empty, error) {
if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -423,14 +424,14 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
pmConfigs := &voltha.PmConfigs{}
- init := &ca.BoolType{}
+ init := &ic.BoolType{}
for _, arg := range args {
switch arg.Key {
case "device_pm_config":
@@ -458,15 +459,15 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- portNo := &ca.IntType{}
- packet := &ca.Packet{}
+ portNo := &ic.IntType{}
+ packet := &ic.Packet{}
for _, arg := range args {
switch arg.Key {
case "device_id":
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 7423563..0908146 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -131,7 +131,8 @@
kafka.InterContainerHost(core.config.KafkaAdapterHost),
kafka.InterContainerPort(core.config.KafkaAdapterPort),
kafka.MsgClient(core.kafkaClient),
- kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic})); err != nil {
+ kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic}),
+ kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: core.config.AffinityRouterTopic})); err != nil {
log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
return err
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 7e7f42a..784b506 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -21,7 +21,7 @@
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
- "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
ofp "github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
fu "github.com/opencord/voltha-go/rw_core/utils"
@@ -326,12 +326,12 @@
// getSwitchCapability is a helper method that a logical device agent uses to retrieve the switch capability of a
// parent device
-func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*core_adapter.SwitchCapability, error) {
+func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceId})
if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
return nil, err
} else {
- var switchCap *core_adapter.SwitchCapability
+ var switchCap *ic.SwitchCapability
var err error
if switchCap, err = agent.adapterProxy.GetOfpDeviceInfo(ctx, device); err != nil {
log.Debugw("getSwitchCapability-error", log.Fields{"id": device.Id, "error": err})
@@ -343,12 +343,12 @@
// getPortCapability is a helper method that a logical device agent uses to retrieve the port capability of a
// device
-func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*core_adapter.PortCapability, error) {
+func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceId})
if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
return nil, err
} else {
- var portCap *core_adapter.PortCapability
+ var portCap *ic.PortCapability
var err error
if portCap, err = agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo); err != nil {
log.Debugw("getPortCapability-error", log.Fields{"id": device.Id, "error": err})
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 6f4a874..45584a1 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -22,7 +22,7 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/kafka"
- "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
ofp "github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
@@ -249,7 +249,7 @@
return status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*core_adapter.SwitchCapability, error) {
+func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*ic.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getSwitchCapability(ctx)
@@ -266,7 +266,7 @@
}
-func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*core_adapter.PortCapability, error) {
+func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("getPortCapability", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getPortCapability(ctx, portNo)
@@ -340,9 +340,12 @@
// Activate the child device
if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
- return agent.enableDevice(nil)
+ go agent.enableDevice(nil)
}
+ // Publish on the messaging bus that we have discovered new devices
+ go dMgr.kafkaICProxy.DeviceDiscovered(agent.deviceId, deviceType, parentDeviceId)
+
return nil
}
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 60692e5..8a69967 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -22,7 +22,7 @@
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
ofp "github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
fd "github.com/opencord/voltha-go/rw_core/flow_decomposition"
@@ -68,7 +68,7 @@
func (agent *LogicalDeviceAgent) start(ctx context.Context) error {
log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
//Build the logical device based on information retrieved from the device adapter
- var switchCap *ca.SwitchCapability
+ var switchCap *ic.SwitchCapability
var err error
if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceId); err != nil {
log.Errorw("error-creating-logical-device", log.Fields{"error": err})
@@ -88,7 +88,7 @@
if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
log.Errorw("error-creating-logical-port", log.Fields{"error": err})
}
- var portCap *ca.PortCapability
+ var portCap *ic.PortCapability
for _, port := range nniPorts.Items {
log.Infow("!!!!!!!NNI PORTS", log.Fields{"NNI": port})
if portCap, err = agent.deviceMgr.getPortCapability(ctx, agent.rootDeviceId, port.PortNo); err != nil {
@@ -223,7 +223,6 @@
return nil
}
-
// getLogicalDeviceWithoutLock retrieves a logical device from the model without locking it. This is used only by
// functions that have already acquired the logical device lock to the model
func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
@@ -240,7 +239,7 @@
func (agent *LogicalDeviceAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
log.Infow("addUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
- var portCap *ca.PortCapability
+ var portCap *ic.PortCapability
var err error
//Get UNI port number
@@ -327,7 +326,6 @@
"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
}
-
//updateFlowGroupsWithoutLock updates the flows in the logical device without locking the logical device. This function
//must only be called by a function that is holding the lock on the logical device
func (agent *LogicalDeviceAgent) updateFlowGroupsWithoutLock(groups []*ofp.OfpGroupEntry) error {
@@ -737,7 +735,7 @@
func (agent *LogicalDeviceAgent) GetRoute(ingressPortNo uint32, egressPortNo uint32) []graph.RouteHop {
log.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
// Get the updated logical device
- var ld *ca.LogicalDevice
+ var ld *ic.LogicalDevice
routes := make([]graph.RouteHop, 0)
var err error
if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
diff --git a/rw_core/main.go b/rw_core/main.go
index 472072a..dbb82b0 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -23,7 +23,7 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"github.com/opencord/voltha-go/kafka"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/opencord/voltha-go/rw_core/config"
c "github.com/opencord/voltha-go/rw_core/core"
"os"
@@ -43,7 +43,7 @@
kafkaClient kafka.Client
core *c.Core
//For test
- receiverChannels []<-chan *ca.InterContainerMessage
+ receiverChannels []<-chan *ic.InterContainerMessage
}
func init() {
@@ -73,7 +73,7 @@
kafka.ProducerReturnOnErrors(true),
kafka.ProducerReturnOnSuccess(true),
kafka.ProducerMaxRetries(6),
- kafka.ProducerRetryBackoff(time.Millisecond * 30)), nil
+ kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
}
return nil, errors.New("unsupported-client-type")
}
@@ -83,7 +83,7 @@
rwCore.config = cf
rwCore.halted = false
rwCore.exitChannel = make(chan int, 1)
- rwCore.receiverChannels = make([]<-chan *ca.InterContainerMessage, 0)
+ rwCore.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
return &rwCore
}
diff --git a/tests/kafka/kafka_client_test.go b/tests/kafka/kafka_client_test.go
index 76d63c6..12f0ae4 100644
--- a/tests/kafka/kafka_client_test.go
+++ b/tests/kafka/kafka_client_test.go
@@ -22,7 +22,7 @@
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
kk "github.com/opencord/voltha-go/kafka"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/stretchr/testify/assert"
"os"
"testing"
@@ -63,7 +63,7 @@
numMessageToSend = 1
}
-func waitForMessage(ch <-chan *ca.InterContainerMessage, doneCh chan string, maxMessages int) {
+func waitForMessage(ch <-chan *ic.InterContainerMessage, doneCh chan string, maxMessages int) {
totalTime = 0
totalMessageReceived = 0
mytime := time.Now()
@@ -92,17 +92,17 @@
func sendMessages(topic *kk.Topic, numMessages int, fn sendToKafka) error {
// Loop for numMessages
for i := 0; i < numMessages; i++ {
- msg := &ca.InterContainerMessage{}
- msg.Header = &ca.Header{
+ msg := &ic.InterContainerMessage{}
+ msg.Header = &ic.Header{
Id: uuid.New().String(),
- Type: ca.MessageType_REQUEST,
+ Type: ic.MessageType_REQUEST,
FromTopic: topic.Name,
ToTopic: topic.Name,
Timestamp: time.Now().UnixNano(),
}
var marshalledArg *any.Any
var err error
- body := &ca.InterContainerRequestBody{Rpc: "testRPC", Args: []*ca.Argument{}}
+ body := &ic.InterContainerRequestBody{Rpc: "testRPC", Args: []*ic.Argument{}}
if marshalledArg, err = ptypes.MarshalAny(body); err != nil {
log.Warnw("cannot-marshal-request", log.Fields{"error": err})
return err
@@ -116,7 +116,7 @@
}
func runWithPartionConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
- var ch <-chan *ca.InterContainerMessage
+ var ch <-chan *ic.InterContainerMessage
var err error
if ch, err = partionClient.Subscribe(topic); err != nil {
return nil
@@ -130,7 +130,7 @@
}
func runWithGroupConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
- var ch <-chan *ca.InterContainerMessage
+ var ch <-chan *ic.InterContainerMessage
var err error
if ch, err = groupClient.Subscribe(topic); err != nil {
return nil
diff --git a/tests/kafka/kafka_inter_container_messaging_test.go b/tests/kafka/kafka_inter_container_messaging_test.go
index 0293d6d..57e7ab9 100644
--- a/tests/kafka/kafka_inter_container_messaging_test.go
+++ b/tests/kafka/kafka_inter_container_messaging_test.go
@@ -21,10 +21,11 @@
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
kk "github.com/opencord/voltha-go/kafka"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/opencord/voltha-go/protos/voltha"
rhp "github.com/opencord/voltha-go/rw_core/core"
"github.com/stretchr/testify/assert"
+ "os"
"testing"
"time"
)
@@ -35,27 +36,42 @@
var coreKafkaProxy *kk.InterContainerProxy
var adapterKafkaProxy *kk.InterContainerProxy
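+// Shared test fixtures for the device discovery tests below: the device
+// discovery (affinity router) topic, the Kafka broker host taken from
+// DOCKER_HOST_IP, and the Kafka clients used by the proxies and the consumer.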
+var kafkaPartitionClient kk.Client
+var affinityRouterTopic string
+var hostIP string
+var kafkaClient kk.Client
func init() {
log.AddPackage(log.JSON, log.ErrorLevel, nil)
log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
log.SetAllLogLevel(log.ErrorLevel)
- kafkaClient := kk.NewSaramaClient(
- kk.Host("10.176.212.108"),
+ affinityRouterTopic = "AffinityRouter"
+ hostIP = os.Getenv("DOCKER_HOST_IP")
+ kafkaClient = kk.NewSaramaClient(
+ kk.Host(hostIP),
kk.Port(9092))
coreKafkaProxy, _ = kk.NewInterContainerProxy(
- kk.InterContainerHost("10.176.212.108"),
+ kk.InterContainerHost(hostIP),
kk.InterContainerPort(9092),
kk.DefaultTopic(&kk.Topic{Name: "Core"}),
- kk.MsgClient(kafkaClient))
+ kk.MsgClient(kafkaClient),
+ kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
adapterKafkaProxy, _ = kk.NewInterContainerProxy(
- kk.InterContainerHost("10.176.212.108"),
+ kk.InterContainerHost(hostIP),
kk.InterContainerPort(9092),
kk.DefaultTopic(&kk.Topic{Name: "Adapter"}),
kk.MsgClient(kafkaClient))
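+ // A separate partition-consumer client is used to listen on the device
+ // discovery (affinity router) topic.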
+ kafkaPartitionClient = kk.NewSaramaClient(
+ kk.ConsumerType(kk.PartitionConsumer),
+ kk.Host(hostIP),
+ kk.Port(9092),
+ kk.AutoCreateTopic(true),
+ kk.ProducerFlushFrequency(5))
+ kafkaPartitionClient.Start()
+
coreKafkaProxy.Start()
adapterKafkaProxy.Start()
subscribeTarget(coreKafkaProxy)
@@ -67,11 +83,11 @@
kmp.SubscribeWithRequestHandlerInterface(topic, requestProxy)
}
-func waitForRPCMessage(topic kk.Topic, ch <-chan *ca.InterContainerMessage, doneCh chan string) {
+func waitForRPCMessage(topic kk.Topic, ch <-chan *ic.InterContainerMessage, doneCh chan string) {
for msg := range ch {
log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
// Unpack message
- requestBody := &ca.InterContainerRequestBody{}
+ requestBody := &ic.InterContainerRequestBody{}
if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
doneCh <- "Error"
} else {
@@ -106,8 +122,8 @@
// // First subscribe to the specific topic
// //log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
// var err error
-// var ch1 <-chan *ca.InterContainerMessage
-// var ch2 <-chan *ca.InterContainerMessage
+// var ch1 <-chan *ic.InterContainerMessage
+// var ch2 <-chan *ic.InterContainerMessage
// topic := kk.Topic{Name: "Core"}
// ch1, err = coreKafkaProxy.Subscribe(topic)
// assert.NotNil(t, ch1)
@@ -159,7 +175,7 @@
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
//Unpack the result into the actual proto object
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
@@ -177,12 +193,12 @@
rpc := "GetDevice"
topic := kk.Topic{Name: "Core"}
start := time.Now()
- status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+ status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
//Unpack the result into the actual proto object
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
@@ -204,7 +220,7 @@
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
start := time.Now()
- status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic,true, args...)
+ status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
@@ -229,11 +245,11 @@
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
start := time.Now()
- status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic,true, args...)
+ status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
@@ -252,7 +268,7 @@
topic := kk.Topic{Name: "Core"}
expectedResponse := &voltha.Device{Id: trnsId}
start := time.Now()
- status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+ status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
@@ -275,7 +291,7 @@
topic := kk.Topic{Name: "Core"}
expectedResponse := &voltha.Device{Id: trnsId}
start := time.Now()
- status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+ status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
@@ -294,7 +310,7 @@
Key: "deviceID",
Value: protoArg1,
}
- protoArg2 := &ca.IntType{Val: 1}
+ protoArg2 := &ic.IntType{Val: 1}
args[1] = &kk.KVArg{
Key: "portType",
Value: protoArg2,
@@ -302,7 +318,7 @@
rpc := "GetPorts"
topic := kk.Topic{Name: "Core"}
start := time.Now()
- status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+ status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
@@ -325,12 +341,12 @@
rpc := "GetPorts"
topic := kk.Topic{Name: "Core"}
start := time.Now()
- status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+ status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
//Unpack the result into the actual proto object
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
@@ -339,18 +355,18 @@
func TestChildDeviceDetected(t *testing.T) {
trnsId := uuid.New().String()
- protoArg1 := &ca.StrType{Val: trnsId}
+ protoArg1 := &ic.StrType{Val: trnsId}
args := make([]*kk.KVArg, 5)
args[0] = &kk.KVArg{
Key: "deviceID",
Value: protoArg1,
}
- protoArg2 := &ca.IntType{Val: 1}
+ protoArg2 := &ic.IntType{Val: 1}
args[1] = &kk.KVArg{
Key: "parentPortNo",
Value: protoArg2,
}
- protoArg3 := &ca.StrType{Val: "great_onu"}
+ protoArg3 := &ic.StrType{Val: "great_onu"}
args[2] = &kk.KVArg{
Key: "childDeviceType",
Value: protoArg3,
@@ -360,7 +376,7 @@
Key: "proxyAddress",
Value: protoArg4,
}
- protoArg5 := &ca.IntType{Val: 1}
+ protoArg5 := &ic.IntType{Val: 1}
args[4] = &kk.KVArg{
Key: "portType",
Value: protoArg5,
@@ -369,7 +385,7 @@
rpc := "ChildDeviceDetected"
topic := kk.Topic{Name: "Core"}
start := time.Now()
- status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+ status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
@@ -378,18 +394,18 @@
func TestChildDeviceDetectedNoWait(t *testing.T) {
trnsId := uuid.New().String()
- protoArg1 := &ca.StrType{Val: trnsId}
+ protoArg1 := &ic.StrType{Val: trnsId}
args := make([]*kk.KVArg, 5)
args[0] = &kk.KVArg{
Key: "deviceID",
Value: protoArg1,
}
- protoArg2 := &ca.IntType{Val: 1}
+ protoArg2 := &ic.IntType{Val: 1}
args[1] = &kk.KVArg{
Key: "parentPortNo",
Value: protoArg2,
}
- protoArg3 := &ca.StrType{Val: "great_onu"}
+ protoArg3 := &ic.StrType{Val: "great_onu"}
args[2] = &kk.KVArg{
Key: "childDeviceType",
Value: protoArg3,
@@ -399,7 +415,7 @@
Key: "proxyAddress",
Value: protoArg4,
}
- protoArg5 := &ca.IntType{Val: 1}
+ protoArg5 := &ic.IntType{Val: 1}
args[4] = &kk.KVArg{
Key: "portType",
Value: protoArg5,
@@ -408,7 +424,7 @@
rpc := "ChildDeviceDetected"
topic := kk.Topic{Name: "Core"}
start := time.Now()
- status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,false, args...)
+ status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, false, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
@@ -417,18 +433,18 @@
func TestChildDeviceDetectedMissingArgs(t *testing.T) {
trnsId := uuid.New().String()
- protoArg1 := &ca.StrType{Val: trnsId}
+ protoArg1 := &ic.StrType{Val: trnsId}
args := make([]*kk.KVArg, 4)
args[0] = &kk.KVArg{
Key: "deviceID",
Value: protoArg1,
}
- protoArg2 := &ca.IntType{Val: 1}
+ protoArg2 := &ic.IntType{Val: 1}
args[1] = &kk.KVArg{
Key: "parentPortNo",
Value: protoArg2,
}
- protoArg3 := &ca.StrType{Val: "great_onu"}
+ protoArg3 := &ic.StrType{Val: "great_onu"}
args[2] = &kk.KVArg{
Key: "childDeviceType",
Value: protoArg3,
@@ -437,11 +453,11 @@
rpc := "ChildDeviceDetected"
topic := kk.Topic{Name: "Core"}
start := time.Now()
- status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+ status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
@@ -457,12 +473,12 @@
Key: "device_id",
Value: protoArg1,
}
- protoArg2 := &ca.IntType{Val: 1}
+ protoArg2 := &ic.IntType{Val: 1}
args[1] = &kk.KVArg{
Key: "oper_status",
Value: protoArg2,
}
- protoArg3 := &ca.IntType{Val: 1}
+ protoArg3 := &ic.IntType{Val: 1}
args[2] = &kk.KVArg{
Key: "connect_status",
Value: protoArg3,
@@ -471,13 +487,58 @@
rpc := "DeviceStateUpdate"
topic := kk.Topic{Name: "Core"}
start := time.Now()
- status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+ status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
assert.Nil(t, result)
}
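+// subscribeToTopic is a test helper: it subscribes to topic using the partition
+// consumer client, waits for a single message and forwards it onto
+// waitingChannel. A failure to subscribe is returned to the caller.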
+func subscribeToTopic(topic *kk.Topic, waitingChannel chan *ic.InterContainerMessage) error {
+ var ch <-chan *ic.InterContainerMessage
+ var err error
+ if ch, err = kafkaPartitionClient.Subscribe(topic); err != nil {
+ return err
+ }
+ msg := <-ch
+
+ log.Debugw("msg-received", log.Fields{"msg": msg})
+ waitingChannel <- msg
+ return nil
+}
+
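+// TestDeviceDiscovery verifies that DeviceDiscovered publishes a
+// MessageType_DEVICE_DISCOVERED message on the device discovery topic and that
+// its body unmarshals back into the device identifiers that were sent.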
+func TestDeviceDiscovery(t *testing.T) {
+ // Create an inter-container proxy with a device discovery topic, as the Core does
+ testProxy, _ := kk.NewInterContainerProxy(
+ kk.InterContainerHost(hostIP),
+ kk.InterContainerPort(9092),
+ kk.DefaultTopic(&kk.Topic{Name: "Test"}),
+ kk.MsgClient(kafkaClient),
+ kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
+
+ // Start listening on the discovery topic before the message is published
+ waitingChannel := make(chan *ic.InterContainerMessage)
+ go subscribeToTopic(&kk.Topic{Name: affinityRouterTopic}, waitingChannel)
+
+ // Sleep to make sure the consumer is ready
+ time.Sleep(time.Millisecond * 100)
+
+ // Send the message
+ go testProxy.DeviceDiscovered("TestDeviceId", "TestDevicetype", "TestParentId")
+
+ msg := <-waitingChannel
+ totalTime := (time.Now().UnixNano() - msg.Header.Timestamp) / int64(time.Millisecond)
+ assert.Equal(t, msg.Header.Type, ic.MessageType_DEVICE_DISCOVERED)
+ // Unpack message
+ dd := &ic.DeviceDiscovered{}
+ err := ptypes.UnmarshalAny(msg.Body, dd)
+ assert.Nil(t, err)
+ assert.Equal(t, dd.Id, "TestDeviceId")
+ assert.Equal(t, dd.DeviceType, "TestDevicetype")
+ assert.Equal(t, dd.ParentId, "TestParentId")
+ log.Debugw("TotalTime", log.Fields{"time": totalTime})
+}
+
func TestStopKafkaProxy(t *testing.T) {
adapterKafkaProxy.Stop()
coreKafkaProxy.Stop()