This update addresses the following:
1.  Decouple the Kafka messaging proxy from the Kafka client.  This
will allow us to try out different Kafka clients as well as test
the client separately.
2. Create unique device topics for the core, OLT adapter and ONU
adapters.  This will ensure only the cores and adapters handling these
devices will listen to the device messages.
3. Update the core with the latest device model APIs and changes.
While most of the model issues have been fixed, there is still an
issue with updating a child branch.  This will be dealt with in a
separate update.

Change-Id: I622ef5c636d7466bb3adefaa4ac4c85d7c450bea
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index ab35037..4af4fb0 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -29,13 +29,13 @@
 )
 
 type AdapterProxy struct {
-	TestMode   bool
-	kafkaProxy *kafka.KafkaMessagingProxy
+	TestMode     bool
+	kafkaICProxy *kafka.InterContainerProxy
 }
 
-func NewAdapterProxy(kafkaProxy *kafka.KafkaMessagingProxy) *AdapterProxy {
+func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
 	var proxy AdapterProxy
-	proxy.kafkaProxy = kafkaProxy
+	proxy.kafkaICProxy = kafkaProxy
 	return &proxy
 }
 
@@ -54,6 +54,18 @@
 	}
 }
 
+//func kafka.CreateSubTopic(args ...string) kafka.Topic{
+//	topic := ""
+//	for index , arg := range args {
+//		if index == 0 {
+//			topic = arg
+//		} else {
+//			topic = fmt.Sprintf("%s_%s",  topic, arg)
+//		}
+//	}
+//	return kafka.Topic{Name:topic}
+//}
+
 func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("AdoptDevice", log.Fields{"device": device})
 	rpc := "adopt_device"
@@ -63,21 +75,35 @@
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
-	log.Debugw("AdoptDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+	// Use a device topic for the response as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
+	if success {
+		// From now on, any unsolicited requests from the adapters for this device will come over the device topic.
+		// We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
+		if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
+			log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+			return err
+		}
+	}
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "disable_device"
-	topic := kafka.Topic{Name: device.Type}
+	// Use a device specific topic to send the request.  The adapter handling the device creates a device
+	// specific topic
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -85,13 +111,15 @@
 func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reenable_device"
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -99,13 +127,15 @@
 func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reboot_device"
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -113,26 +143,40 @@
 func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
 	rpc := "delete_device"
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+
+	// We no longer need to have a target against that topic as we won't receive any unsolicited messages on that
+	// topic
+	if err := ap.kafkaICProxy.UnSubscribeFromRequestHandler(replyToTopic); err != nil {
+		log.Errorw("Unable-to-subscribe-from-target", log.Fields{"topic": replyToTopic, "error": err})
+		return err
+	}
+	// Now delete the topic altogether
+	ap.kafkaICProxy.DeleteTopic(replyToTopic)
+
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
 	log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_device_info", &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
 	if success {
 		unpackResult := &ca.SwitchCapability{}
@@ -155,7 +199,7 @@
 
 func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
 	log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -166,8 +210,9 @@
 		Key:   "port_no",
 		Value: pNo,
 	}
-
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_port_info", &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
 	if success {
 		unpackResult := &ca.PortCapability{}
@@ -252,15 +297,15 @@
 
 func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
 	log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
-	topic := kafka.Topic{Name: deviceType}
+	toTopic := kafka.CreateSubTopic(deviceType, deviceId)
 	rpc := "receive_packet_out"
-	dId := &ca.StrType{Val:deviceId}
+	dId := &ca.StrType{Val: deviceId}
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
 		Key:   "deviceId",
 		Value: dId,
 	}
-	op := &ca.IntType{Val:int64(outPort)}
+	op := &ca.IntType{Val: int64(outPort)}
 	args[1] = &kafka.KVArg{
 		Key:   "outPort",
 		Value: op,
@@ -271,15 +316,16 @@
 	}
 
 	// TODO:  Do we need to wait for an ACK on a packet Out?
-	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, false, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, deviceId)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, false, args...)
 	log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
 }
 
-
 func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
 	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -295,14 +341,16 @@
 		Value: groups,
 	}
 
-	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
 	log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -318,7 +366,9 @@
 		Value: groupChanges,
 	}
 
-	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 85e43be..40563d4 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -30,7 +30,7 @@
 
 type AdapterRequestHandlerProxy struct {
 	TestMode         bool
-	coreInstanceId	string
+	coreInstanceId   string
 	deviceMgr        *DeviceManager
 	lDeviceMgr       *LogicalDeviceManager
 	localDataProxy   *model.Proxy
@@ -458,7 +458,6 @@
 	return new(empty.Empty), nil
 }
 
-
 func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ca.Argument) (*empty.Empty, error) {
 	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
@@ -488,7 +487,7 @@
 
 		}
 	}
-	log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val,  "packet": packet})
+	log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val, "packet": packet})
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 6015e7f..7423563 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -34,32 +34,34 @@
 	grpcServer        *grpcserver.GrpcServer
 	grpcNBIAPIHanfler *APIHandler
 	config            *config.RWCoreFlags
-	kmp               *kafka.KafkaMessagingProxy
+	kmp               *kafka.InterContainerProxy
 	clusterDataRoot   model.Root
 	localDataRoot     model.Root
 	clusterDataProxy  *model.Proxy
 	localDataProxy    *model.Proxy
 	exitChannel       chan int
 	kvClient          kvstore.Client
+	kafkaClient       kafka.Client
 }
 
 func init() {
 	log.AddPackage(log.JSON, log.WarnLevel, nil)
 }
 
-func NewCore(id string, cf *config.RWCoreFlags, kvClient kvstore.Client) *Core {
+func NewCore(id string, cf *config.RWCoreFlags, kvClient kvstore.Client, kafkaClient kafka.Client) *Core {
 	var core Core
 	core.instanceId = id
 	core.exitChannel = make(chan int, 1)
 	core.config = cf
 	core.kvClient = kvClient
+	core.kafkaClient = kafkaClient
 
 	// Setup the KV store
 	// Do not call NewBackend constructor; it creates its own KV client
 	// Commented the backend for now until the issue between the model and the KV store
 	// is resolved.
 	//backend := model.Backend{
-	//	Client:     kvClient,
+	//	MsgClient:     kvClient,
 	//	StoreType:  cf.KVStoreType,
 	//	Host:       cf.KVStoreHost,
 	//	Port:       cf.KVStorePort,
@@ -67,8 +69,8 @@
 	//	PathPrefix: "service/voltha"}
 	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil)
 	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil)
-	core.clusterDataProxy = core.clusterDataRoot.GetProxy("/", false)
-	core.localDataProxy = core.localDataRoot.GetProxy("/", false)
+	core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
+	core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
 	return &core
 }
 
@@ -78,7 +80,7 @@
 	log.Info("values", log.Fields{"kmp": core.kmp})
 	core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy)
 	core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
-	core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.localDataProxy, core.clusterDataProxy)
+	core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.clusterDataProxy, core.localDataProxy)
 	go core.startDeviceManager(ctx)
 	go core.startLogicalDeviceManager(ctx)
 	go core.startGRPCService(ctx)
@@ -89,6 +91,11 @@
 func (core *Core) Stop(ctx context.Context) {
 	log.Info("stopping-core")
 	core.exitChannel <- 1
+	// Stop all the started services
+	core.grpcServer.Stop()
+	core.logicalDeviceMgr.stop(ctx)
+	core.deviceMgr.stop(ctx)
+	core.kmp.Stop()
 	log.Info("core-stopped")
 }
 
@@ -120,9 +127,10 @@
 	log.Infow("starting-kafka-messaging-proxy", log.Fields{"host": core.config.KafkaAdapterHost,
 		"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
 	var err error
-	if core.kmp, err = kafka.NewKafkaMessagingProxy(
-		kafka.KafkaHost(core.config.KafkaAdapterHost),
-		kafka.KafkaPort(core.config.KafkaAdapterPort),
+	if core.kmp, err = kafka.NewInterContainerProxy(
+		kafka.InterContainerHost(core.config.KafkaAdapterHost),
+		kafka.InterContainerPort(core.config.KafkaAdapterPort),
+		kafka.MsgClient(core.kafkaClient),
 		kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic})); err != nil {
 		log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
 		return err
@@ -140,7 +148,7 @@
 func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
 	cdProxy *model.Proxy, ldProxy *model.Proxy) error {
 	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, cdProxy, ldProxy)
-	core.kmp.SubscribeWithTarget(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
+	core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
 
 	log.Info("request-handlers")
 	return nil
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 92f00bf..7e7f42a 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -33,7 +33,7 @@
 
 type DeviceAgent struct {
 	deviceId         string
-	deviceType 		string
+	deviceType       string
 	lastData         *voltha.Device
 	adapterProxy     *AdapterProxy
 	deviceMgr        *DeviceManager
@@ -79,18 +79,18 @@
 	if added := agent.clusterDataProxy.Add("/devices", agent.lastData, ""); added == nil {
 		log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
 	}
-	agent.deviceProxy = agent.clusterDataProxy.Root.GetProxy("/devices/"+agent.deviceId, false)
-	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate, nil)
+	agent.deviceProxy = agent.clusterDataProxy.Root.CreateProxy("/devices/"+agent.deviceId, false)
+	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
 
-	agent.flowProxy = agent.clusterDataProxy.Root.GetProxy(
+	agent.flowProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/devices/%s/flows", agent.deviceId),
 		false)
-	agent.groupProxy = agent.clusterDataProxy.Root.GetProxy(
+	agent.groupProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/devices/%s/flow_groups", agent.deviceId),
 		false)
 
 	agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
-	//agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
+	agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
 
 	log.Debug("device-agent-started")
 }
@@ -249,12 +249,6 @@
 			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 		agent.lockDevice.Unlock()
-		//TODO: callback will be invoked to handle this state change
-		//For now force the state transition to happen
-		if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
-			return err
-		}
 	}
 	return nil
 }
@@ -289,7 +283,8 @@
 		agent.lockDevice.Unlock()
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
-		if device.AdminState != voltha.AdminState_DISABLED {
+		if (device.AdminState != voltha.AdminState_DISABLED) &&
+			(device.AdminState != voltha.AdminState_PREPROVISIONED) {
 			log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
 			//TODO:  Needs customized error message
 			agent.lockDevice.Unlock()
@@ -311,13 +306,6 @@
 			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 		agent.lockDevice.Unlock()
-		//TODO: callback will be invoked to handle this state change
-		//For now force the state transition to happen
-		if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
-			return err
-		}
-
 	}
 	return nil
 }
@@ -379,38 +367,54 @@
 	return nil
 }
 
-
-// TODO: implement when callback from the data model is ready
 // processUpdate is a callback invoked whenever there is a change on the device manages by this device agent
 func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
-	log.Debug("!!!!!!!!!!!!!!!!!!!!!!!!!")
-	log.Debugw("processUpdate", log.Fields{"deviceId": agent.deviceId, "args": args})
+	//// Run this callback in its own go routine
+	go func(args ...interface{}) interface{} {
+		var previous *voltha.Device
+		var current *voltha.Device
+		var ok bool
+		if len(args) == 2 {
+			if previous, ok = args[0].(*voltha.Device); !ok {
+				log.Errorw("invalid-callback-type", log.Fields{"data": args[0]})
+				return nil
+			}
+			if current, ok = args[1].(*voltha.Device); !ok {
+				log.Errorw("invalid-callback-type", log.Fields{"data": args[1]})
+				return nil
+			}
+		} else {
+			log.Errorw("too-many-args-in-callback", log.Fields{"len": len(args)})
+			return nil
+		}
+		// Perform the state transition in it's own go routine
+		agent.deviceMgr.processTransition(previous, current)
+		return nil
+	}(args...)
+
 	return nil
 }
 
 func (agent *DeviceAgent) updateDevice(device *voltha.Device) error {
 	agent.lockDevice.Lock()
-	//defer agent.lockDevice.Unlock()
+	defer agent.lockDevice.Unlock()
 	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
-	// Get the dev info from the model
-	if storedData, err := agent.getDeviceWithoutLock(); err != nil {
-		agent.lockDevice.Unlock()
-		return status.Errorf(codes.NotFound, "%s", device.Id)
-	} else {
-		// store the changed data
-		cloned := proto.Clone(device).(*voltha.Device)
-		afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
-		agent.lockDevice.Unlock()
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", device.Id)
-		}
-		// Perform the state transition
-		if err := agent.deviceMgr.processTransition(storedData, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
-			return err
-		}
-		return nil
+	cloned := proto.Clone(device).(*voltha.Device)
+	afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "%s", device.Id)
 	}
+	return nil
+}
+
+func (agent *DeviceAgent) updateDeviceWithoutLock(device *voltha.Device) error {
+	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
+	cloned := proto.Clone(device).(*voltha.Device)
+	afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "%s", device.Id)
+	}
+	return nil
 }
 
 func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
@@ -439,11 +443,6 @@
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
 		agent.lockDevice.Unlock()
-		// Perform the state transition
-		if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
-			return err
-		}
 		return nil
 	}
 }
@@ -482,11 +481,6 @@
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
 		agent.lockDevice.Unlock()
-		// Perform the state transition
-		if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
-			return err
-		}
 		return nil
 	}
 }
@@ -561,20 +555,11 @@
 				break
 			}
 		}
-		//To track an issue when adding peer-port.
-		log.Debugw("before-peer-added", log.Fields{"device": cloned})
 		// Store the device
 		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
-		//To track an issue when adding peer-port.
-		if d, ok := afterUpdate.(*voltha.Device); ok {
-			log.Debugw("after-peer-added", log.Fields{"device": d})
-		} else {
-			log.Debug("after-peer-added-incorrect-type", log.Fields{"type": reflect.ValueOf(afterUpdate).Type()})
-		}
-
 		return nil
 	}
 }
@@ -615,7 +600,7 @@
 	groups := device.FlowGroups
 
 	// Send update to adapters
-	// Check whether the device supports incremental flow changes
+	// TODO: Check whether the device supports incremental flow changes
 	// Assume false for test
 	acceptsAddRemoveFlowUpdates := false
 	if !acceptsAddRemoveFlowUpdates {
@@ -691,7 +676,7 @@
 	flows := device.Flows
 
 	// Send update to adapters
-	// Check whether the device supports incremental flow changes
+	// TODO: Check whether the device supports incremental flow changes
 	// Assume false for test
 	acceptsAddRemoveFlowUpdates := false
 	if !acceptsAddRemoveFlowUpdates {
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index c4ac343..6f4a874 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -36,19 +36,19 @@
 	deviceAgents        map[string]*DeviceAgent
 	adapterProxy        *AdapterProxy
 	logicalDeviceMgr    *LogicalDeviceManager
-	kafkaProxy          *kafka.KafkaMessagingProxy
+	kafkaICProxy        *kafka.InterContainerProxy
 	stateTransitions    *TransitionMap
 	clusterDataProxy    *model.Proxy
 	exitChannel         chan int
 	lockDeviceAgentsMap sync.RWMutex
 }
 
-func newDeviceManager(kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *DeviceManager {
+func newDeviceManager(kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy) *DeviceManager {
 	var deviceMgr DeviceManager
 	deviceMgr.exitChannel = make(chan int, 1)
 	deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
-	deviceMgr.adapterProxy = NewAdapterProxy(kafkaProxy)
-	deviceMgr.kafkaProxy = kafkaProxy
+	deviceMgr.adapterProxy = NewAdapterProxy(kafkaICProxy)
+	deviceMgr.kafkaICProxy = kafkaICProxy
 	deviceMgr.clusterDataProxy = cdProxy
 	deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
 	return &deviceMgr
@@ -327,7 +327,7 @@
 	//Get parent device type
 	parent, err := dMgr.GetDevice(parentDeviceId)
 	if err != nil {
-		log.Error("no-parent-found", log.Fields{"parentId":parentDeviceId})
+		log.Error("no-parent-found", log.Fields{"parentId": parentDeviceId})
 		return status.Errorf(codes.NotFound, "%s", parentDeviceId)
 	}
 
@@ -350,7 +350,7 @@
 	// This will be triggered on every update to the device.
 	handlers := dMgr.stateTransitions.GetTransitionHandler(previous, current)
 	if handlers == nil {
-		log.Debugw("handlers-not-found", log.Fields{"deviceId": current.Id})
+		log.Debugw("no-op-transition", log.Fields{"deviceId": current.Id})
 		return nil
 	}
 	for _, handler := range handlers {
@@ -379,7 +379,7 @@
 		log.Errorw("device-not-found", log.Fields{"deviceId": deviceId})
 		return err
 	}
-	if !device.Root{
+	if !device.Root {
 		log.Errorw("device-not-root", log.Fields{"deviceId": deviceId})
 		return status.Errorf(codes.FailedPrecondition, "%s", deviceId)
 	}
diff --git a/rw_core/core/device_state_transitions.go b/rw_core/core/device_state_transitions.go
index b3fecc0..e46a418 100644
--- a/rw_core/core/device_state_transitions.go
+++ b/rw_core/core/device_state_transitions.go
@@ -50,24 +50,24 @@
 func NewTransitionMap(dMgr *DeviceManager) *TransitionMap {
 	var transitionMap TransitionMap
 	transitionMap.transitions = make([]Transition, 0)
-	transitionMap.transitions = append(transitionMap.transitions,
-		Transition{
-			deviceType:    any,
-			previousState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handlers:      []TransitionHandler{dMgr.activateDevice}})
+	//transitionMap.transitions = append(transitionMap.transitions,
+	//	Transition{
+	//		deviceType:    any,
+	//		previousState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+	//		currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+	//		handlers:      []TransitionHandler{dMgr.activateDevice}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
 			previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			handlers:      []TransitionHandler{dMgr.notAllowed}})
-	transitionMap.transitions = append(transitionMap.transitions,
-		Transition{
-			deviceType:    any,
-			previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handlers:      []TransitionHandler{dMgr.activateDevice}})
+	//transitionMap.transitions = append(transitionMap.transitions,
+	//	Transition{
+	//		deviceType:    any,
+	//		previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+	//		currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+	//		handlers:      []TransitionHandler{dMgr.activateDevice}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 2913b51..d44ccf4 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -31,7 +31,7 @@
 	"time"
 )
 
-const MAX_RESPONSE_TIME = 500   // milliseconds
+const MAX_RESPONSE_TIME = 500 // milliseconds
 
 type APIHandler struct {
 	deviceMgr        *DeviceManager
@@ -58,9 +58,9 @@
 // and create a KV transaction for that serial number for the current core.
 func (handler *APIHandler) createKvTransaction(ctx context.Context) (*KVTransaction, error) {
 	var (
-		err error
-		ok bool
-		md metadata.MD
+		err    error
+		ok     bool
+		md     metadata.MD
 		serNum []string
 	)
 	if md, ok = metadata.FromIncomingContext(ctx); !ok {
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 4f53474..60692e5 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -107,21 +107,21 @@
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 	// Save the logical device
-	if added := agent.clusterDataProxy.Add("/logical_devices", ld, ""); added == nil {
+	if added := agent.clusterDataProxy.AddWithID("/logical_devices", ld.Id, ld, ""); added == nil {
 		log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
 	} else {
 		log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
 	}
 
-	agent.flowProxy = agent.clusterDataProxy.Root.GetProxy(
+	agent.flowProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceId),
 		false)
-	agent.groupProxy = agent.clusterDataProxy.Root.GetProxy(
+	agent.groupProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/logical_devices/%s/flow_groups", agent.logicalDeviceId),
 		false)
 
 	agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
-	//agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
+	agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
 
 	return nil
 }
@@ -193,6 +193,37 @@
 	return nil
 }
 
+//updateLogicalDeviceFlowsWithoutLock updates the model with the logical device flows.  It clones the flows before saving them
+func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
+	cloned := proto.Clone(flows).(*ofp.Flows)
+	afterUpdate := agent.flowProxy.Update("/", cloned, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "failed-updating-logical-device-flows:%s", agent.logicalDeviceId)
+	}
+	// TODO:  Remove this code when the model update is fixed
+	ld, _ := agent.getLogicalDeviceWithoutLock()
+	clonedDevice := proto.Clone(ld).(*voltha.LogicalDevice)
+	clonedDevice.Flows = proto.Clone(flows).(*ofp.Flows)
+	agent.updateLogicalDeviceWithoutLock(clonedDevice)
+	return nil
+}
+
+//updateLogicalDeviceFlowGroupsWithoutLock updates the model with the logical device flow groups.  It clones the flow groups before saving them
+func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
+	cloned := proto.Clone(flowGroups).(*ofp.FlowGroups)
+	afterUpdate := agent.groupProxy.Update("/", cloned, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "failed-updating-logical-device-flow-groups:%s", agent.logicalDeviceId)
+	}
+	// TODO:  Remove this code when the model update is fixed
+	ld, _ := agent.getLogicalDeviceWithoutLock()
+	clonedDevice := proto.Clone(ld).(*voltha.LogicalDevice)
+	clonedDevice.FlowGroups = proto.Clone(flowGroups).(*ofp.FlowGroups)
+	agent.updateLogicalDeviceWithoutLock(clonedDevice)
+	return nil
+}
+
+
 // getLogicalDeviceWithoutLock retrieves a logical device from the model without locking it.   This is used only by
 // functions that have already acquired the logical device lock to the model
 func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
@@ -296,30 +327,16 @@
 		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
 }
 
-//updateFlowsWithoutLock updates the flows in the logical device without locking the logical device.  This function
-//must only be called by a function that is holding the lock on the logical device
-func (agent *LogicalDeviceAgent) updateFlowsWithoutLock(flows []*ofp.OfpFlowStats) error {
-	if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
-		return status.Error(codes.NotFound, agent.logicalDeviceId)
-	} else {
-		flowsCloned := make([]*ofp.OfpFlowStats, len(flows))
-		copy(flowsCloned, flows)
-		ldevice.Flows.Items = flowsCloned
-		return agent.updateLogicalDeviceWithoutLock(ldevice)
-	}
-}
 
 //updateFlowGroupsWithoutLock updates the flows in the logical device without locking the logical device.  This function
 //must only be called by a function that is holding the lock on the logical device
 func (agent *LogicalDeviceAgent) updateFlowGroupsWithoutLock(groups []*ofp.OfpGroupEntry) error {
-	if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
-		return status.Error(codes.NotFound, agent.logicalDeviceId)
-	} else {
-		groupsCloned := make([]*ofp.OfpGroupEntry, len(groups))
-		copy(groupsCloned, groups)
-		ldevice.FlowGroups.Items = groupsCloned
-		return agent.updateLogicalDeviceWithoutLock(ldevice)
+	groupsCloned := make([]*ofp.OfpGroupEntry, len(groups))
+	copy(groupsCloned, groups)
+	if afterUpdate := agent.groupProxy.Update("/", groupsCloned, true, ""); afterUpdate == nil {
+		return errors.New(fmt.Sprintf("update-flow-group-failed:%s", agent.logicalDeviceId))
 	}
+	return nil
 }
 
 //flowAdd adds a flow to the flow table of that logical device
@@ -343,7 +360,7 @@
 		flows = lDevice.Flows.Items
 	}
 
-	oldData := proto.Clone(lDevice.Flows).(*voltha.Flows)
+	//oldData := proto.Clone(lDevice.Flows).(*voltha.Flows)
 	changed := false
 	checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
 	if checkOverlap {
@@ -373,17 +390,17 @@
 	}
 	if changed {
 		//	Update model
-		if lDevice.Flows == nil {
-			lDevice.Flows = &ofp.Flows{}
+		flowsToUpdate := &ofp.Flows{}
+		if lDevice.Flows != nil {
+			flowsToUpdate = &ofp.Flows{Items: flows}
 		}
-		lDevice.Flows.Items = flows
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(flowsToUpdate); err != nil {
+			log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	}
-	// For now, force the callback to occur
-	go agent.flowTableUpdated(oldData, lDevice.Flows)
+	//// For now, force the callback to occur
+	//go agent.flowTableUpdated(oldData, lDevice.Flows)
 	return nil
 }
 
@@ -414,9 +431,8 @@
 
 	//Update flows
 	if len(toKeep) < len(flows) {
-		lDevice.Flows.Items = toKeep
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
+			log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	}
@@ -452,8 +468,7 @@
 
 	//Update flows
 	if len(toKeep) < len(flows) {
-		lDevice.Flows.Items = toKeep
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
 			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
@@ -488,8 +503,7 @@
 	}
 
 	if changed {
-		lDevice.Flows.Items = flows
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
 			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
@@ -523,19 +537,15 @@
 		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
 	}
 	groups := lDevice.FlowGroups.Items
-	oldData := proto.Clone(lDevice.FlowGroups).(*voltha.FlowGroups)
 	if fu.FindGroup(groups, groupMod.GroupId) == -1 {
 		groups = append(groups, fd.GroupEntryFromGroupMod(groupMod))
-		lDevice.FlowGroups.Items = groups
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
+			log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	} else {
 		return errors.New(fmt.Sprintf("Groups %d already present", groupMod.GroupId))
 	}
-	// For now, force the callback to occur
-	go agent.groupTableUpdated(oldData, lDevice.FlowGroups)
 	return nil
 }
 
@@ -572,14 +582,19 @@
 			groupsChanged = true
 		}
 	}
-	if groupsChanged || flowsChanged {
-		lDevice.FlowGroups.Items = groups
-		lDevice.Flows.Items = flows
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+	if groupsChanged {
+		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
+			log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	}
+	if flowsChanged {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
+			log.Errorw("Cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			return err
+		}
+	}
+
 	return nil
 }
 
@@ -609,8 +624,8 @@
 		groupsChanged = true
 	}
 	if groupsChanged {
-		lDevice.FlowGroups.Items = groups
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
+		//lDevice.FlowGroups.Items = groups
+		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
@@ -939,92 +954,94 @@
 func (agent *LogicalDeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
-	//agent.lockLogicalDevice.Lock()
-	//defer agent.lockLogicalDevice.Unlock()
+	// Run this callback in its own goroutine since callbacks are not invoked in their own
+	// goroutine
+	go func(args ...interface{}) interface{} {
+		//agent.lockLogicalDevice.Lock()
+		//defer agent.lockLogicalDevice.Unlock()
 
-	var previousData *ofp.Flows
-	var latestData *ofp.Flows
+		var previousData *ofp.Flows
+		var latestData *ofp.Flows
 
-	var ok bool
-	if previousData, ok = args[0].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-	}
-	if latestData, ok = args[1].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-	}
+		var ok bool
+		if previousData, ok = args[0].(*ofp.Flows); !ok {
+			log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+		}
+		if latestData, ok = args[1].(*ofp.Flows); !ok {
+			log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+		}
 
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debug("flow-update-not-required")
+		if reflect.DeepEqual(previousData.Items, latestData.Items) {
+			log.Debug("flow-update-not-required")
+			return nil
+		}
+
+		// Ensure the device graph has been setup
+		agent.setupDeviceGraph()
+
+		var groups *ofp.FlowGroups
+		lDevice, _ := agent.getLogicalDeviceWithoutLock()
+		groups = lDevice.FlowGroups
+		log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
+		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+		for deviceId, value := range deviceRules.GetRules() {
+			agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
+			agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+		}
+
 		return nil
-	}
+	}(args...)
 
-	// Ensure the device graph has been setup
-	agent.setupDeviceGraph()
-
-	var groups *ofp.FlowGroups
-	lDevice, _ := agent.getLogicalDeviceWithoutLock()
-	groups = lDevice.FlowGroups
-	log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
-	//groupsIf := agent.groupProxy.Get("/", 1, false, "")
-	//if groups, ok = groupsIf.(*ofp.FlowGroups); !ok {
-	//	log.Errorw("cannot-retrieve-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "group": groupsIf})
-	//	//return errors.New("cannot-retrieve-groups")
-	//	groups = &ofp.FlowGroups{Items:nil}
-	//}
-	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
-	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-	for deviceId, value := range deviceRules.GetRules() {
-		agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
-		agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
-	}
 	return nil
 }
 
 func (agent *LogicalDeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
-	//agent.lockLogicalDevice.Lock()
-	//defer agent.lockLogicalDevice.Unlock()
+	// Run this callback in its own goroutine since callbacks are not invoked in their own
+	// goroutine
+	go func(args ...interface{}) interface{} {
+		//agent.lockLogicalDevice.Lock()
+		//defer agent.lockLogicalDevice.Unlock()
 
-	var previousData *ofp.FlowGroups
-	var latestData *ofp.FlowGroups
+		var previousData *ofp.FlowGroups
+		var latestData *ofp.FlowGroups
 
-	var ok bool
-	if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-	}
-	if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-	}
+		var ok bool
+		if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
+			log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+		}
+		if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
+			log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+		}
 
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debug("flow-update-not-required")
+		if reflect.DeepEqual(previousData.Items, latestData.Items) {
+			log.Debug("flow-update-not-required")
+			return nil
+		}
+
+		// Ensure the device graph has been setup
+		agent.setupDeviceGraph()
+
+		var flows *ofp.Flows
+		lDevice, _ := agent.getLogicalDeviceWithoutLock()
+		flows = lDevice.Flows
+		log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
+		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+		for deviceId, value := range deviceRules.GetRules() {
+			agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
+			agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+		}
 		return nil
-	}
+	}(args...)
 
-	// Ensure the device graph has been setup
-	agent.setupDeviceGraph()
-
-	var flows *ofp.Flows
-	lDevice, _ := agent.getLogicalDeviceWithoutLock()
-	flows = lDevice.Flows
-	log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
-	//flowsIf := agent.flowProxy.Get("/", 1, false, "")
-	//if flows, ok = flowsIf.(*ofp.Flows); !ok {
-	//	log.Errorw("cannot-retrieve-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "flows": flows})
-	//	//return errors.New("cannot-retrieve-groups")
-	//	flows = &ofp.Flows{Items:nil}
-	//}
-	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
-	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-	for deviceId, value := range deviceRules.GetRules() {
-		agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
-		agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
-	}
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) packetOut(packet *ofp.OfpPacketOut ) {
+func (agent *LogicalDeviceAgent) packetOut(packet *ofp.OfpPacketOut) {
 	log.Debugw("packet-out", log.Fields{"packet": packet.GetInPort()})
 	outPort := fd.GetPacketOutPort(packet)
 	//frame := packet.GetData()
@@ -1032,10 +1049,8 @@
 	agent.deviceMgr.packetOut(agent.rootDeviceId, outPort, packet)
 }
 
-
 func (agent *LogicalDeviceAgent) packetIn(port uint32, packet []byte) {
 	log.Debugw("packet-in", log.Fields{"port": port, "packet": packet})
 	packet_in := fd.MkPacketIn(port, packet)
 	log.Debugw("sending-packet-in", log.Fields{"packet-in": packet_in})
 }
-
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 4625518..64743cc 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -33,18 +33,18 @@
 	logicalDeviceAgents        map[string]*LogicalDeviceAgent
 	deviceMgr                  *DeviceManager
 	adapterProxy               *AdapterProxy
-	kafkaProxy                 *kafka.KafkaMessagingProxy
+	kafkaICProxy               *kafka.InterContainerProxy
 	clusterDataProxy           *model.Proxy
 	exitChannel                chan int
 	lockLogicalDeviceAgentsMap sync.RWMutex
 }
 
-func newLogicalDeviceManager(deviceMgr *DeviceManager, kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
+func newLogicalDeviceManager(deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
 	var logicalDeviceMgr LogicalDeviceManager
 	logicalDeviceMgr.exitChannel = make(chan int, 1)
 	logicalDeviceMgr.logicalDeviceAgents = make(map[string]*LogicalDeviceAgent)
 	logicalDeviceMgr.deviceMgr = deviceMgr
-	logicalDeviceMgr.kafkaProxy = kafkaProxy
+	logicalDeviceMgr.kafkaICProxy = kafkaICProxy
 	logicalDeviceMgr.clusterDataProxy = cdProxy
 	logicalDeviceMgr.lockLogicalDeviceAgentsMap = sync.RWMutex{}
 	return &logicalDeviceMgr
@@ -325,7 +325,7 @@
 	sendAPIResponse(ctx, ch, res)
 }
 
-func (ldMgr *LogicalDeviceManager) packetOut( packetOut *openflow_13.PacketOut) {
+func (ldMgr *LogicalDeviceManager) packetOut(packetOut *openflow_13.PacketOut) {
 	log.Debugw("packetOut", log.Fields{"logicalDeviceId": packetOut.Id})
 	if agent := ldMgr.getLogicalDeviceAgent(packetOut.Id); agent != nil {
 		agent.packetOut(packetOut.PacketOut)
@@ -343,4 +343,3 @@
 	}
 	return nil
 }
-
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index bd0e591..a6b90aa 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -715,13 +715,13 @@
 			Type: ofp.OfpMatchType_OFPMT_OXM,
 			OxmFields: []*ofp.OfpOxmField{
 				{
-					OxmClass:ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC,
+					OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC,
 					Field: &ofp.OfpOxmField_OfbField{
 						OfbField: InPort(port)},
 				},
 			},
 		},
-		Data:packet,
+		Data: packet,
 	}
 	return packetIn
 }
diff --git a/rw_core/main.go b/rw_core/main.go
index cd5dbe9..77ce304 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -22,6 +22,7 @@
 	grpcserver "github.com/opencord/voltha-go/common/grpc"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
+	"github.com/opencord/voltha-go/kafka"
 	ca "github.com/opencord/voltha-go/protos/core_adapter"
 	"github.com/opencord/voltha-go/rw_core/config"
 	c "github.com/opencord/voltha-go/rw_core/core"
@@ -38,8 +39,9 @@
 	halted      bool
 	exitChannel chan int
 	//kmp         *kafka.KafkaMessagingProxy
-	grpcServer *grpcserver.GrpcServer
-	core       *c.Core
+	grpcServer  *grpcserver.GrpcServer
+	kafkaClient kafka.Client
+	core        *c.Core
 	//For test
 	receiverChannels []<-chan *ca.InterContainerMessage
 }
@@ -60,6 +62,18 @@
 	return nil, errors.New("unsupported-kv-store")
 }
 
+func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
+
+	log.Infow("kafka-client-type", log.Fields{"client": clientType})
+	switch clientType {
+	case "sarama":
+		return kafka.NewSaramaClient(
+			kafka.Host(host),
+			kafka.Port(port)), nil
+	}
+	return nil, errors.New("unsupported-client-type")
+}
+
 func newRWCore(cf *config.RWCoreFlags) *rwCore {
 	var rwCore rwCore
 	rwCore.config = cf
@@ -92,47 +106,10 @@
 	}
 }
 
-//func (rw *rwCore) createGRPCService(context.Context) {
-//	//	create an insecure gserver server
-//	rw.grpcServer = grpcserver.NewGrpcServer(rw.config.GrpcHost, rw.config.GrpcPort, nil, false)
-//	log.Info("grpc-server-created")
-//}
-
-//func (rw *rwCore) startKafkaMessagingProxy(ctx context.Context) error {
-//	log.Infow("starting-kafka-messaging-proxy", log.Fields{"host":rw.config.KafkaAdapterHost,
-//	"port":rw.config.KafkaAdapterPort, "topic":rw.config.CoreTopic})
-//	var err error
-//	if rw.kmp, err = kafka.NewKafkaMessagingProxy(
-//		kafka.KafkaHost(rw.config.KafkaAdapterHost),
-//		kafka.KafkaPort(rw.config.KafkaAdapterPort),
-//		kafka.DefaultTopic(&kafka.Topic{Name: rw.config.CoreTopic})); err != nil {
-//		log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
-//		return err
-//	}
-//	if err = rw.kmp.Start(); err != nil {
-//		log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
-//		return err
-//	}
-//
-//	requestProxy := &c.RequestHandlerProxy{}
-//	rw.kmp.SubscribeWithTarget(kafka.Topic{Name: rw.config.CoreTopic}, requestProxy)
-//
-//	log.Info("started-kafka-messaging-proxy")
-//	return nil
-//}
-
 func (rw *rwCore) start(ctx context.Context) {
 	log.Info("Starting RW Core components")
 
-	//// Setup GRPC Server
-	//rw.createGRPCService(ctx)
-
-	//// Setup Kafka messaging services
-	//if err := rw.startKafkaMessagingProxy(ctx); err != nil {
-	//	log.Fatalw("failed-to-start-kafka-proxy", log.Fields{"err":err})
-	//}
-
-	// Setup KV Client
+	// Setup KV Client
 	log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
 	err := rw.setKVClient()
 	if err == nil {
@@ -144,8 +121,12 @@
 			rw.config.KVTxnKeyDelTime)
 	}
 
+	if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort); err != nil {
+		log.Fatal("Unsupported-kafka-client")
+	}
+
 	// Create the core service
-	rw.core = c.NewCore(rw.config.InstanceID, rw.config, rw.kvClient)
+	rw.core = c.NewCore(rw.config.InstanceID, rw.config, rw.kvClient, rw.kafkaClient)
 
 	// start the core
 	rw.core.Start(ctx)
@@ -155,11 +136,6 @@
 	// Stop leadership tracking
 	rw.halted = true
 
-	//// Stop the Kafka messaging service
-	//if rw.kmp != nil {
-	//	rw.kmp.Stop()
-	//}
-
 	// send exit signal
 	rw.exitChannel <- 0
 
@@ -172,6 +148,12 @@
 		// Close the DB connection
 		rw.kvClient.Close()
 	}
+
+	rw.core.Stop(nil)
+
+	//if rw.kafkaClient != nil {
+	//	rw.kafkaClient.Stop()
+	//}
 }
 
 func waitForExit() int {