[VOL-1499] Use precreated topic
This commit migrate from dynamically created kafka topic to
pre-created topic. The changes are made in the rw_core, simulated
onu and olt adapters, and ponsim olt and onu adapters.
TODO: move the python shared library changes into the pyvoltha
repo.
Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 099d0d8..c5e0772 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -43,6 +43,7 @@
const (
TransactionKey = "transactionID"
+ FromTopic = "fromTopic"
)
// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
@@ -331,7 +332,7 @@
kp.defaultRequestHandlerInterface = handler
kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
// Launch a go routine to receive and process kafka messages
- go kp.waitForRequest(ch, topic, handler)
+ go kp.waitForMessages(ch, topic, handler)
return nil
}
@@ -342,14 +343,14 @@
// Subscribe to receive messages for that topic
var ch <-chan *ic.InterContainerMessage
var err error
- if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key:Offset, Value:initialOffset}); err != nil {
+ if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
return err
}
kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
// Launch a go routine to receive and process kafka messages
- go kp.waitForRequest(ch, topic, kp.defaultRequestHandlerInterface)
+ go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
return nil
}
@@ -615,7 +616,21 @@
return append(currentArgs, protoArg)
}
-func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
+func (kp *InterContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
+ var marshalledArg *any.Any
+ var err error
+ if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
+ log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+ return currentArgs
+ }
+ protoArg := &ic.Argument{
+ Key: FromTopic,
+ Value: marshalledArg,
+ }
+ return append(currentArgs, protoArg)
+}
+
+func (kp *InterContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
// First extract the header to know whether this is a request - responses are handled by a different handler
if msg.Header.Type == ic.MessageType_REQUEST {
@@ -633,6 +648,11 @@
// Augment the requestBody with the message Id as it will be used in scenarios where cores
// are set in pairs and competing
requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
+
+ // Augment the requestBody with the From topic name as it will be used in scenarios where a container
+ // needs to send an unsollicited message to the currently requested container
+ requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
+
out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
if err != nil {
log.Warn(err)
@@ -660,7 +680,7 @@
returnError = &ic.Error{Reason: "incorrect-error-returns"}
returnedValues = append(returnedValues, returnError)
}
- } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
+ } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
return // Ignore case - when core is in competing mode
} else { // Non-error case
success = true
@@ -688,15 +708,19 @@
// TODO: handle error response.
kp.kafkaClient.Send(icm, replyTopic, key)
}
-
+ } else if msg.Header.Type == ic.MessageType_RESPONSE {
+ log.Debugw("response-received", log.Fields{"msg": msg})
+ go kp.dispatchResponse(msg)
+ } else {
+ log.Warnw("unsupported-message-received", log.Fields{"msg": msg})
}
}
-func (kp *InterContainerProxy) waitForRequest(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
// Wait for messages
for msg := range ch {
//log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
- go kp.handleRequest(msg, targetInterface)
+ go kp.handleMessage(msg, targetInterface)
}
}
@@ -710,33 +734,6 @@
kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
}
-// waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
-// and then dispatches to the consumers
-func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ic.InterContainerMessage, topic *Topic) {
- log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
-startloop:
- for {
- select {
- case msg, ok := <-subscribedCh:
- if !ok {
- log.Debugw("channel-closed", log.Fields{"topic": topic.Name})
- break startloop
- }
- log.Debugw("message-received", log.Fields{"msg": msg})
- //log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
- if msg.Header.Type == ic.MessageType_RESPONSE {
- go kp.dispatchResponse(msg)
- }
- case <-kp.doneCh:
- log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
- break startloop
- }
- }
- //log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
- // We got an exit signal. Unsubscribe to the channel
- //kp.kafkaClient.UnSubscribe(topic, subscribedCh)
-}
-
// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
// API. There is one response channel waiting for kafka messages before dispatching the message to the
@@ -744,26 +741,6 @@
func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
- // First check whether we already have a channel listening for response on that topic. If there is
- // already one then it will be reused. If not, it will be created.
- if !kp.isTopicSubscribedForResponse(topic.Name) {
- log.Debugw("not-subscribed-for-response", log.Fields{"topic": topic.Name, "trnsid": trnsId})
- var subscribedCh <-chan *ic.InterContainerMessage
- var err error
- if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
- log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
- return nil, err
- }
- kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
- go kp.waitForResponseLoop(subscribedCh, &topic)
-
- // Wait until topic is ready - it takes on average 300 ms for a topic to be created. This is a one time
- // delay everything a device is created.
- // TODO: Implement a mechanism to determine when a topic is ready instead of relying on a timeout
- //kp.kafkaClient.WaitForTopicToBeReady
- time.Sleep(400 * time.Millisecond)
- }
-
// Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
// broadcast any message for this topic to all channels waiting on it.
ch := make(chan *ic.InterContainerMessage)