[VOL-1462] Sync data between two voltha cores in the same pair

This commit consists of the following updates:
1) Background data syncing between two cores after a transaction
is completed by one core.
2) Add transaction management to southbound APIs (adapter facing).
This is enabled got adapter registration only for now.
3) Fix an issue with flow decomposition
4) Add the rough-in to allow a packet to be send to an OFAgent
with a transaction ID.  Two cores can therefore send the same
packet and let the OFAgent discard the duplicate.  The work in
OFAgent remains.
5) Cleanups

Change-Id: Ibe9d75edb66cfd6a0954bdfeb16a7e7c8a3c53b6
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 05d0af5..4359f7d 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -41,6 +41,10 @@
 	DefaultRequestTimeout = 3000 // 3000 milliseconds - to handle a wider latency range
 )
 
+const (
+	TransactionKey = "transactionID"
+)
+
 // requestHandlerChannel represents an interface associated with a channel.  Whenever, an event is
 // obtained from that channel, this interface is invoked.   This is used to handle
 // async requests into the Core via the kafka messaging bus
@@ -592,6 +596,25 @@
 	return
 }
 
+func (kp *InterContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
+	arg := &KVArg{
+		Key:   TransactionKey,
+		Value: &ic.StrType{Val: transactionId},
+	}
+
+	var marshalledArg *any.Any
+	var err error
+	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
+		log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+		return currentArgs
+	}
+	protoArg := &ic.Argument{
+		Key:   arg.Key,
+		Value: marshalledArg,
+	}
+	return append(currentArgs, protoArg)
+}
+
 func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
 
 	// First extract the header to know whether this is a request - responses are handled by a different handler
@@ -607,6 +630,9 @@
 		} else {
 			log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
 			// let the callee unpack the arguments as its the only one that knows the real proto type
+			// Augment the requestBody with the message Id as it will be used in scenarios where cores
+			// are set in pairs and competing
+			requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
 			out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
 			if err != nil {
 				log.Warn(err)
@@ -623,7 +649,6 @@
 				returnedValues = make([]interface{}, 1)
 				returnedValues[0] = returnError
 			} else {
-				//log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
 				returnedValues = make([]interface{}, 0)
 				// Check for errors first
 				lastIndex := len(out) - 1
@@ -635,6 +660,8 @@
 						returnError = &ic.Error{Reason: "incorrect-error-returns"}
 						returnedValues = append(returnedValues, returnError)
 					}
+				} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil()  {
+					return // Ignore case - when core is in competing mode
 				} else { // Non-error case
 					success = true
 					for idx, val := range out {