This update addresses the following:
1.  Decouple the kafka messaging proxy from the kafka client.  This
will allow us to try out different kafka clients as well as test
the client separately.
2. Create unique device topics for the core, olt adapter and onu
adapters.  This will ensure only cores and adapters handling these
devices will listens to the device messages.
3. Update the core with the latest device model APIs and changes.
While most of the model issues have been fixed, there is still an
issue with updating a child branch.   This will be dealt in a separate
update.

Change-Id: I622ef5c636d7466bb3adefaa4ac4c85d7c450bea
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 92f00bf..7e7f42a 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -33,7 +33,7 @@
 
 type DeviceAgent struct {
 	deviceId         string
-	deviceType 		string
+	deviceType       string
 	lastData         *voltha.Device
 	adapterProxy     *AdapterProxy
 	deviceMgr        *DeviceManager
@@ -79,18 +79,18 @@
 	if added := agent.clusterDataProxy.Add("/devices", agent.lastData, ""); added == nil {
 		log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
 	}
-	agent.deviceProxy = agent.clusterDataProxy.Root.GetProxy("/devices/"+agent.deviceId, false)
-	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate, nil)
+	agent.deviceProxy = agent.clusterDataProxy.Root.CreateProxy("/devices/"+agent.deviceId, false)
+	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
 
-	agent.flowProxy = agent.clusterDataProxy.Root.GetProxy(
+	agent.flowProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/devices/%s/flows", agent.deviceId),
 		false)
-	agent.groupProxy = agent.clusterDataProxy.Root.GetProxy(
+	agent.groupProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/devices/%s/flow_groups", agent.deviceId),
 		false)
 
 	agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
-	//agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
+	agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
 
 	log.Debug("device-agent-started")
 }
@@ -249,12 +249,6 @@
 			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 		agent.lockDevice.Unlock()
-		//TODO: callback will be invoked to handle this state change
-		//For now force the state transition to happen
-		if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
-			return err
-		}
 	}
 	return nil
 }
@@ -289,7 +283,8 @@
 		agent.lockDevice.Unlock()
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
-		if device.AdminState != voltha.AdminState_DISABLED {
+		if (device.AdminState != voltha.AdminState_DISABLED) &&
+			(device.AdminState != voltha.AdminState_PREPROVISIONED) {
 			log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
 			//TODO:  Needs customized error message
 			agent.lockDevice.Unlock()
@@ -311,13 +306,6 @@
 			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 		agent.lockDevice.Unlock()
-		//TODO: callback will be invoked to handle this state change
-		//For now force the state transition to happen
-		if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
-			return err
-		}
-
 	}
 	return nil
 }
@@ -379,38 +367,54 @@
 	return nil
 }
 
-
-// TODO: implement when callback from the data model is ready
 // processUpdate is a callback invoked whenever there is a change on the device manages by this device agent
 func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
-	log.Debug("!!!!!!!!!!!!!!!!!!!!!!!!!")
-	log.Debugw("processUpdate", log.Fields{"deviceId": agent.deviceId, "args": args})
+	//// Run this callback in its own go routine
+	go func(args ...interface{}) interface{} {
+		var previous *voltha.Device
+		var current *voltha.Device
+		var ok bool
+		if len(args) == 2 {
+			if previous, ok = args[0].(*voltha.Device); !ok {
+				log.Errorw("invalid-callback-type", log.Fields{"data": args[0]})
+				return nil
+			}
+			if current, ok = args[1].(*voltha.Device); !ok {
+				log.Errorw("invalid-callback-type", log.Fields{"data": args[1]})
+				return nil
+			}
+		} else {
+			log.Errorw("too-many-args-in-callback", log.Fields{"len": len(args)})
+			return nil
+		}
+		// Perform the state transition in it's own go routine
+		agent.deviceMgr.processTransition(previous, current)
+		return nil
+	}(args...)
+
 	return nil
 }
 
 func (agent *DeviceAgent) updateDevice(device *voltha.Device) error {
 	agent.lockDevice.Lock()
-	//defer agent.lockDevice.Unlock()
+	defer agent.lockDevice.Unlock()
 	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
-	// Get the dev info from the model
-	if storedData, err := agent.getDeviceWithoutLock(); err != nil {
-		agent.lockDevice.Unlock()
-		return status.Errorf(codes.NotFound, "%s", device.Id)
-	} else {
-		// store the changed data
-		cloned := proto.Clone(device).(*voltha.Device)
-		afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
-		agent.lockDevice.Unlock()
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", device.Id)
-		}
-		// Perform the state transition
-		if err := agent.deviceMgr.processTransition(storedData, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
-			return err
-		}
-		return nil
+	cloned := proto.Clone(device).(*voltha.Device)
+	afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "%s", device.Id)
 	}
+	return nil
+}
+
+func (agent *DeviceAgent) updateDeviceWithoutLock(device *voltha.Device) error {
+	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
+	cloned := proto.Clone(device).(*voltha.Device)
+	afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "%s", device.Id)
+	}
+	return nil
 }
 
 func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
@@ -439,11 +443,6 @@
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
 		agent.lockDevice.Unlock()
-		// Perform the state transition
-		if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
-			return err
-		}
 		return nil
 	}
 }
@@ -482,11 +481,6 @@
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
 		agent.lockDevice.Unlock()
-		// Perform the state transition
-		if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
-			return err
-		}
 		return nil
 	}
 }
@@ -561,20 +555,11 @@
 				break
 			}
 		}
-		//To track an issue when adding peer-port.
-		log.Debugw("before-peer-added", log.Fields{"device": cloned})
 		// Store the device
 		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
-		//To track an issue when adding peer-port.
-		if d, ok := afterUpdate.(*voltha.Device); ok {
-			log.Debugw("after-peer-added", log.Fields{"device": d})
-		} else {
-			log.Debug("after-peer-added-incorrect-type", log.Fields{"type": reflect.ValueOf(afterUpdate).Type()})
-		}
-
 		return nil
 	}
 }
@@ -615,7 +600,7 @@
 	groups := device.FlowGroups
 
 	// Send update to adapters
-	// Check whether the device supports incremental flow changes
+	// TODO: Check whether the device supports incremental flow changes
 	// Assume false for test
 	acceptsAddRemoveFlowUpdates := false
 	if !acceptsAddRemoveFlowUpdates {
@@ -691,7 +676,7 @@
 	flows := device.Flows
 
 	// Send update to adapters
-	// Check whether the device supports incremental flow changes
+	// TODO: Check whether the device supports incremental flow changes
 	// Assume false for test
 	acceptsAddRemoveFlowUpdates := false
 	if !acceptsAddRemoveFlowUpdates {