[VOL-1499] Use precreated topic

This commit migrate from dynamically created kafka topic to
pre-created topic.  The changes are made in the rw_core, simulated
onu and olt adapters, and ponsim olt and onu adapters.
TODO: move the python shared library changes into the pyvoltha
repo.

Change-Id: Ia92287ec74009872e694aa22eb896d8a6487d231
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 099d0d8..c5e0772 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -43,6 +43,7 @@
 
 const (
 	TransactionKey = "transactionID"
+	FromTopic      = "fromTopic"
 )
 
 // requestHandlerChannel represents an interface associated with a channel.  Whenever, an event is
@@ -331,7 +332,7 @@
 	kp.defaultRequestHandlerInterface = handler
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
 	// Launch a go routine to receive and process kafka messages
-	go kp.waitForRequest(ch, topic, handler)
+	go kp.waitForMessages(ch, topic, handler)
 
 	return nil
 }
@@ -342,14 +343,14 @@
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ic.InterContainerMessage
 	var err error
-	if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key:Offset, Value:initialOffset}); err != nil {
+	if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 		return err
 	}
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
 
 	// Launch a go routine to receive and process kafka messages
-	go kp.waitForRequest(ch, topic, kp.defaultRequestHandlerInterface)
+	go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
 
 	return nil
 }
@@ -615,7 +616,21 @@
 	return append(currentArgs, protoArg)
 }
 
-func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
+func (kp *InterContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
+	var marshalledArg *any.Any
+	var err error
+	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
+		log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+		return currentArgs
+	}
+	protoArg := &ic.Argument{
+		Key:   FromTopic,
+		Value: marshalledArg,
+	}
+	return append(currentArgs, protoArg)
+}
+
+func (kp *InterContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
 
 	// First extract the header to know whether this is a request - responses are handled by a different handler
 	if msg.Header.Type == ic.MessageType_REQUEST {
@@ -633,6 +648,11 @@
 			// Augment the requestBody with the message Id as it will be used in scenarios where cores
 			// are set in pairs and competing
 			requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
+
+			// Augment the requestBody with the From topic name as it will be used in scenarios where a container
+			// needs to send an unsollicited message to the currently requested container
+			requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
+
 			out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
 			if err != nil {
 				log.Warn(err)
@@ -660,7 +680,7 @@
 						returnError = &ic.Error{Reason: "incorrect-error-returns"}
 						returnedValues = append(returnedValues, returnError)
 					}
-				} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil()  {
+				} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
 					return // Ignore case - when core is in competing mode
 				} else { // Non-error case
 					success = true
@@ -688,15 +708,19 @@
 			// TODO: handle error response.
 			kp.kafkaClient.Send(icm, replyTopic, key)
 		}
-
+	} else if msg.Header.Type == ic.MessageType_RESPONSE {
+		log.Debugw("response-received", log.Fields{"msg": msg})
+		go kp.dispatchResponse(msg)
+	} else {
+		log.Warnw("unsupported-message-received", log.Fields{"msg": msg})
 	}
 }
 
-func (kp *InterContainerProxy) waitForRequest(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
 	//	Wait for messages
 	for msg := range ch {
 		//log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
-		go kp.handleRequest(msg, targetInterface)
+		go kp.handleMessage(msg, targetInterface)
 	}
 }
 
@@ -710,33 +734,6 @@
 	kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
 }
 
-// waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
-// and then dispatches to the consumers
-func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ic.InterContainerMessage, topic *Topic) {
-	log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
-startloop:
-	for {
-		select {
-		case msg, ok := <-subscribedCh:
-			if !ok {
-				log.Debugw("channel-closed", log.Fields{"topic": topic.Name})
-				break startloop
-			}
-			log.Debugw("message-received", log.Fields{"msg": msg})
-			//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
-			if msg.Header.Type == ic.MessageType_RESPONSE {
-				go kp.dispatchResponse(msg)
-			}
-		case <-kp.doneCh:
-			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
-			break startloop
-		}
-	}
-	//log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
-	//	We got an exit signal.  Unsubscribe to the channel
-	//kp.kafkaClient.UnSubscribe(topic, subscribedCh)
-}
-
 // subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
 // This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
@@ -744,26 +741,6 @@
 func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
 	log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
-	// First check whether we already have a channel listening for response on that topic.  If there is
-	// already one then it will be reused.  If not, it will be created.
-	if !kp.isTopicSubscribedForResponse(topic.Name) {
-		log.Debugw("not-subscribed-for-response", log.Fields{"topic": topic.Name, "trnsid": trnsId})
-		var subscribedCh <-chan *ic.InterContainerMessage
-		var err error
-		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
-			log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
-			return nil, err
-		}
-		kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
-		go kp.waitForResponseLoop(subscribedCh, &topic)
-
-		//	Wait until topic is ready - it takes on average 300 ms for a topic to be created.  This is a one time
-		//	delay everything a device is created.
-		// TODO:  Implement a mechanism to determine when a topic is ready instead of relying on a timeout
-		//kp.kafkaClient.WaitForTopicToBeReady
-		time.Sleep(400 * time.Millisecond)
-	}
-
 	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
 	// broadcast any message for this topic to all channels waiting on it.
 	ch := make(chan *ic.InterContainerMessage)