This commit consists of the following:
1) The kafka messaging proxy in Twisted python for adapters
2) Initial implementation and containerization of ponsim OLT adapter
and ponsim ONU adapter
3) Initial submission of request and response facade in both Twisted
python and Go Language
4) Initial implementation of device management and logical device management
in the Core
5) Update to the log module to allow dynamic setting of log level per
package using the gRPC API
6) Bug fixes and minor changes

Change-Id: Ia8f033da84cfd08275335bae9542802415e7bb0f
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 00086e3..c00fb60 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -33,7 +33,7 @@
 
 // Initialize the logger - gets the default until the main function setup the logger
 func init() {
-	log.GetLogger()
+	log.AddPackage(log.JSON, log.WarnLevel, nil)
 }
 
 const (
@@ -49,7 +49,7 @@
 	DefaultReturnErrors      = true
 	DefaultConsumerMaxwait   = 50
 	DefaultMaxProcessingTime = 100
-	DefaultRequestTimeout    = 50 // 50 milliseconds
+	DefaultRequestTimeout    = 200 // 200 milliseconds - to handle a wider latency range
 )
 
 type consumerChannels struct {
@@ -66,6 +66,7 @@
 	producer                      sarama.AsyncProducer
 	consumer                      sarama.Consumer
 	doneCh                        chan int
+	waitForResponseRoutineStarted bool
 	topicToConsumerChannelMap     map[string]*consumerChannels
 	transactionIdToChannelMap     map[string]chan *ca.InterContainerMessage
 	lockTopicToConsumerChannelMap sync.RWMutex
@@ -181,12 +182,15 @@
 	go kp.sendToKafkaTopic(protoRequest, topic)
 
 	if waitForResponse {
-		// if ctx is nil use a default timeout ctx to ensure we do not wait forever
+		// Create a child context based on the parent context, if any
 		var cancel context.CancelFunc
+		childCtx := context.Background()
 		if ctx == nil {
 			ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
-			defer cancel()
+		} else {
+			childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
 		}
+		defer cancel()
 
 		// Wait for response as well as timeout or cancellation
 		// Remove the subscription for a response on return
@@ -210,6 +214,15 @@
 				return false, nil // Should never happen
 			}
 			return false, marshalledArg
+		case <-childCtx.Done():
+			log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
+			//	 pack the error as proto any type
+			protoError := &ca.Error{Reason: childCtx.Err().Error()}
+			var marshalledArg *any.Any
+			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
+				return false, nil // Should never happen
+			}
+			return false, marshalledArg
 		case <-kp.doneCh:
 			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name, "rpc": rpc})
 			return true, nil
@@ -270,7 +283,7 @@
 	return nil
 }
 
-func (kp *KafkaMessagingProxy) addToTopicToConsumerChannelMap(id string, arg *consumerChannels) {
+func (kp *KafkaMessagingProxy) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
 	kp.lockTopicToConsumerChannelMap.Lock()
 	defer kp.lockTopicToConsumerChannelMap.Unlock()
 	if _, exist := kp.topicToConsumerChannelMap[id]; !exist {
@@ -533,34 +546,34 @@
 				returnedValues[0] = returnError
 			} else {
 				log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
-				returnSize := 1 // Minimum array size
-				if len(out) > 1 {
-					returnSize = len(out) - 1
-				}
-				returnedValues = make([]interface{}, returnSize)
-				for idx, val := range out {
-					log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
-					if idx == 0 {
-						if val.Interface() != nil {
-							if goError, ok := out[0].Interface().(error); ok {
-								returnError = &ca.Error{Reason: goError.Error()}
-								returnedValues[0] = returnError
-							} // Else should never occur - maybe never say never?
-							break
-						} else {
-							success = true
+				returnedValues = make([]interface{}, 0)
+				// Check for errors first
+				lastIndex := len(out) - 1
+				if out[lastIndex].Interface() != nil { // Error
+					if goError, ok := out[lastIndex].Interface().(error); ok {
+						returnError = &ca.Error{Reason: goError.Error()}
+						returnedValues = append(returnedValues, returnError)
+					} else { // Should never happen
+						returnError = &ca.Error{Reason: "incorrect-error-returns"}
+						returnedValues = append(returnedValues, returnError)
+					}
+				} else { // Non-error case
+					success = true
+					for idx, val := range out {
+						log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
+						if idx != lastIndex {
+							returnedValues = append(returnedValues, val.Interface())
 						}
-					} else {
-						returnedValues[idx-1] = val.Interface()
 					}
 				}
 			}
 
 			var icm *ca.InterContainerMessage
 			if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
-				log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"erroe": err})
+				log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
 				icm = encodeDefaultFailedResponse(msg)
 			}
+			log.Debugw("sending-to-kafka", log.Fields{"msg": icm, "send-to-topic": msg.Header.FromTopic})
 			kp.sendToKafkaTopic(icm, &Topic{Name: msg.Header.FromTopic})
 		}
 
@@ -605,7 +618,7 @@
 		case err := <-consumerCh.consumer.Errors():
 			log.Warnw("consumer-error", log.Fields{"error": err})
 		case msg := <-consumerCh.consumer.Messages():
-			log.Debugw("message-received", log.Fields{"msg": msg})
+			//log.Debugw("message-received", log.Fields{"msg": msg})
 			// Since the only expected message is a proto intercontainermessage then extract it right away
 			// instead of dispatching it to the consumers
 			msgBody := msg.Value
@@ -614,9 +627,17 @@
 				log.Warnw("invalid-message", log.Fields{"error": err})
 				continue
 			}
-			log.Debugw("msg-to-consumers", log.Fields{"msg": *icm, "len": len(consumerCh.channels)})
-
-			go kp.dispatchToConsumers(consumerCh, icm)
+			if icm.Header.Type == ca.MessageType_REQUEST {
+				log.Debugw("request-received", log.Fields{"msg": *icm, "len": len(consumerCh.channels)})
+				go kp.dispatchToConsumers(consumerCh, icm)
+			} else if icm.Header.Type == ca.MessageType_RESPONSE {
+				log.Debugw("response-received", log.Fields{"msg": *icm, "len": len(consumerCh.channels)})
+				go kp.dispatchResponse(icm)
+			} else {
+				log.Debugw("unsupported-msg-received", log.Fields{"msg": *icm})
+			}
+			//// TODO:  Dispatch requests and responses separately
+			//go kp.dispatchToConsumers(consumerCh, icm)
 		case <-kp.doneCh:
 			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
 			break startloop
@@ -636,6 +657,7 @@
 
 func (kp *KafkaMessagingProxy) waitForResponse(ch chan *ca.InterContainerMessage, topic Topic) {
 	log.Debugw("starting-consuming-responses-loop", log.Fields{"topic": topic.Name})
+	kp.waitForResponseRoutineStarted = true
 startloop:
 	for {
 		select {
@@ -681,7 +703,7 @@
 	}
 
 	// Add the consumer channel to the map
-	kp.addToTopicToConsumerChannelMap(topic.Name, cc)
+	kp.addTopicToConsumerChannelMap(topic.Name, cc)
 
 	//Start a consumer to listen on that specific topic
 	go kp.consumeMessagesLoop(topic)
@@ -694,18 +716,16 @@
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
 // corresponding waiting channel
 func (kp *KafkaMessagingProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
-	log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name})
+	log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
 	if consumerCh := kp.getConsumerChannel(topic); consumerCh == nil {
 		log.Debugw("topic-not-subscribed", log.Fields{"topic": topic.Name})
-		var consumerListeningChannel chan *ca.InterContainerMessage
 		var err error
-		if consumerListeningChannel, err = kp.setupConsumerChannel(topic); err != nil {
+
+		if _, err = kp.setupConsumerChannel(topic); err != nil {
 			log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
 			return nil, err
 		}
-		// Start a go routine to listen to response messages over the consumer listening channel
-		go kp.waitForResponse(consumerListeningChannel, topic)
 	}
 
 	ch := make(chan *ca.InterContainerMessage)