This update addresses the following:
1.  Decouple the kafka messaging proxy from the kafka client.  This
will allow us to try out different kafka clients as well as test
the client separately.
2. Create unique device topics for the core, olt adapter and onu
adapters.  This will ensure only cores and adapters handling these
devices will listens to the device messages.
3. Update the core with the latest device model APIs and changes.
While most of the model issues have been fixed, there is still an
issue with updating a child branch.   This will be dealt in a separate
update.

Change-Id: I622ef5c636d7466bb3adefaa4ac4c85d7c450bea
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 4f53474..60692e5 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -107,21 +107,21 @@
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 	// Save the logical device
-	if added := agent.clusterDataProxy.Add("/logical_devices", ld, ""); added == nil {
+	if added := agent.clusterDataProxy.AddWithID("/logical_devices", ld.Id, ld, ""); added == nil {
 		log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
 	} else {
 		log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
 	}
 
-	agent.flowProxy = agent.clusterDataProxy.Root.GetProxy(
+	agent.flowProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceId),
 		false)
-	agent.groupProxy = agent.clusterDataProxy.Root.GetProxy(
+	agent.groupProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/logical_devices/%s/flow_groups", agent.logicalDeviceId),
 		false)
 
 	agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
-	//agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
+	agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
 
 	return nil
 }
@@ -193,6 +193,37 @@
 	return nil
 }
 
+//updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
+func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
+	cloned := proto.Clone(flows).(*ofp.Flows)
+	afterUpdate := agent.flowProxy.Update("/", cloned, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "failed-updating-logical-device-flows:%s", agent.logicalDeviceId)
+	}
+	// TODO:  Remove this code when the model update is fixed
+	ld, _ := agent.getLogicalDeviceWithoutLock()
+	clonedDevice := proto.Clone(ld).(*voltha.LogicalDevice)
+	clonedDevice.Flows = proto.Clone(flows).(*ofp.Flows)
+	agent.updateLogicalDeviceWithoutLock(clonedDevice)
+	return nil
+}
+
+//updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
+func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
+	cloned := proto.Clone(flowGroups).(*ofp.FlowGroups)
+	afterUpdate := agent.groupProxy.Update("/", cloned, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "failed-updating-logical-device-flow-groups:%s", agent.logicalDeviceId)
+	}
+	// TODO:  Remove this code when the model update is fixed
+	ld, _ := agent.getLogicalDeviceWithoutLock()
+	clonedDevice := proto.Clone(ld).(*voltha.LogicalDevice)
+	clonedDevice.FlowGroups = proto.Clone(flowGroups).(*ofp.FlowGroups)
+	agent.updateLogicalDeviceWithoutLock(clonedDevice)
+	return nil
+}
+
+
 // getLogicalDeviceWithoutLock retrieves a logical device from the model without locking it.   This is used only by
 // functions that have already acquired the logical device lock to the model
 func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
@@ -296,30 +327,16 @@
 		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
 }
 
-//updateFlowsWithoutLock updates the flows in the logical device without locking the logical device.  This function
-//must only be called by a function that is holding the lock on the logical device
-func (agent *LogicalDeviceAgent) updateFlowsWithoutLock(flows []*ofp.OfpFlowStats) error {
-	if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
-		return status.Error(codes.NotFound, agent.logicalDeviceId)
-	} else {
-		flowsCloned := make([]*ofp.OfpFlowStats, len(flows))
-		copy(flowsCloned, flows)
-		ldevice.Flows.Items = flowsCloned
-		return agent.updateLogicalDeviceWithoutLock(ldevice)
-	}
-}
 
 //updateFlowGroupsWithoutLock updates the flows in the logical device without locking the logical device.  This function
 //must only be called by a function that is holding the lock on the logical device
 func (agent *LogicalDeviceAgent) updateFlowGroupsWithoutLock(groups []*ofp.OfpGroupEntry) error {
-	if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
-		return status.Error(codes.NotFound, agent.logicalDeviceId)
-	} else {
-		groupsCloned := make([]*ofp.OfpGroupEntry, len(groups))
-		copy(groupsCloned, groups)
-		ldevice.FlowGroups.Items = groupsCloned
-		return agent.updateLogicalDeviceWithoutLock(ldevice)
+	groupsCloned := make([]*ofp.OfpGroupEntry, len(groups))
+	copy(groupsCloned, groups)
+	if afterUpdate := agent.groupProxy.Update("/", groupsCloned, true, ""); afterUpdate == nil {
+		return errors.New(fmt.Sprintf("update-flow-group-failed:%s", agent.logicalDeviceId))
 	}
+	return nil
 }
 
 //flowAdd adds a flow to the flow table of that logical device
@@ -343,7 +360,7 @@
 		flows = lDevice.Flows.Items
 	}
 
-	oldData := proto.Clone(lDevice.Flows).(*voltha.Flows)
+	//oldData := proto.Clone(lDevice.Flows).(*voltha.Flows)
 	changed := false
 	checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
 	if checkOverlap {
@@ -373,17 +390,17 @@
 	}
 	if changed {
 		//	Update model
-		if lDevice.Flows == nil {
-			lDevice.Flows = &ofp.Flows{}
+		flowsToUpdate := &ofp.Flows{}
+		if lDevice.Flows != nil {
+			flowsToUpdate = &ofp.Flows{Items: flows}
 		}
-		lDevice.Flows.Items = flows
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(flowsToUpdate); err != nil {
+			log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	}
-	// For now, force the callback to occur
-	go agent.flowTableUpdated(oldData, lDevice.Flows)
+	//// For now, force the callback to occur
+	//go agent.flowTableUpdated(oldData, lDevice.Flows)
 	return nil
 }
 
@@ -414,9 +431,8 @@
 
 	//Update flows
 	if len(toKeep) < len(flows) {
-		lDevice.Flows.Items = toKeep
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
+			log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	}
@@ -452,8 +468,7 @@
 
 	//Update flows
 	if len(toKeep) < len(flows) {
-		lDevice.Flows.Items = toKeep
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
 			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
@@ -488,8 +503,7 @@
 	}
 
 	if changed {
-		lDevice.Flows.Items = flows
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
 			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
@@ -523,19 +537,15 @@
 		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
 	}
 	groups := lDevice.FlowGroups.Items
-	oldData := proto.Clone(lDevice.FlowGroups).(*voltha.FlowGroups)
 	if fu.FindGroup(groups, groupMod.GroupId) == -1 {
 		groups = append(groups, fd.GroupEntryFromGroupMod(groupMod))
-		lDevice.FlowGroups.Items = groups
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
+			log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	} else {
 		return errors.New(fmt.Sprintf("Groups %d already present", groupMod.GroupId))
 	}
-	// For now, force the callback to occur
-	go agent.groupTableUpdated(oldData, lDevice.FlowGroups)
 	return nil
 }
 
@@ -572,14 +582,19 @@
 			groupsChanged = true
 		}
 	}
-	if groupsChanged || flowsChanged {
-		lDevice.FlowGroups.Items = groups
-		lDevice.Flows.Items = flows
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+	if groupsChanged {
+		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
+			log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	}
+	if flowsChanged {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
+			log.Errorw("Cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			return err
+		}
+	}
+
 	return nil
 }
 
@@ -609,8 +624,8 @@
 		groupsChanged = true
 	}
 	if groupsChanged {
-		lDevice.FlowGroups.Items = groups
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
+		//lDevice.FlowGroups.Items = groups
+		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
@@ -939,92 +954,94 @@
 func (agent *LogicalDeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
-	//agent.lockLogicalDevice.Lock()
-	//defer agent.lockLogicalDevice.Unlock()
+	// Run this callback in it's own go routine since callbacks are not invoked in their own
+	// go routine
+	go func(args ...interface{}) interface{} {
+		//agent.lockLogicalDevice.Lock()
+		//defer agent.lockLogicalDevice.Unlock()
 
-	var previousData *ofp.Flows
-	var latestData *ofp.Flows
+		var previousData *ofp.Flows
+		var latestData *ofp.Flows
 
-	var ok bool
-	if previousData, ok = args[0].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-	}
-	if latestData, ok = args[1].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-	}
+		var ok bool
+		if previousData, ok = args[0].(*ofp.Flows); !ok {
+			log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+		}
+		if latestData, ok = args[1].(*ofp.Flows); !ok {
+			log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+		}
 
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debug("flow-update-not-required")
+		if reflect.DeepEqual(previousData.Items, latestData.Items) {
+			log.Debug("flow-update-not-required")
+			return nil
+		}
+
+		// Ensure the device graph has been setup
+		agent.setupDeviceGraph()
+
+		var groups *ofp.FlowGroups
+		lDevice, _ := agent.getLogicalDeviceWithoutLock()
+		groups = lDevice.FlowGroups
+		log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
+		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+		for deviceId, value := range deviceRules.GetRules() {
+			agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
+			agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+		}
+
 		return nil
-	}
+	}(args...)
 
-	// Ensure the device graph has been setup
-	agent.setupDeviceGraph()
-
-	var groups *ofp.FlowGroups
-	lDevice, _ := agent.getLogicalDeviceWithoutLock()
-	groups = lDevice.FlowGroups
-	log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
-	//groupsIf := agent.groupProxy.Get("/", 1, false, "")
-	//if groups, ok = groupsIf.(*ofp.FlowGroups); !ok {
-	//	log.Errorw("cannot-retrieve-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "group": groupsIf})
-	//	//return errors.New("cannot-retrieve-groups")
-	//	groups = &ofp.FlowGroups{Items:nil}
-	//}
-	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
-	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-	for deviceId, value := range deviceRules.GetRules() {
-		agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
-		agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
-	}
 	return nil
 }
 
 func (agent *LogicalDeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
-	//agent.lockLogicalDevice.Lock()
-	//defer agent.lockLogicalDevice.Unlock()
+	// Run this callback in it's own go routine since callbacks are not invoked in their own
+	// go routine
+	go func(args ...interface{}) interface{} {
+		//agent.lockLogicalDevice.Lock()
+		//defer agent.lockLogicalDevice.Unlock()
 
-	var previousData *ofp.FlowGroups
-	var latestData *ofp.FlowGroups
+		var previousData *ofp.FlowGroups
+		var latestData *ofp.FlowGroups
 
-	var ok bool
-	if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-	}
-	if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-	}
+		var ok bool
+		if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
+			log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+		}
+		if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
+			log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+		}
 
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debug("flow-update-not-required")
+		if reflect.DeepEqual(previousData.Items, latestData.Items) {
+			log.Debug("flow-update-not-required")
+			return nil
+		}
+
+		// Ensure the device graph has been setup
+		agent.setupDeviceGraph()
+
+		var flows *ofp.Flows
+		lDevice, _ := agent.getLogicalDeviceWithoutLock()
+		flows = lDevice.Flows
+		log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
+		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+		for deviceId, value := range deviceRules.GetRules() {
+			agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
+			agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+		}
 		return nil
-	}
+	}(args...)
 
-	// Ensure the device graph has been setup
-	agent.setupDeviceGraph()
-
-	var flows *ofp.Flows
-	lDevice, _ := agent.getLogicalDeviceWithoutLock()
-	flows = lDevice.Flows
-	log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
-	//flowsIf := agent.flowProxy.Get("/", 1, false, "")
-	//if flows, ok = flowsIf.(*ofp.Flows); !ok {
-	//	log.Errorw("cannot-retrieve-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "flows": flows})
-	//	//return errors.New("cannot-retrieve-groups")
-	//	flows = &ofp.Flows{Items:nil}
-	//}
-	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
-	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-	for deviceId, value := range deviceRules.GetRules() {
-		agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
-		agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
-	}
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) packetOut(packet *ofp.OfpPacketOut ) {
+func (agent *LogicalDeviceAgent) packetOut(packet *ofp.OfpPacketOut) {
 	log.Debugw("packet-out", log.Fields{"packet": packet.GetInPort()})
 	outPort := fd.GetPacketOutPort(packet)
 	//frame := packet.GetData()
@@ -1032,10 +1049,8 @@
 	agent.deviceMgr.packetOut(agent.rootDeviceId, outPort, packet)
 }
 
-
 func (agent *LogicalDeviceAgent) packetIn(port uint32, packet []byte) {
 	log.Debugw("packet-in", log.Fields{"port": port, "packet": packet})
 	packet_in := fd.MkPacketIn(port, packet)
 	log.Debugw("sending-packet-in", log.Fields{"packet-in": packet_in})
 }
-