[VOL-1462] Sync data between two voltha cores in the same pair

This commit consists of the following updates:
1) Background data syncing between two cores after a transaction
is completed by one core.
2) Add transaction management to southbound APIs (adapter facing).
This is enabled for adapter registration only for now.
3) Fix an issue with flow decomposition
4) Add the rough-in to allow a packet to be sent to an OFAgent
with a transaction ID.  Two cores can therefore send the same
packet and let the OFAgent discard the duplicate.  The work in
OFAgent remains.
5) Cleanups

Change-Id: Ibe9d75edb66cfd6a0954bdfeb16a7e7c8a3c53b6
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 8bf8664..116e2bb 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -54,10 +54,10 @@
 	cloned := (proto.Clone(device)).(*voltha.Device)
 	if cloned.Id == "" {
 		cloned.Id = CreateDeviceId()
+		cloned.AdminState = voltha.AdminState_PREPROVISIONED
+		cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
+		cloned.Flows = &ofp.Flows{Items: nil}
 	}
-	cloned.AdminState = voltha.AdminState_PREPROVISIONED
-	cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
-	cloned.Flows = &ofp.Flows{Items: nil}
 	if !device.GetRoot() && device.ProxyAddress != nil {
 		// Set the default vlan ID to the one specified by the parent adapter.  It can be
 		// overwritten by the child adapter during a device update request
@@ -74,15 +74,29 @@
 	return &agent
 }
 
-// start save the device to the data model and registers for callbacks on that device
-func (agent *DeviceAgent) start(ctx context.Context) {
+// start save the device to the data model and registers for callbacks on that device if loadFromdB is false.  Otherwise,
+// it will load the data from the dB and set up the necessary callbacks and proxies.
+func (agent *DeviceAgent) start(ctx context.Context, loadFromdB bool) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
-	log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
-	// Add the initial device to the local model
-	if added := agent.clusterDataProxy.AddWithID("/devices", agent.deviceId, agent.lastData, ""); added == nil {
-		log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
+	log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceId})
+	if loadFromdB {
+		if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+			if d, ok := device.(*voltha.Device); ok {
+				agent.lastData = proto.Clone(d).(*voltha.Device)
+			}
+		} else {
+			log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceId})
+			return status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+		}
+		log.Debugw("device-loaded-from-dB", log.Fields{"device": agent.lastData})
+	} else {
+		// Add the initial device to the local model
+		if added := agent.clusterDataProxy.AddWithID("/devices", agent.deviceId, agent.lastData, ""); added == nil {
+			log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
+		}
 	}
+
 	agent.deviceProxy = agent.clusterDataProxy.Root.CreateProxy("/devices/"+agent.deviceId, false)
 	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
 
@@ -97,6 +111,7 @@
 	agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
 
 	log.Debug("device-agent-started")
+	return nil
 }
 
 // stop stops the device agent.  Not much to do for now
@@ -112,7 +127,7 @@
 func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
-	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
 		if d, ok := device.(*voltha.Device); ok {
 			cloned := proto.Clone(d).(*voltha.Device)
 			return cloned, nil
@@ -124,7 +139,7 @@
 // getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
 // This function is meant so that we do not have duplicate code all over the device agent functions
 func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
-	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
 		if d, ok := device.(*voltha.Device); ok {
 			cloned := proto.Clone(d).(*voltha.Device)
 			return cloned, nil
@@ -808,7 +823,7 @@
 }
 
 //flowTableUpdated is the callback after flows have been updated in the model to push them
-//to the adapterAgents
+//to the adapters
 func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
@@ -842,7 +857,7 @@
 	}
 	groups := device.FlowGroups
 
-	// Send update to adapterAgents
+	// Send update to adapters
 	dType := agent.adapterMgr.getDeviceType(device.Type)
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
@@ -882,7 +897,7 @@
 }
 
 //groupTableUpdated is the callback after group table has been updated in the model to push them
-//to the adapterAgents
+//to the adapters
 func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
@@ -916,11 +931,9 @@
 	}
 	flows := device.Flows
 
-	// Send update to adapterAgents
-	// TODO: Check whether the device supports incremental flow changes
-	// Assume false for test
-	acceptsAddRemoveFlowUpdates := false
-	if !acceptsAddRemoveFlowUpdates {
+	// Send update to adapters
+	dType := agent.adapterMgr.getDeviceType(device.Type)
+	if !dType.AcceptsAddRemoveFlowUpdates {
 		if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, latestData); err != nil {
 			log.Debugw("update-flows-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
 			return err