VOL-2180 code changes for context addition
Integrating InterContainerProxy interface changes

Change-Id: Ia20c5ac3093b7845acf80cce801ec0c1d90c125f
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index 042e121..d21fdd5 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -60,15 +60,30 @@
 	ch    chan *ic.InterContainerMessage
 }
 
-// InterContainerProxy represents the messaging proxy
-type InterContainerProxy struct {
+type InterContainerProxy interface {
+	Start() error
+	Stop()
+	GetDefaultTopic() *Topic
+	DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
+	InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
+	SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
+	SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
+	UnSubscribeFromRequestHandler(topic Topic) error
+	DeleteTopic(topic Topic) error
+	EnableLivenessChannel(enable bool) chan bool
+	SendLiveness() error
+}
+
+// interContainerProxy represents the messaging proxy
+type interContainerProxy struct {
 	kafkaHost                      string
 	kafkaPort                      int
-	DefaultTopic                   *Topic
+	defaultTopic                   *Topic
 	defaultRequestHandlerInterface interface{}
 	deviceDiscoveryTopic           *Topic
 	kafkaClient                    Client
-	doneCh                         chan int
+	doneCh                         chan struct{}
+	doneOnce                       sync.Once
 
 	// This map is used to map a topic to an interface and channel.   When a request is received
 	// on that channel (registered to the topic) then that interface is invoked.
@@ -87,63 +102,63 @@
 	lockTransactionIdToChannelMap sync.RWMutex
 }
 
-type InterContainerProxyOption func(*InterContainerProxy)
+type InterContainerProxyOption func(*interContainerProxy)
 
 func InterContainerHost(host string) InterContainerProxyOption {
-	return func(args *InterContainerProxy) {
+	return func(args *interContainerProxy) {
 		args.kafkaHost = host
 	}
 }
 
 func InterContainerPort(port int) InterContainerProxyOption {
-	return func(args *InterContainerProxy) {
+	return func(args *interContainerProxy) {
 		args.kafkaPort = port
 	}
 }
 
 func DefaultTopic(topic *Topic) InterContainerProxyOption {
-	return func(args *InterContainerProxy) {
-		args.DefaultTopic = topic
+	return func(args *interContainerProxy) {
+		args.defaultTopic = topic
 	}
 }
 
 func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
-	return func(args *InterContainerProxy) {
+	return func(args *interContainerProxy) {
 		args.deviceDiscoveryTopic = topic
 	}
 }
 
 func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
-	return func(args *InterContainerProxy) {
+	return func(args *interContainerProxy) {
 		args.defaultRequestHandlerInterface = handler
 	}
 }
 
 func MsgClient(client Client) InterContainerProxyOption {
-	return func(args *InterContainerProxy) {
+	return func(args *interContainerProxy) {
 		args.kafkaClient = client
 	}
 }
 
-func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
-	proxy := &InterContainerProxy{
+func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
+	proxy := &interContainerProxy{
 		kafkaHost: DefaultKafkaHost,
 		kafkaPort: DefaultKafkaPort,
+		doneCh:    make(chan struct{}),
 	}
 
 	for _, option := range opts {
 		option(proxy)
 	}
 
-	// Create the locks for all the maps
-	proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
-	proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
-	proxy.lockTopicResponseChannelMap = sync.RWMutex{}
-
-	return proxy, nil
+	return proxy
 }
 
-func (kp *InterContainerProxy) Start() error {
+func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
+	return newInterContainerProxy(opts...)
+}
+
+func (kp *interContainerProxy) Start() error {
 	logger.Info("Starting-Proxy")
 
 	// Kafka MsgClient should already have been created.  If not, output fatal error
@@ -151,9 +166,6 @@
 		logger.Fatal("kafka-client-not-set")
 	}
 
-	// Create the Done channel
-	kp.doneCh = make(chan int, 1)
-
 	// Start the kafka client
 	if err := kp.kafkaClient.Start(); err != nil {
 		logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
@@ -172,9 +184,9 @@
 	return nil
 }
 
-func (kp *InterContainerProxy) Stop() {
+func (kp *interContainerProxy) Stop() {
 	logger.Info("stopping-intercontainer-proxy")
-	kp.doneCh <- 1
+	kp.doneOnce.Do(func() { close(kp.doneCh) })
 	// TODO : Perform cleanup
 	kp.kafkaClient.Stop()
 	//kp.deleteAllTopicRequestHandlerChannelMap()
@@ -182,8 +194,12 @@
 	//kp.deleteAllTransactionIdToChannelMap()
 }
 
+func (kp *interContainerProxy) GetDefaultTopic() *Topic {
+	return kp.defaultTopic
+}
+
 // DeviceDiscovered publish the discovered device onto the kafka messaging bus
-func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
+func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
 	logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
 	//	Simple validation
 	if deviceId == "" || deviceType == "" {
@@ -194,7 +210,7 @@
 	header := &ic.Header{
 		Id:        uuid.New().String(),
 		Type:      ic.MessageType_DEVICE_DISCOVERED,
-		FromTopic: kp.DefaultTopic.Name,
+		FromTopic: kp.defaultTopic.Name,
 		ToTopic:   kp.deviceDiscoveryTopic.Name,
 		Timestamp: time.Now().UnixNano(),
 	}
@@ -225,14 +241,14 @@
 }
 
 // InvokeRPC is used to send a request to a given topic
-func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
 	waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
 
 	//	If a replyToTopic is provided then we use it, otherwise just use the  default toTopic.  The replyToTopic is
 	// typically the device ID.
 	responseTopic := replyToTopic
 	if responseTopic == nil {
-		responseTopic = kp.DefaultTopic
+		responseTopic = kp.defaultTopic
 	}
 
 	// Encode the request
@@ -288,12 +304,14 @@
 			var err error
 			if responseBody, err = decodeResponse(msg); err != nil {
 				logger.Errorw("decode-response-error", log.Fields{"error": err})
+				// FIXME we should return something
 			}
 			return responseBody.Success, responseBody.Result
 		case <-ctx.Done():
 			logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
 			//	 pack the error as proto any type
-			protoError := &ic.Error{Reason: ctx.Err().Error()}
+			protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+
 			var marshalledArg *any.Any
 			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
 				return false, nil // Should never happen
@@ -302,7 +320,8 @@
 		case <-childCtx.Done():
 			logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
 			//	 pack the error as proto any type
-			protoError := &ic.Error{Reason: childCtx.Err().Error()}
+			protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+
 			var marshalledArg *any.Any
 			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
 				return false, nil // Should never happen
@@ -318,7 +337,7 @@
 
 // SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
 // when a message is received on a given topic
-func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
+func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
 
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ic.InterContainerMessage
@@ -339,7 +358,7 @@
 
 // SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
 // when a message is received on a given topic.  So far there is only 1 target registered per microservice
-func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
+func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ic.InterContainerMessage
 	var err error
@@ -355,13 +374,13 @@
 	return nil
 }
 
-func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
+func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
 	return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
 }
 
 // setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
 // responses from that topic.
-func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
+func (kp *interContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
 	if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
@@ -369,14 +388,14 @@
 	}
 }
 
-func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
+func (kp *interContainerProxy) isTopicSubscribedForResponse(topic string) bool {
 	kp.lockTopicResponseChannelMap.RLock()
 	defer kp.lockTopicResponseChannelMap.RUnlock()
 	_, exist := kp.topicToResponseChannelMap[topic]
 	return exist
 }
 
-func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
+func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
 	if _, exist := kp.topicToResponseChannelMap[topic]; exist {
@@ -392,7 +411,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
+func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
 	var err error
@@ -406,7 +425,7 @@
 	return err
 }
 
-func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
+func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
 	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
@@ -414,7 +433,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
+func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
 	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
@@ -427,7 +446,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
 	var err error
@@ -441,7 +460,7 @@
 	return err
 }
 
-func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
+func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[id]; !exist {
@@ -449,7 +468,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
+func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
@@ -459,7 +478,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
+func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	for key, value := range kp.transactionIdToChannelMap {
@@ -470,7 +489,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
+func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	for key, value := range kp.transactionIdToChannelMap {
@@ -479,7 +498,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
+func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
 	// If we have any consumers on that topic we need to close them
 	if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
 		logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
@@ -520,7 +539,7 @@
 		Type:      ic.MessageType_RESPONSE,
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
-		Timestamp: time.Now().Unix(),
+		Timestamp: time.Now().UnixNano(),
 	}
 	responseBody := &ic.InterContainerResponseBody{
 		Success: false,
@@ -598,7 +617,7 @@
 	return
 }
 
-func (kp *InterContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
+func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
 	arg := &KVArg{
 		Key:   TransactionKey,
 		Value: &ic.StrType{Val: transactionId},
@@ -617,7 +636,7 @@
 	return append(currentArgs, protoArg)
 }
 
-func (kp *InterContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
+func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
 	var marshalledArg *any.Any
 	var err error
 	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
@@ -631,7 +650,7 @@
 	return append(currentArgs, protoArg)
 }
 
-func (kp *InterContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
+func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
 
 	// First extract the header to know whether this is a request - responses are handled by a different handler
 	if msg.Header.Type == ic.MessageType_REQUEST {
@@ -721,7 +740,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
 	//	Wait for messages
 	for msg := range ch {
 		//logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
@@ -729,7 +748,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
+func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.RLock()
 	defer kp.lockTransactionIdToChannelMap.RUnlock()
 	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
@@ -743,7 +762,7 @@
 // This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
 // corresponding waiting channel
-func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
+func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
 	logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
 	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
@@ -754,21 +773,21 @@
 	return ch, nil
 }
 
-func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
+func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
 	logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
 	kp.deleteFromTransactionIdToChannelMap(trnsId)
 	return nil
 }
 
-func (kp *InterContainerProxy) EnableLivenessChannel(enable bool) chan bool {
+func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
 	return kp.kafkaClient.EnableLivenessChannel(enable)
 }
 
-func (kp *InterContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
+func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
 	return kp.kafkaClient.EnableHealthinessChannel(enable)
 }
 
-func (kp *InterContainerProxy) SendLiveness() error {
+func (kp *interContainerProxy) SendLiveness() error {
 	return kp.kafkaClient.SendLiveness()
 }