VOL-2180 context changes in voltha-go

Passed context up as far as possible.
Where context reached the gRPC API, the context is passed through directly.
Where context reached the Kafka API, context.TODO() was used (as this NBI does not support context or request cancellation).
Anywhere a new goroutine is started and the creating goroutine makes no attempt to wait, context.Background() was used.
Anywhere a new goroutine is started and the creating goroutine waits for completion, the ctx is passed through from the creating goroutine.
Cancellation of gRPC NBI requests should propagate all the way through to the KV.

Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index 042e121..d21fdd5 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -60,15 +60,30 @@
 	ch    chan *ic.InterContainerMessage
 }
 
-// InterContainerProxy represents the messaging proxy
-type InterContainerProxy struct {
+type InterContainerProxy interface {
+	Start() error
+	Stop()
+	GetDefaultTopic() *Topic
+	DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
+	InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
+	SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
+	SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
+	UnSubscribeFromRequestHandler(topic Topic) error
+	DeleteTopic(topic Topic) error
+	EnableLivenessChannel(enable bool) chan bool
+	SendLiveness() error
+}
+
+// interContainerProxy represents the messaging proxy
+type interContainerProxy struct {
 	kafkaHost                      string
 	kafkaPort                      int
-	DefaultTopic                   *Topic
+	defaultTopic                   *Topic
 	defaultRequestHandlerInterface interface{}
 	deviceDiscoveryTopic           *Topic
 	kafkaClient                    Client
-	doneCh                         chan int
+	doneCh                         chan struct{}
+	doneOnce                       sync.Once
 
 	// This map is used to map a topic to an interface and channel.   When a request is received
 	// on that channel (registered to the topic) then that interface is invoked.
@@ -87,63 +102,63 @@
 	lockTransactionIdToChannelMap sync.RWMutex
 }
 
-type InterContainerProxyOption func(*InterContainerProxy)
+type InterContainerProxyOption func(*interContainerProxy)
 
 func InterContainerHost(host string) InterContainerProxyOption {
-	return func(args *InterContainerProxy) {
+	return func(args *interContainerProxy) {
 		args.kafkaHost = host
 	}
 }
 
 func InterContainerPort(port int) InterContainerProxyOption {
-	return func(args *InterContainerProxy) {
+	return func(args *interContainerProxy) {
 		args.kafkaPort = port
 	}
 }
 
 func DefaultTopic(topic *Topic) InterContainerProxyOption {
-	return func(args *InterContainerProxy) {
-		args.DefaultTopic = topic
+	return func(args *interContainerProxy) {
+		args.defaultTopic = topic
 	}
 }
 
 func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
-	return func(args *InterContainerProxy) {
+	return func(args *interContainerProxy) {
 		args.deviceDiscoveryTopic = topic
 	}
 }
 
 func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
-	return func(args *InterContainerProxy) {
+	return func(args *interContainerProxy) {
 		args.defaultRequestHandlerInterface = handler
 	}
 }
 
 func MsgClient(client Client) InterContainerProxyOption {
-	return func(args *InterContainerProxy) {
+	return func(args *interContainerProxy) {
 		args.kafkaClient = client
 	}
 }
 
-func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
-	proxy := &InterContainerProxy{
+func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
+	proxy := &interContainerProxy{
 		kafkaHost: DefaultKafkaHost,
 		kafkaPort: DefaultKafkaPort,
+		doneCh:    make(chan struct{}),
 	}
 
 	for _, option := range opts {
 		option(proxy)
 	}
 
-	// Create the locks for all the maps
-	proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
-	proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
-	proxy.lockTopicResponseChannelMap = sync.RWMutex{}
-
-	return proxy, nil
+	return proxy
 }
 
-func (kp *InterContainerProxy) Start() error {
+func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
+	return newInterContainerProxy(opts...)
+}
+
+func (kp *interContainerProxy) Start() error {
 	logger.Info("Starting-Proxy")
 
 	// Kafka MsgClient should already have been created.  If not, output fatal error
@@ -151,9 +166,6 @@
 		logger.Fatal("kafka-client-not-set")
 	}
 
-	// Create the Done channel
-	kp.doneCh = make(chan int, 1)
-
 	// Start the kafka client
 	if err := kp.kafkaClient.Start(); err != nil {
 		logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
@@ -172,9 +184,9 @@
 	return nil
 }
 
-func (kp *InterContainerProxy) Stop() {
+func (kp *interContainerProxy) Stop() {
 	logger.Info("stopping-intercontainer-proxy")
-	kp.doneCh <- 1
+	kp.doneOnce.Do(func() { close(kp.doneCh) })
 	// TODO : Perform cleanup
 	kp.kafkaClient.Stop()
 	//kp.deleteAllTopicRequestHandlerChannelMap()
@@ -182,8 +194,12 @@
 	//kp.deleteAllTransactionIdToChannelMap()
 }
 
+func (kp *interContainerProxy) GetDefaultTopic() *Topic {
+	return kp.defaultTopic
+}
+
 // DeviceDiscovered publish the discovered device onto the kafka messaging bus
-func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
+func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
 	logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
 	//	Simple validation
 	if deviceId == "" || deviceType == "" {
@@ -194,7 +210,7 @@
 	header := &ic.Header{
 		Id:        uuid.New().String(),
 		Type:      ic.MessageType_DEVICE_DISCOVERED,
-		FromTopic: kp.DefaultTopic.Name,
+		FromTopic: kp.defaultTopic.Name,
 		ToTopic:   kp.deviceDiscoveryTopic.Name,
 		Timestamp: time.Now().UnixNano(),
 	}
@@ -225,14 +241,14 @@
 }
 
 // InvokeRPC is used to send a request to a given topic
-func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
 	waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
 
 	//	If a replyToTopic is provided then we use it, otherwise just use the  default toTopic.  The replyToTopic is
 	// typically the device ID.
 	responseTopic := replyToTopic
 	if responseTopic == nil {
-		responseTopic = kp.DefaultTopic
+		responseTopic = kp.defaultTopic
 	}
 
 	// Encode the request
@@ -288,12 +304,14 @@
 			var err error
 			if responseBody, err = decodeResponse(msg); err != nil {
 				logger.Errorw("decode-response-error", log.Fields{"error": err})
+				// FIXME we should return something
 			}
 			return responseBody.Success, responseBody.Result
 		case <-ctx.Done():
 			logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
 			//	 pack the error as proto any type
-			protoError := &ic.Error{Reason: ctx.Err().Error()}
+			protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+
 			var marshalledArg *any.Any
 			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
 				return false, nil // Should never happen
@@ -302,7 +320,8 @@
 		case <-childCtx.Done():
 			logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
 			//	 pack the error as proto any type
-			protoError := &ic.Error{Reason: childCtx.Err().Error()}
+			protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+
 			var marshalledArg *any.Any
 			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
 				return false, nil // Should never happen
@@ -318,7 +337,7 @@
 
 // SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
 // when a message is received on a given topic
-func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
+func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
 
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ic.InterContainerMessage
@@ -339,7 +358,7 @@
 
 // SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
 // when a message is received on a given topic.  So far there is only 1 target registered per microservice
-func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
+func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ic.InterContainerMessage
 	var err error
@@ -355,13 +374,13 @@
 	return nil
 }
 
-func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
+func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
 	return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
 }
 
 // setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
 // responses from that topic.
-func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
+func (kp *interContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
 	if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
@@ -369,14 +388,14 @@
 	}
 }
 
-func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
+func (kp *interContainerProxy) isTopicSubscribedForResponse(topic string) bool {
 	kp.lockTopicResponseChannelMap.RLock()
 	defer kp.lockTopicResponseChannelMap.RUnlock()
 	_, exist := kp.topicToResponseChannelMap[topic]
 	return exist
 }
 
-func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
+func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
 	if _, exist := kp.topicToResponseChannelMap[topic]; exist {
@@ -392,7 +411,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
+func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
 	var err error
@@ -406,7 +425,7 @@
 	return err
 }
 
-func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
+func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
 	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
@@ -414,7 +433,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
+func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
 	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
@@ -427,7 +446,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
 	var err error
@@ -441,7 +460,7 @@
 	return err
 }
 
-func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
+func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[id]; !exist {
@@ -449,7 +468,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
+func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
@@ -459,7 +478,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
+func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	for key, value := range kp.transactionIdToChannelMap {
@@ -470,7 +489,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
+func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	for key, value := range kp.transactionIdToChannelMap {
@@ -479,7 +498,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
+func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
 	// If we have any consumers on that topic we need to close them
 	if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
 		logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
@@ -520,7 +539,7 @@
 		Type:      ic.MessageType_RESPONSE,
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
-		Timestamp: time.Now().Unix(),
+		Timestamp: time.Now().UnixNano(),
 	}
 	responseBody := &ic.InterContainerResponseBody{
 		Success: false,
@@ -598,7 +617,7 @@
 	return
 }
 
-func (kp *InterContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
+func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
 	arg := &KVArg{
 		Key:   TransactionKey,
 		Value: &ic.StrType{Val: transactionId},
@@ -617,7 +636,7 @@
 	return append(currentArgs, protoArg)
 }
 
-func (kp *InterContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
+func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
 	var marshalledArg *any.Any
 	var err error
 	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
@@ -631,7 +650,7 @@
 	return append(currentArgs, protoArg)
 }
 
-func (kp *InterContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
+func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
 
 	// First extract the header to know whether this is a request - responses are handled by a different handler
 	if msg.Header.Type == ic.MessageType_REQUEST {
@@ -721,7 +740,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
 	//	Wait for messages
 	for msg := range ch {
 		//logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
@@ -729,7 +748,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
+func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.RLock()
 	defer kp.lockTransactionIdToChannelMap.RUnlock()
 	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
@@ -743,7 +762,7 @@
 // This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
 // corresponding waiting channel
-func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
+func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
 	logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
 	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
@@ -754,21 +773,21 @@
 	return ch, nil
 }
 
-func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
+func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
 	logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
 	kp.deleteFromTransactionIdToChannelMap(trnsId)
 	return nil
 }
 
-func (kp *InterContainerProxy) EnableLivenessChannel(enable bool) chan bool {
+func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
 	return kp.kafkaClient.EnableLivenessChannel(enable)
 }
 
-func (kp *InterContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
+func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
 	return kp.kafkaClient.EnableHealthinessChannel(enable)
 }
 
-func (kp *InterContainerProxy) SendLiveness() error {
+func (kp *interContainerProxy) SendLiveness() error {
 	return kp.kafkaClient.SendLiveness()
 }