/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka

import (
	"context"
	"errors"
	"fmt"
	"github.com/golang/protobuf/proto"
	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/any"
	"github.com/google/uuid"
	"github.com/opencord/voltha-go/common/log"
	ic "github.com/opencord/voltha-go/protos/inter_container"
	"reflect"
	"sync"
	"time"
)

// Initialize the logger - gets the default until the main function setup the logger
func init() {
	log.AddPackage(log.JSON, log.WarnLevel, nil)
}

const (
	DefaultMaxRetries     = 3
	DefaultRequestTimeout = 500 // 500 milliseconds - to handle a wider latency range
)

// requestHandlerChannel represents an interface associated with a channel.  Whenever, an event is
// obtained from that channel, this interface is invoked.   This is used to handle
// async requests into the Core via the kafka messaging bus
type requestHandlerChannel struct {
	requesthandlerInterface interface{}
	ch                      <-chan *ic.InterContainerMessage
}

// transactionChannel represents a combination of a topic and a channel onto which a response received
// on the kafka bus will be sent to
type transactionChannel struct {
	topic *Topic
	ch    chan *ic.InterContainerMessage
}

// InterContainerProxy represents the messaging proxy
type InterContainerProxy struct {
	kafkaHost                      string
	kafkaPort                      int
	DefaultTopic                   *Topic
	defaultRequestHandlerInterface interface{}
	deviceDiscoveryTopic           *Topic
	kafkaClient                    Client
	doneCh                         chan int

	// This map is used to map a topic to an interface and channel.   When a request is received
	// on that channel (registered to the topic) then that interface is invoked.
	topicToRequestHandlerChannelMap   map[string]*requestHandlerChannel
	lockTopicRequestHandlerChannelMap sync.RWMutex

	// This map is used to map a channel to a response topic.   This channel handles all responses on that
	// channel for that topic and forward them to the appropriate consumers channel, using the
	// transactionIdToChannelMap.
	topicToResponseChannelMap   map[string]<-chan *ic.InterContainerMessage
	lockTopicResponseChannelMap sync.RWMutex

	// This map is used to map a transaction to a consumers channel.  This is used whenever a request has been
	// sent out and we are waiting for a response.
	transactionIdToChannelMap     map[string]*transactionChannel
	lockTransactionIdToChannelMap sync.RWMutex
}

type InterContainerProxyOption func(*InterContainerProxy)

func InterContainerHost(host string) InterContainerProxyOption {
	return func(args *InterContainerProxy) {
		args.kafkaHost = host
	}
}

func InterContainerPort(port int) InterContainerProxyOption {
	return func(args *InterContainerProxy) {
		args.kafkaPort = port
	}
}

func DefaultTopic(topic *Topic) InterContainerProxyOption {
	return func(args *InterContainerProxy) {
		args.DefaultTopic = topic
	}
}

func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
	return func(args *InterContainerProxy) {
		args.deviceDiscoveryTopic = topic
	}
}

func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
	return func(args *InterContainerProxy) {
		args.defaultRequestHandlerInterface = handler
	}
}

func MsgClient(client Client) InterContainerProxyOption {
	return func(args *InterContainerProxy) {
		args.kafkaClient = client
	}
}

func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
	proxy := &InterContainerProxy{
		kafkaHost: DefaultKafkaHost,
		kafkaPort: DefaultKafkaPort,
	}

	for _, option := range opts {
		option(proxy)
	}

	// Create the locks for all the maps
	proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
	proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
	proxy.lockTopicResponseChannelMap = sync.RWMutex{}

	return proxy, nil
}

func (kp *InterContainerProxy) Start() error {
	log.Info("Starting-Proxy")

	// Kafka MsgClient should already have been created.  If not, output fatal error
	if kp.kafkaClient == nil {
		log.Fatal("kafka-client-not-set")
	}

	// Create the Done channel
	kp.doneCh = make(chan int, 1)

	// Start the kafka client
	if err := kp.kafkaClient.Start(); err != nil {
		log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
		return err
	}

	// Create the topic to response channel map
	kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
	//
	// Create the transactionId to Channel Map
	kp.transactionIdToChannelMap = make(map[string]*transactionChannel)

	// Create the topic to request channel map
	kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)

	return nil
}

func (kp *InterContainerProxy) Stop() {
	log.Info("stopping-intercontainer-proxy")
	kp.doneCh <- 1
	// TODO : Perform cleanup
	//kp.kafkaClient.Stop()
	//kp.deleteAllTopicRequestHandlerChannelMap()
	//kp.deleteAllTopicResponseChannelMap()
	//kp.deleteAllTransactionIdToChannelMap()
}

// DeviceDiscovered publish the discovered device onto the kafka messaging bus
func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string) error {
	log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
	//	Simple validation
	if deviceId == "" || deviceType == "" {
		log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
		return errors.New("invalid-parameters")
	}
	//	Create the device discovery message
	header := &ic.Header{
		Id:        uuid.New().String(),
		Type:      ic.MessageType_DEVICE_DISCOVERED,
		FromTopic: kp.DefaultTopic.Name,
		ToTopic:   kp.deviceDiscoveryTopic.Name,
		Timestamp: time.Now().UnixNano(),
	}
	body := &ic.DeviceDiscovered{
		Id:         deviceId,
		DeviceType: deviceType,
		ParentId:   parentId,
	}

	var marshalledData *any.Any
	var err error
	if marshalledData, err = ptypes.MarshalAny(body); err != nil {
		log.Errorw("cannot-marshal-request", log.Fields{"error": err})
		return err
	}
	msg := &ic.InterContainerMessage{
		Header: header,
		Body:   marshalledData,
	}

	// Send the message
	if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
		log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
		return err
	}
	return nil
}

// InvokeRPC is used to send a request to a given topic
func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
	waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {

	//	If a replyToTopic is provided then we use it, otherwise just use the  default toTopic.  The replyToTopic is
	// typically the device ID.
	responseTopic := replyToTopic
	if responseTopic == nil {
		responseTopic = kp.DefaultTopic
	}

	// Encode the request
	protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, kvArgs...)
	if err != nil {
		log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
		return false, nil
	}

	// Subscribe for response, if needed, before sending request
	var ch <-chan *ic.InterContainerMessage
	if waitForResponse {
		var err error
		if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
			log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
		}
	}

	// Send request - if the topic is formatted with a device Id then we will send the request using a
	// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
	key := GetDeviceIdFromTopic(*toTopic)
	log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
	go kp.kafkaClient.Send(protoRequest, toTopic, key)

	if waitForResponse {
		// Create a child context based on the parent context, if any
		var cancel context.CancelFunc
		childCtx := context.Background()
		if ctx == nil {
			ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
		} else {
			childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
		}
		defer cancel()

		// Wait for response as well as timeout or cancellation
		// Remove the subscription for a response on return
		defer kp.unSubscribeForResponse(protoRequest.Header.Id)
		select {
		case msg := <-ch:
			log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
			var responseBody *ic.InterContainerResponseBody
			var err error
			if responseBody, err = decodeResponse(msg); err != nil {
				log.Errorw("decode-response-error", log.Fields{"error": err})
			}
			return responseBody.Success, responseBody.Result
		case <-ctx.Done():
			log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
			//	 pack the error as proto any type
			protoError := &ic.Error{Reason: ctx.Err().Error()}
			var marshalledArg *any.Any
			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
				return false, nil // Should never happen
			}
			return false, marshalledArg
		case <-childCtx.Done():
			log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
			//	 pack the error as proto any type
			protoError := &ic.Error{Reason: childCtx.Err().Error()}
			var marshalledArg *any.Any
			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
				return false, nil // Should never happen
			}
			return false, marshalledArg
		case <-kp.doneCh:
			log.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
			return true, nil
		}
	}
	return true, nil
}

// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
// when a message is received on a given topic
func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {

	// Subscribe to receive messages for that topic
	var ch <-chan *ic.InterContainerMessage
	var err error
	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
		//if ch, err = kp.Subscribe(topic); err != nil {
		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
	}

	kp.defaultRequestHandlerInterface = handler
	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
	// Launch a go routine to receive and process kafka messages
	go kp.waitForRequest(ch, topic, handler)

	return nil
}

// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
// when a message is received on a given topic.  So far there is only 1 target registered per microservice
func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic) error {
	// Subscribe to receive messages for that topic
	var ch <-chan *ic.InterContainerMessage
	var err error
	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
	}
	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})

	// Launch a go routine to receive and process kafka messages
	go kp.waitForRequest(ch, topic, kp.defaultRequestHandlerInterface)

	return nil
}

func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
	return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
}

// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
// responses from that topic.
func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
	kp.lockTopicResponseChannelMap.Lock()
	defer kp.lockTopicResponseChannelMap.Unlock()
	if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
		kp.topicToResponseChannelMap[topic] = arg
	}
}

func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
	kp.lockTopicResponseChannelMap.Lock()
	defer kp.lockTopicResponseChannelMap.Unlock()
	_, exist := kp.topicToResponseChannelMap[topic]
	return exist
}

func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
	kp.lockTopicResponseChannelMap.Lock()
	defer kp.lockTopicResponseChannelMap.Unlock()
	if _, exist := kp.topicToResponseChannelMap[topic]; exist {
		// Unsubscribe to this topic first - this will close the subscribed channel
		var err error
		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
			log.Errorw("unsubscribing-error", log.Fields{"topic": topic})
		}
		delete(kp.topicToResponseChannelMap, topic)
		return err
	} else {
		return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
	}
}

func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
	kp.lockTopicResponseChannelMap.Lock()
	defer kp.lockTopicResponseChannelMap.Unlock()
	var err error
	for topic, _ := range kp.topicToResponseChannelMap {
		// Unsubscribe to this topic first - this will close the subscribed channel
		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
			log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
		}
		delete(kp.topicToResponseChannelMap, topic)
	}
	return err
}

func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
	kp.lockTopicRequestHandlerChannelMap.Lock()
	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
		kp.topicToRequestHandlerChannelMap[topic] = arg
	}
}

func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
	kp.lockTopicRequestHandlerChannelMap.Lock()
	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
		// Close the kafka client client first by unsubscribing to this topic
		kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
		delete(kp.topicToRequestHandlerChannelMap, topic)
		return nil
	} else {
		return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
	}
}

func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
	kp.lockTopicRequestHandlerChannelMap.Lock()
	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
	var err error
	for topic, _ := range kp.topicToRequestHandlerChannelMap {
		// Close the kafka client client first by unsubscribing to this topic
		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
			log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
		}
		delete(kp.topicToRequestHandlerChannelMap, topic)
	}
	return err
}

func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
	kp.lockTransactionIdToChannelMap.Lock()
	defer kp.lockTransactionIdToChannelMap.Unlock()
	if _, exist := kp.transactionIdToChannelMap[id]; !exist {
		kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
	}
}

func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
	kp.lockTransactionIdToChannelMap.Lock()
	defer kp.lockTransactionIdToChannelMap.Unlock()
	if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
		// Close the channel first
		close(transChannel.ch)
		delete(kp.transactionIdToChannelMap, id)
	}
}

func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
	kp.lockTransactionIdToChannelMap.Lock()
	defer kp.lockTransactionIdToChannelMap.Unlock()
	for key, value := range kp.transactionIdToChannelMap {
		if value.topic.Name == id {
			close(value.ch)
			delete(kp.transactionIdToChannelMap, key)
		}
	}
}

func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
	kp.lockTransactionIdToChannelMap.Lock()
	defer kp.lockTransactionIdToChannelMap.Unlock()
	for key, value := range kp.transactionIdToChannelMap {
		close(value.ch)
		delete(kp.transactionIdToChannelMap, key)
	}
}

func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
	// If we have any consumers on that topic we need to close them
	kp.deleteFromTopicResponseChannelMap(topic.Name)
	kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
	kp.deleteTopicTransactionIdToChannelMap(topic.Name)
	return kp.kafkaClient.DeleteTopic(&topic)
}

func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
	// Encode the response argument - needs to be a proto message
	if returnedVal == nil {
		return nil, nil
	}
	protoValue, ok := returnedVal.(proto.Message)
	if !ok {
		log.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
		err := errors.New("response-value-not-proto-message")
		return nil, err
	}

	// Marshal the returned value, if any
	var marshalledReturnedVal *any.Any
	var err error
	if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
		log.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
		return nil, err
	}
	return marshalledReturnedVal, nil
}

func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
	responseHeader := &ic.Header{
		Id:        request.Header.Id,
		Type:      ic.MessageType_RESPONSE,
		FromTopic: request.Header.ToTopic,
		ToTopic:   request.Header.FromTopic,
		Timestamp: time.Now().Unix(),
	}
	responseBody := &ic.InterContainerResponseBody{
		Success: false,
		Result:  nil,
	}
	var marshalledResponseBody *any.Any
	var err error
	// Error should never happen here
	if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
		log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
	}

	return &ic.InterContainerMessage{
		Header: responseHeader,
		Body:   marshalledResponseBody,
	}

}

//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
//or an error on failure
func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
	//log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
	responseHeader := &ic.Header{
		Id:        request.Header.Id,
		Type:      ic.MessageType_RESPONSE,
		FromTopic: request.Header.ToTopic,
		ToTopic:   request.Header.FromTopic,
		Timestamp: time.Now().Unix(),
	}

	// Go over all returned values
	var marshalledReturnedVal *any.Any
	var err error
	for _, returnVal := range returnedValues {
		if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
			log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
		}
		break // for now we support only 1 returned value - (excluding the error)
	}

	responseBody := &ic.InterContainerResponseBody{
		Success: success,
		Result:  marshalledReturnedVal,
	}

	// Marshal the response body
	var marshalledResponseBody *any.Any
	if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
		log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
		return nil, err
	}

	return &ic.InterContainerMessage{
		Header: responseHeader,
		Body:   marshalledResponseBody,
	}, nil
}

func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
	myClassValue := reflect.ValueOf(myClass)
	m := myClassValue.MethodByName(funcName)
	if !m.IsValid() {
		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
	}
	in := make([]reflect.Value, len(params))
	for i, param := range params {
		in[i] = reflect.ValueOf(param)
	}
	out = m.Call(in)
	return
}

func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {

	// First extract the header to know whether this is a request - responses are handled by a different handler
	if msg.Header.Type == ic.MessageType_REQUEST {

		var out []reflect.Value
		var err error

		// Get the request body
		requestBody := &ic.InterContainerRequestBody{}
		if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
			log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
		} else {
			log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
			// let the callee unpack the arguments as its the only one that knows the real proto type
			out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
			if err != nil {
				log.Warn(err)
			}
		}
		// Response required?
		if requestBody.ResponseRequired {
			// If we already have an error before then just return that
			var returnError *ic.Error
			var returnedValues []interface{}
			var success bool
			if err != nil {
				returnError = &ic.Error{Reason: err.Error()}
				returnedValues = make([]interface{}, 1)
				returnedValues[0] = returnError
			} else {
				//log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
				returnedValues = make([]interface{}, 0)
				// Check for errors first
				lastIndex := len(out) - 1
				if out[lastIndex].Interface() != nil { // Error
					if goError, ok := out[lastIndex].Interface().(error); ok {
						returnError = &ic.Error{Reason: goError.Error()}
						returnedValues = append(returnedValues, returnError)
					} else { // Should never happen
						returnError = &ic.Error{Reason: "incorrect-error-returns"}
						returnedValues = append(returnedValues, returnError)
					}
				} else { // Non-error case
					success = true
					for idx, val := range out {
						//log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
						if idx != lastIndex {
							returnedValues = append(returnedValues, val.Interface())
						}
					}
				}
			}

			var icm *ic.InterContainerMessage
			if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
				log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
				icm = encodeDefaultFailedResponse(msg)
			}
			// To preserve ordering of messages, all messages to a given topic are sent to the same partition
			// by providing a message key.   The key is encoded in the topic name.  If the deviceId is not
			// present then the key will be empty, hence all messages for a given topic will be sent to all
			// partitions.
			replyTopic := &Topic{Name: msg.Header.FromTopic}
			key := GetDeviceIdFromTopic(*replyTopic)
			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
			// TODO: handle error response.
			kp.kafkaClient.Send(icm, replyTopic, key)
		}

	}
}

func (kp *InterContainerProxy) waitForRequest(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
	//	Wait for messages
	for msg := range ch {
		//log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
		go kp.handleRequest(msg, targetInterface)
	}
}

func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
	kp.lockTransactionIdToChannelMap.Lock()
	defer kp.lockTransactionIdToChannelMap.Unlock()
	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
		log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
		return
	}
	kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
}

// waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
// and then dispatches to the consumers
func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ic.InterContainerMessage, topic *Topic) {
	log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
startloop:
	for {
		select {
		case msg := <-subscribedCh:
			//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
			if msg.Header.Type == ic.MessageType_RESPONSE {
				go kp.dispatchResponse(msg)
			}
		case <-kp.doneCh:
			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
			break startloop
		}
	}
	//log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
	//	We got an exit signal.  Unsubscribe to the channel
	//kp.kafkaClient.UnSubscribe(topic, subscribedCh)
}

// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
// API. There is one response channel waiting for kafka messages before dispatching the message to the
// corresponding waiting channel
func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
	log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})

	// First check whether we already have a channel listening for response on that topic.  If there is
	// already one then it will be reused.  If not, it will be created.
	if !kp.isTopicSubscribedForResponse(topic.Name) {
		var subscribedCh <-chan *ic.InterContainerMessage
		var err error
		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
			log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
			return nil, err
		}
		kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
		go kp.waitForResponseLoop(subscribedCh, &topic)
	}

	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
	// broadcast any message for this topic to all channels waiting on it.
	ch := make(chan *ic.InterContainerMessage)
	kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)

	return ch, nil
}

func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
	log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
	if _, exist := kp.transactionIdToChannelMap[trnsId]; exist {
		// The delete operation will close the channel
		kp.deleteFromTransactionIdToChannelMap(trnsId)
	}
	return nil
}

//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
//or an error on failure
func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
	requestHeader := &ic.Header{
		Id:        uuid.New().String(),
		Type:      ic.MessageType_REQUEST,
		FromTopic: replyTopic.Name,
		ToTopic:   toTopic.Name,
		Timestamp: time.Now().Unix(),
	}
	requestBody := &ic.InterContainerRequestBody{
		Rpc:              rpc,
		ResponseRequired: true,
		ReplyToTopic:     replyTopic.Name,
	}

	for _, arg := range kvArgs {
		if arg == nil {
			// In case the caller sends an array with empty args
			continue
		}
		var marshalledArg *any.Any
		var err error
		// ascertain the value interface type is a proto.Message
		protoValue, ok := arg.Value.(proto.Message)
		if !ok {
			log.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
			err := errors.New("argument-value-not-proto-message")
			return nil, err
		}
		if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
			log.Warnw("cannot-marshal-request", log.Fields{"error": err})
			return nil, err
		}
		protoArg := &ic.Argument{
			Key:   arg.Key,
			Value: marshalledArg,
		}
		requestBody.Args = append(requestBody.Args, protoArg)
	}

	var marshalledData *any.Any
	var err error
	if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
		log.Warnw("cannot-marshal-request", log.Fields{"error": err})
		return nil, err
	}
	request := &ic.InterContainerMessage{
		Header: requestHeader,
		Body:   marshalledData,
	}
	return request, nil
}

func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
	//	Extract the message body
	responseBody := ic.InterContainerResponseBody{}
	if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
		return nil, err
	}
	//log.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})

	return &responseBody, nil

}
