[VOL-1505]  This update enables the core to add a key when
publishing an event onto kafka.   The corresponding update is
done in the adapter GO components.   Similar changes remain to
be done in pyvoltha.

Change-Id: I0bb1e3cb8c2fa9e0214f96d863819755d34a0bb9
diff --git a/adapters/common/adapter_proxy.go b/adapters/common/adapter_proxy.go
index 4e63442..5e04484 100644
--- a/adapters/common/adapter_proxy.go
+++ b/adapters/common/adapter_proxy.go
@@ -90,7 +90,7 @@
 	replyToTopic := kafka.Topic{Name: toAdapter}
 	rpc := "Process_inter_adapter_message"
 
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
 	log.Debugw("inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
 	return unPackResponse(rpc, "", success, result)
 }
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index a503c97..3198111 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -108,7 +108,7 @@
 		Value: deviceTypes,
 	}
 
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
 	log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
 	return unPackResponse(rpc, "", success, result)
 }
@@ -124,7 +124,7 @@
 	}
 	// Use a device specific topic as we are the only adaptercore handling requests for this device
 	replyToTopic := ap.getAdapterTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -148,7 +148,7 @@
 
 	// Use a device specific topic as we are the only adaptercore handling requests for this device
 	replyToTopic := ap.getAdapterTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
 	log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
 }
@@ -179,13 +179,13 @@
 	}
 	// Use a device specific topic as we are the only adaptercore handling requests for this device
 	replyToTopic := ap.getAdapterTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
 	log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
 }
 
 func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
-	childDeviceType string, channelId int) error {
+	childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64 ) error {
 	log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
 	rpc := "ChildDeviceDetected"
 	// Use a device specific topic to send the request.  The adapter handling the device creates a device
@@ -193,7 +193,7 @@
 	toTopic := ap.getCoreTopic(parentDeviceId)
 	replyToTopic := ap.getAdapterTopic()
 
-	args := make([]*kafka.KVArg, 4)
+	args := make([]*kafka.KVArg, 7)
 	id := &voltha.ID{Id: parentDeviceId}
 	args[0] = &kafka.KVArg{
 		Key:   "parent_device_id",
@@ -214,8 +214,23 @@
 		Key:   "channel_id",
 		Value: channel,
 	}
+	vId := &ic.StrType{Val: vendorId}
+	args[4] = &kafka.KVArg{
+		Key:   "vendor_id",
+		Value: vId,
+	}
+	sNo := &ic.StrType{Val: serialNumber}
+	args[5] = &kafka.KVArg{
+		Key:   "serial_number",
+		Value: sNo,
+	}
+	oId := &ic.IntType{Val: int64(onuId)}
+	args[6] = &kafka.KVArg{
+		Key:   "onu_id",
+		Value: oId,
+	}
 
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
 	log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
 	return unPackResponse(rpc, parentDeviceId, success, result)
 }
diff --git a/db/kvstore/client.go b/db/kvstore/client.go
index a8e6311..34ab711 100644
--- a/db/kvstore/client.go
+++ b/db/kvstore/client.go
@@ -84,6 +84,8 @@
 	ReleaseAllReservations() error
 	RenewReservation(key string) error
 	Watch(key string) chan *Event
+	AcquireLock(lockName string, timeout int) error
+	ReleaseLock(lockName string) error
 	CloseWatch(key string, ch chan *Event)
 	Close()
 }
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index a5c71ac..738ca92 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -497,3 +497,12 @@
 		log.Errorw("error-closing-client", log.Fields{"error": err})
 	}
 }
+
+
+func (c *ConsulClient)  AcquireLock(lockName string, timeout int) error {
+	return nil
+}
+
+func (c *ConsulClient)  ReleaseLock(lockName string) error {
+	return nil
+}
\ No newline at end of file
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 9ecddca..2caa990 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -33,6 +33,10 @@
 	keyReservations map[string]*v3Client.LeaseID
 	watchedChannels map[string][]map[chan *Event]v3Client.Watcher
 	writeLock       sync.Mutex
+	lockToMutexMap map[string]*v3Concurrency.Mutex
+	lockToSessionMap map[string]*v3Concurrency.Session
+	lockToMutexLock sync.Mutex
+
 }
 
 // NewEtcdClient returns a new client for the Etcd KV store
@@ -49,8 +53,10 @@
 	}
 	wc := make(map[string][]map[chan *Event]v3Client.Watcher)
 	reservations := make(map[string]*v3Client.LeaseID)
+	lockMutexMap := make(map[string]*v3Concurrency.Mutex)
+	lockSessionMap := make(map[string]*v3Concurrency.Session)
 
-	return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations: reservations}, nil
+	return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations:reservations, lockToMutexMap:lockMutexMap, lockToSessionMap:lockSessionMap}, nil
 }
 
 // List returns an array of key-value pairs with key as a prefix.  Timeout defines how long the function will
@@ -431,3 +437,63 @@
 		log.Errorw("error-closing-client", log.Fields{"error": err})
 	}
 }
+
+func (c *EtcdClient) addLockName(lockName string, lock *v3Concurrency.Mutex, session *v3Concurrency.Session) {
+	c.lockToMutexLock.Lock()
+	defer c.lockToMutexLock.Unlock()
+	c.lockToMutexMap[lockName] = lock
+	c.lockToSessionMap[lockName] = session
+}
+
+func (c *EtcdClient) deleteLockName(lockName string) {
+	c.lockToMutexLock.Lock()
+	defer c.lockToMutexLock.Unlock()
+	delete(c.lockToMutexMap, lockName)
+	delete(c.lockToSessionMap, lockName)
+}
+
+func (c *EtcdClient) getLock(lockName string) (*v3Concurrency.Mutex, *v3Concurrency.Session) {
+	c.lockToMutexLock.Lock()
+	defer c.lockToMutexLock.Unlock()
+	var lock *v3Concurrency.Mutex
+	var session *v3Concurrency.Session
+	if l, exist := c.lockToMutexMap[lockName]; exist {
+		lock = l
+	}
+	if s, exist := c.lockToSessionMap[lockName]; exist {
+		session = s
+	}
+	return lock, session
+}
+
+
+func (c *EtcdClient)  AcquireLock(lockName string, timeout int) error {
+	duration := GetDuration(timeout)
+	ctx, cancel := context.WithTimeout(context.Background(), duration)
+	session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
+	mu := v3Concurrency.NewMutex(session, "/devicelock_" + lockName)
+	if err := mu.Lock(context.Background()); err != nil {
+		return err
+	}
+	c.addLockName(lockName, mu, session)
+	cancel()
+	return nil
+}
+
+func (c *EtcdClient)  ReleaseLock(lockName string) error {
+	lock, session := c.getLock(lockName)
+	var err error
+	if lock != nil {
+		if e := lock.Unlock(context.Background()); e != nil {
+			err = e
+		}
+	}
+	if session != nil {
+		if e := session.Close(); e != nil {
+			err = e
+		}
+	}
+	c.deleteLockName(lockName)
+
+	return err
+}
\ No newline at end of file
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index c5e0772..1229e7a 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -227,7 +227,7 @@
 
 // InvokeRPC is used to send a request to a given topic
 func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
-	waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
+	waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
 
 	//	If a replyToTopic is provided then we use it, otherwise just use the  default toTopic.  The replyToTopic is
 	// typically the device ID.
@@ -237,7 +237,7 @@
 	}
 
 	// Encode the request
-	protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, kvArgs...)
+	protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
 	if err != nil {
 		log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
 		return false, nil
@@ -255,7 +255,7 @@
 	// Send request - if the topic is formatted with a device Id then we will send the request using a
 	// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
 	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
-	key := GetDeviceIdFromTopic(*toTopic)
+	//key := GetDeviceIdFromTopic(*toTopic)
 	log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
 	kp.kafkaClient.Send(protoRequest, toTopic, key)
 
@@ -549,6 +549,7 @@
 		Type:      ic.MessageType_RESPONSE,
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
+		KeyTopic: request.Header.KeyTopic,
 		Timestamp: time.Now().Unix(),
 	}
 
@@ -703,7 +704,7 @@
 			// present then the key will be empty, hence all messages for a given topic will be sent to all
 			// partitions.
 			replyTopic := &Topic{Name: msg.Header.FromTopic}
-			key := GetDeviceIdFromTopic(*replyTopic)
+			key := msg.Header.KeyTopic
 			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
 			// TODO: handle error response.
 			kp.kafkaClient.Send(icm, replyTopic, key)
@@ -757,12 +758,13 @@
 
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
-func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
 	requestHeader := &ic.Header{
 		Id:        uuid.New().String(),
 		Type:      ic.MessageType_REQUEST,
 		FromTopic: replyTopic.Name,
 		ToTopic:   toTopic.Name,
+		KeyTopic: key,
 		Timestamp: time.Now().Unix(),
 	}
 	requestBody := &ic.InterContainerRequestBody{
diff --git a/protos/inter_container.proto b/protos/inter_container.proto
index ac2af4f..4d4ffe8 100644
--- a/protos/inter_container.proto
+++ b/protos/inter_container.proto
@@ -48,7 +48,8 @@
     MessageType type = 2;
     string from_topic = 3;
     string to_topic = 4;
-    int64 timestamp = 5;
+    string key_topic = 5;
+    int64 timestamp = 6;
 }
 
 message Argument {
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index b3ba10d..5d21838 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -93,7 +93,7 @@
 	//	}
 	//}
 	ap.deviceTopicRegistered = true
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -114,7 +114,7 @@
 	// Use a device specific topic as we are the only core handling requests for this device
 	//replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -130,7 +130,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -146,7 +146,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -162,7 +162,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 
 	// We no longer need to have this device topic as we won't receive any unsolicited messages on it
@@ -184,7 +184,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
 	if success {
 		unpackResult := &ic.SwitchCapability{}
@@ -220,7 +220,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
 	if success {
 		unpackResult := &ic.PortCapability{}
@@ -288,7 +288,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
 
 	return unPackResponse(rpc, device.Id, success, result)
@@ -309,7 +309,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
 
 	if success {
@@ -346,7 +346,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
 
 	return unPackResponse(rpc, device.Id, success, result)
@@ -367,7 +367,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 
 	return unPackResponse(rpc, device.Id, success, result)
@@ -388,7 +388,7 @@
 	}
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
 
 	return unPackResponse(rpc, device.Id, success, result)
@@ -422,7 +422,7 @@
 	// TODO:  Do we need to wait for an ACK on a packet Out?
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
 	log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
 }
@@ -447,7 +447,7 @@
 
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -472,7 +472,7 @@
 
 	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 	log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }