[VOL-1346] This commit addresses device discovery notifications,
which will be used principally by the affinity router. In doing so,
this commit also renames core_adapter.proto to inter_container.proto.
Change-Id: Ib2a7b84efa50367d0ffbc482fba6096a225f3150
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index ff3584f..4a3ec92 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -24,7 +24,7 @@
"github.com/golang/protobuf/ptypes/any"
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"reflect"
"sync"
"time"
@@ -45,14 +45,14 @@
// async requests into the Core via the kafka messaging bus
type requestHandlerChannel struct {
requesthandlerInterface interface{}
- ch <-chan *ca.InterContainerMessage
+ ch <-chan *ic.InterContainerMessage
}
// transactionChannel represents a combination of a topic and a channel onto which a response received
// on the kafka bus will be sent to
type transactionChannel struct {
topic *Topic
- ch chan *ca.InterContainerMessage
+ ch chan *ic.InterContainerMessage
}
// InterContainerProxy represents the messaging proxy
@@ -61,6 +61,7 @@
kafkaPort int
DefaultTopic *Topic
defaultRequestHandlerInterface interface{}
+ deviceDiscoveryTopic *Topic
kafkaClient Client
doneCh chan int
@@ -72,7 +73,7 @@
// This map is used to map a channel to a response topic. This channel handles all responses on that
// channel for that topic and forward them to the appropriate consumers channel, using the
// transactionIdToChannelMap.
- topicToResponseChannelMap map[string]<-chan *ca.InterContainerMessage
+ topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
lockTopicResponseChannelMap sync.RWMutex
// This map is used to map a transaction to a consumers channel. This is used whenever a request has been
@@ -101,6 +102,12 @@
}
}
+// DeviceDiscoveryTopic returns an InterContainerProxyOption that records topic as the
+// proxy's device discovery topic.
+func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
+	return func(proxy *InterContainerProxy) { proxy.deviceDiscoveryTopic = topic }
+}
+
func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
return func(args *InterContainerProxy) {
args.defaultRequestHandlerInterface = handler
@@ -149,7 +156,7 @@
}
// Create the topic to response channel map
- kp.topicToResponseChannelMap = make(map[string]<-chan *ca.InterContainerMessage)
+ kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
//
// Create the transactionId to Channel Map
kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
@@ -170,6 +177,47 @@
//kp.deleteAllTransactionIdToChannelMap()
}
+// DeviceDiscovered publishes a device discovery notification onto the kafka messaging bus,
+// addressed to the topic set through the DeviceDiscoveryTopic option. It returns an error on
+// empty deviceId/deviceType, unset topics, or a marshalling or send failure.
+func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string) error {
+	log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
+	// Simple validation
+	if deviceId == "" || deviceType == "" {
+		log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
+		return errors.New("invalid-parameters")
+	}
+	// Both topics are pointers dereferenced below; fail instead of panicking when one is unset.
+	if kp.DefaultTopic == nil || kp.deviceDiscoveryTopic == nil {
+		log.Errorw("topic-not-set", log.Fields{"deviceId": deviceId})
+		return errors.New("topic-not-set")
+	}
+	// Create the device discovery message
+	header := &ic.Header{
+		Id:        uuid.New().String(),
+		Type:      ic.MessageType_DEVICE_DISCOVERED,
+		FromTopic: kp.DefaultTopic.Name,
+		ToTopic:   kp.deviceDiscoveryTopic.Name,
+		Timestamp: time.Now().UnixNano(),
+	}
+	body := &ic.DeviceDiscovered{
+		Id:         deviceId,
+		DeviceType: deviceType,
+		ParentId:   parentId,
+	}
+	marshalledData, err := ptypes.MarshalAny(body)
+	if err != nil {
+		log.Errorw("cannot-marshal-request", log.Fields{"error": err})
+		return err
+	}
+	msg := &ic.InterContainerMessage{Header: header, Body: marshalledData}
+	if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
+		log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
+		return err
+	}
+	return nil
+}
+
// InvokeRPC is used to send a request to a given topic
func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
@@ -189,7 +237,7 @@
}
// Subscribe for response, if needed, before sending request
- var ch <-chan *ca.InterContainerMessage
+ var ch <-chan *ic.InterContainerMessage
if waitForResponse {
var err error
if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
@@ -221,7 +269,7 @@
select {
case msg := <-ch:
log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
- var responseBody *ca.InterContainerResponseBody
+ var responseBody *ic.InterContainerResponseBody
var err error
if responseBody, err = decodeResponse(msg); err != nil {
log.Errorw("decode-response-error", log.Fields{"error": err})
@@ -230,7 +278,7 @@
case <-ctx.Done():
log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
// pack the error as proto any type
- protoError := &ca.Error{Reason: ctx.Err().Error()}
+ protoError := &ic.Error{Reason: ctx.Err().Error()}
var marshalledArg *any.Any
if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
return false, nil // Should never happen
@@ -239,7 +287,7 @@
case <-childCtx.Done():
log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
// pack the error as proto any type
- protoError := &ca.Error{Reason: childCtx.Err().Error()}
+ protoError := &ic.Error{Reason: childCtx.Err().Error()}
var marshalledArg *any.Any
if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
return false, nil // Should never happen
@@ -258,7 +306,7 @@
func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
// Subscribe to receive messages for that topic
- var ch <-chan *ca.InterContainerMessage
+ var ch <-chan *ic.InterContainerMessage
var err error
if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
//if ch, err = kp.Subscribe(topic); err != nil {
@@ -277,7 +325,7 @@
// when a message is received on a given topic. So far there is only 1 target registered per microservice
func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic) error {
// Subscribe to receive messages for that topic
- var ch <-chan *ca.InterContainerMessage
+ var ch <-chan *ic.InterContainerMessage
var err error
if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
@@ -296,7 +344,7 @@
// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
// responses from that topic.
-func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
@@ -376,7 +424,7 @@
return err
}
-func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
if _, exist := kp.transactionIdToChannelMap[id]; !exist {
@@ -444,15 +492,15 @@
return marshalledReturnedVal, nil
}
-func encodeDefaultFailedResponse(request *ca.InterContainerMessage) *ca.InterContainerMessage {
- responseHeader := &ca.Header{
+func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
+ responseHeader := &ic.Header{
Id: request.Header.Id,
- Type: ca.MessageType_RESPONSE,
+ Type: ic.MessageType_RESPONSE,
FromTopic: request.Header.ToTopic,
ToTopic: request.Header.FromTopic,
Timestamp: time.Now().Unix(),
}
- responseBody := &ca.InterContainerResponseBody{
+ responseBody := &ic.InterContainerResponseBody{
Success: false,
Result: nil,
}
@@ -463,7 +511,7 @@
log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
}
- return &ca.InterContainerMessage{
+ return &ic.InterContainerMessage{
Header: responseHeader,
Body: marshalledResponseBody,
}
@@ -472,11 +520,11 @@
//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
//or an error on failure
-func encodeResponse(request *ca.InterContainerMessage, success bool, returnedValues ...interface{}) (*ca.InterContainerMessage, error) {
+func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
//log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
- responseHeader := &ca.Header{
+ responseHeader := &ic.Header{
Id: request.Header.Id,
- Type: ca.MessageType_RESPONSE,
+ Type: ic.MessageType_RESPONSE,
FromTopic: request.Header.ToTopic,
ToTopic: request.Header.FromTopic,
Timestamp: time.Now().Unix(),
@@ -492,7 +540,7 @@
break // for now we support only 1 returned value - (excluding the error)
}
- responseBody := &ca.InterContainerResponseBody{
+ responseBody := &ic.InterContainerResponseBody{
Success: success,
Result: marshalledReturnedVal,
}
@@ -504,7 +552,7 @@
return nil, err
}
- return &ca.InterContainerMessage{
+ return &ic.InterContainerMessage{
Header: responseHeader,
Body: marshalledResponseBody,
}, nil
@@ -524,16 +572,16 @@
return
}
-func (kp *InterContainerProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {
+func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
// First extract the header to know whether this is a request - responses are handled by a different handler
- if msg.Header.Type == ca.MessageType_REQUEST {
+ if msg.Header.Type == ic.MessageType_REQUEST {
var out []reflect.Value
var err error
// Get the request body
- requestBody := &ca.InterContainerRequestBody{}
+ requestBody := &ic.InterContainerRequestBody{}
if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
} else {
@@ -547,11 +595,11 @@
// Response required?
if requestBody.ResponseRequired {
// If we already have an error before then just return that
- var returnError *ca.Error
+ var returnError *ic.Error
var returnedValues []interface{}
var success bool
if err != nil {
- returnError = &ca.Error{Reason: err.Error()}
+ returnError = &ic.Error{Reason: err.Error()}
returnedValues = make([]interface{}, 1)
returnedValues[0] = returnError
} else {
@@ -561,10 +609,10 @@
lastIndex := len(out) - 1
if out[lastIndex].Interface() != nil { // Error
if goError, ok := out[lastIndex].Interface().(error); ok {
- returnError = &ca.Error{Reason: goError.Error()}
+ returnError = &ic.Error{Reason: goError.Error()}
returnedValues = append(returnedValues, returnError)
} else { // Should never happen
- returnError = &ca.Error{Reason: "incorrect-error-returns"}
+ returnError = &ic.Error{Reason: "incorrect-error-returns"}
returnedValues = append(returnedValues, returnError)
}
} else { // Non-error case
@@ -578,7 +626,7 @@
}
}
- var icm *ca.InterContainerMessage
+ var icm *ic.InterContainerMessage
if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
icm = encodeDefaultFailedResponse(msg)
@@ -597,7 +645,7 @@
}
}
-func (kp *InterContainerProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *InterContainerProxy) waitForRequest(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
// Wait for messages
for msg := range ch {
//log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
@@ -605,7 +653,7 @@
}
}
-func (kp *InterContainerProxy) dispatchResponse(msg *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
@@ -617,14 +665,14 @@
// waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
// and then dispatches to the consumers
-func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ca.InterContainerMessage, topic *Topic) {
+func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ic.InterContainerMessage, topic *Topic) {
log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
startloop:
for {
select {
case msg := <-subscribedCh:
//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
- if msg.Header.Type == ca.MessageType_RESPONSE {
+ if msg.Header.Type == ic.MessageType_RESPONSE {
go kp.dispatchResponse(msg)
}
case <-kp.doneCh:
@@ -641,13 +689,13 @@
// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
// API. There is one response channel waiting for kafka messages before dispatching the message to the
// corresponding waiting channel
-func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
+func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
// First check whether we already have a channel listening for response on that topic. If there is
// already one then it will be reused. If not, it will be created.
if !kp.isTopicSubscribedForResponse(topic.Name) {
- var subscribedCh <-chan *ca.InterContainerMessage
+ var subscribedCh <-chan *ic.InterContainerMessage
var err error
if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
@@ -659,7 +707,7 @@
// Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
// broadcast any message for this topic to all channels waiting on it.
- ch := make(chan *ca.InterContainerMessage)
+ ch := make(chan *ic.InterContainerMessage)
kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
return ch, nil
@@ -676,15 +724,15 @@
//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
//or an error on failure
-func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ca.InterContainerMessage, error) {
- requestHeader := &ca.Header{
+func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+ requestHeader := &ic.Header{
Id: uuid.New().String(),
- Type: ca.MessageType_REQUEST,
+ Type: ic.MessageType_REQUEST,
FromTopic: replyTopic.Name,
ToTopic: toTopic.Name,
Timestamp: time.Now().Unix(),
}
- requestBody := &ca.InterContainerRequestBody{
+ requestBody := &ic.InterContainerRequestBody{
Rpc: rpc,
ResponseRequired: true,
ReplyToTopic: replyTopic.Name,
@@ -708,7 +756,7 @@
log.Warnw("cannot-marshal-request", log.Fields{"error": err})
return nil, err
}
- protoArg := &ca.Argument{
+ protoArg := &ic.Argument{
Key: arg.Key,
Value: marshalledArg,
}
@@ -721,16 +769,16 @@
log.Warnw("cannot-marshal-request", log.Fields{"error": err})
return nil, err
}
- request := &ca.InterContainerMessage{
+ request := &ic.InterContainerMessage{
Header: requestHeader,
Body: marshalledData,
}
return request, nil
}
-func decodeResponse(response *ca.InterContainerMessage) (*ca.InterContainerResponseBody, error) {
+func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
// Extract the message body
- responseBody := ca.InterContainerResponseBody{}
+ responseBody := ic.InterContainerResponseBody{}
if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
return nil, err