VOL-1173 : Removed hash based storage; replaced with per device protobuf

- Ensured proxies issue callbacks instead of forcing with goroutines
- Fixed mutex issue with proxy component

Change-Id: Idabd3257c6d264c0f607ee228e406810304dab43
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 784b506..9e8710b 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -51,7 +51,9 @@
 	var agent DeviceAgent
 	agent.adapterProxy = ap
 	cloned := (proto.Clone(device)).(*voltha.Device)
-	cloned.Id = CreateDeviceId()
+	if cloned.Id == "" {
+		cloned.Id = CreateDeviceId()
+	}
 	cloned.AdminState = voltha.AdminState_PREPROVISIONED
 	cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
 	cloned.Flows = &ofp.Flows{Items: nil}
@@ -180,15 +182,13 @@
 	} else {
 		oldData = proto.Clone(storedData.Flows).(*voltha.Flows)
 		log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows, "old": oldData})
+
 		// store the changed data
-		storedData.Flows.Items = flows
-		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
+		afterUpdate := agent.flowProxy.Update("/", &ofp.Flows{Items: flows}, false, "")
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
 
-		// For now, force the callback to occur
-		go agent.flowTableUpdated(oldData, &ofp.Flows{Items: flows})
 		return nil
 	}
 }
@@ -196,21 +196,16 @@
 func (agent *DeviceAgent) updateGroups(groups []*ofp.OfpGroupEntry) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
-	var oldData *voltha.FlowGroups
 	log.Debugw("updateGroups", log.Fields{"deviceId": agent.deviceId, "groups": groups})
-	if storedData, err := agent.getDeviceWithoutLock(); err != nil {
+	if _, err := agent.getDeviceWithoutLock(); err != nil {
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
-		oldData = proto.Clone(storedData.FlowGroups).(*voltha.FlowGroups)
 		// store the changed data
-		storedData.FlowGroups.Items = groups
-		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
+		afterUpdate := agent.groupProxy.Update("/", &ofp.FlowGroups{Items: groups}, false, "")
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
 
-		// For now, force the callback to occur
-		go agent.groupTableUpdated(oldData, &ofp.FlowGroups{Items: groups})
 		return nil
 	}
 }
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 45584a1..682de48 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -18,7 +18,6 @@
 import (
 	"context"
 	"errors"
-	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/kafka"
@@ -184,20 +183,37 @@
 	return device.Root, nil
 }
 
+// GetDevice retrieves the latest device information from the data model
 func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
 	log.Debug("ListDevices")
 	result := &voltha.Devices{}
-	dMgr.lockDeviceAgentsMap.Lock()
-	defer dMgr.lockDeviceAgentsMap.Unlock()
-	for _, agent := range dMgr.deviceAgents {
-		if device, err := agent.getDevice(); err == nil {
-			cloned := proto.Clone(device).(*voltha.Device)
-			result.Items = append(result.Items, cloned)
+	if devices := dMgr.clusterDataProxy.Get("/devices", 0, false, ""); devices != nil {
+		for _, device := range devices.([]interface{}) {
+			if agent := dMgr.getDeviceAgent(device.(*voltha.Device).Id); agent == nil {
+				agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+				dMgr.addDeviceAgentToMap(agent)
+				agent.start(nil)
+			}
+			result.Items = append(result.Items, device.(*voltha.Device))
 		}
 	}
 	return result, nil
 }
 
+//func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
+//	log.Debug("ListDevices")
+//	result := &voltha.Devices{}
+//	dMgr.lockDeviceAgentsMap.Lock()
+//	defer dMgr.lockDeviceAgentsMap.Unlock()
+//	for _, agent := range dMgr.deviceAgents {
+//		if device, err := agent.getDevice(); err == nil {
+//			//cloned := proto.Clone(device).(*voltha.Device)
+//			result.Items = append(result.Items, device)
+//		}
+//	}
+//	return result, nil
+//}
+
 func (dMgr *DeviceManager) updateDevice(device *voltha.Device) error {
 	log.Debugw("updateDevice", log.Fields{"deviceid": device.Id, "device": device})
 	if agent := dMgr.getDeviceAgent(device.Id); agent != nil {
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 8a69967..ea94788 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -50,12 +50,13 @@
 	flowDecomposer    *fd.FlowDecomposer
 }
 
-func newLogicalDeviceAgent(id string, device *voltha.Device, ldeviceMgr *LogicalDeviceManager, deviceMgr *DeviceManager,
+func newLogicalDeviceAgent(id string, deviceId string, ldeviceMgr *LogicalDeviceManager,
+	deviceMgr *DeviceManager,
 	cdProxy *model.Proxy) *LogicalDeviceAgent {
 	var agent LogicalDeviceAgent
 	agent.exitChannel = make(chan int, 1)
 	agent.logicalDeviceId = id
-	agent.rootDeviceId = device.Id
+	agent.rootDeviceId = deviceId
 	agent.deviceMgr = deviceMgr
 	agent.clusterDataProxy = cdProxy
 	agent.ldeviceMgr = ldeviceMgr
@@ -148,8 +149,7 @@
 	defer agent.lockLogicalDevice.Unlock()
 	logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
 	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
-		cloned := proto.Clone(lDevice).(*voltha.LogicalDevice)
-		return cloned, nil
+		return lDevice, nil
 	}
 	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
 }
@@ -162,7 +162,7 @@
 	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
 		lPorts := make([]*voltha.LogicalPort, 0)
 		for _, port := range lDevice.Ports {
-			lPorts = append(lPorts, proto.Clone(port).(*voltha.LogicalPort))
+			lPorts = append(lPorts, port)
 		}
 		return &voltha.LogicalPorts{Items: lPorts}, nil
 	}
@@ -195,31 +195,19 @@
 
 //updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
 func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
-	cloned := proto.Clone(flows).(*ofp.Flows)
-	afterUpdate := agent.flowProxy.Update("/", cloned, false, "")
+	afterUpdate := agent.flowProxy.Update("/", flows, false, "")
 	if afterUpdate == nil {
 		return status.Errorf(codes.Internal, "failed-updating-logical-device-flows:%s", agent.logicalDeviceId)
 	}
-	// TODO:  Remove this code when the model update is fixed
-	ld, _ := agent.getLogicalDeviceWithoutLock()
-	clonedDevice := proto.Clone(ld).(*voltha.LogicalDevice)
-	clonedDevice.Flows = proto.Clone(flows).(*ofp.Flows)
-	agent.updateLogicalDeviceWithoutLock(clonedDevice)
 	return nil
 }
 
 //updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
 func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
-	cloned := proto.Clone(flowGroups).(*ofp.FlowGroups)
-	afterUpdate := agent.groupProxy.Update("/", cloned, false, "")
+	afterUpdate := agent.groupProxy.Update("/", flowGroups, false, "")
 	if afterUpdate == nil {
 		return status.Errorf(codes.Internal, "failed-updating-logical-device-flow-groups:%s", agent.logicalDeviceId)
 	}
-	// TODO:  Remove this code when the model update is fixed
-	ld, _ := agent.getLogicalDeviceWithoutLock()
-	clonedDevice := proto.Clone(ld).(*voltha.LogicalDevice)
-	clonedDevice.FlowGroups = proto.Clone(flowGroups).(*ofp.FlowGroups)
-	agent.updateLogicalDeviceWithoutLock(clonedDevice)
 	return nil
 }
 
@@ -229,8 +217,7 @@
 	log.Debug("getLogicalDeviceWithoutLock")
 	logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
 	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
-		cloned := proto.Clone(lDevice).(*voltha.LogicalDevice)
-		return cloned, nil
+		return lDevice, nil
 	}
 	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
 }
@@ -260,7 +247,6 @@
 		return status.Error(codes.NotFound, agent.logicalDeviceId)
 	} else {
 		log.Infow("!!!!!!!!!!!ADDING-UNI", log.Fields{"deviceId": childDevice.Id})
-		cloned := proto.Clone(ldevice).(*voltha.LogicalDevice)
 		portCap.Port.RootPort = false
 		//TODO: For now use the channel id assigned by the OLT as logical port number
 		lPortNo := childDevice.ProxyAddress.ChannelId
@@ -269,17 +255,15 @@
 		portCap.Port.OfpPort.Name = portCap.Port.Id
 		portCap.Port.DeviceId = childDevice.Id
 		portCap.Port.DevicePortNo = uniPort
-		lp := proto.Clone(portCap.Port).(*voltha.LogicalPort)
-		lp.DeviceId = childDevice.Id
-		cloned.Ports = append(cloned.Ports, lp)
-		return agent.updateLogicalDeviceWithoutLock(cloned)
+		portCap.Port.DeviceId = childDevice.Id
+		ldevice.Ports = append(ldevice.Ports, portCap.Port)
+		return agent.updateLogicalDeviceWithoutLock(ldevice)
 	}
 }
 
 //updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
 func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(logicalDevice *voltha.LogicalDevice) error {
-	cloned := proto.Clone(logicalDevice).(*voltha.LogicalDevice)
-	afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, cloned, false, "")
+	afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, logicalDevice, false, "")
 	if afterUpdate == nil {
 		return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceId)
 	}
@@ -397,8 +381,6 @@
 			return err
 		}
 	}
-	//// For now, force the callback to occur
-	//go agent.flowTableUpdated(oldData, lDevice.Flows)
 	return nil
 }
 
@@ -952,45 +934,36 @@
 func (agent *LogicalDeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
-	// Run this callback in it's own go routine since callbacks are not invoked in their own
-	// go routine
-	go func(args ...interface{}) interface{} {
-		//agent.lockLogicalDevice.Lock()
-		//defer agent.lockLogicalDevice.Unlock()
+	var previousData *ofp.Flows
+	var latestData *ofp.Flows
 
-		var previousData *ofp.Flows
-		var latestData *ofp.Flows
+	var ok bool
+	if previousData, ok = args[0].(*ofp.Flows); !ok {
+		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+	}
+	if latestData, ok = args[1].(*ofp.Flows); !ok {
+		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+	}
 
-		var ok bool
-		if previousData, ok = args[0].(*ofp.Flows); !ok {
-			log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-		}
-		if latestData, ok = args[1].(*ofp.Flows); !ok {
-			log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-		}
-
-		if reflect.DeepEqual(previousData.Items, latestData.Items) {
-			log.Debug("flow-update-not-required")
-			return nil
-		}
-
-		// Ensure the device graph has been setup
-		agent.setupDeviceGraph()
-
-		var groups *ofp.FlowGroups
-		lDevice, _ := agent.getLogicalDeviceWithoutLock()
-		groups = lDevice.FlowGroups
-		log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
-		deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
-		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
-		for deviceId, value := range deviceRules.GetRules() {
-			agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
-			agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
-		}
-
+	if reflect.DeepEqual(previousData.Items, latestData.Items) {
+		log.Debug("flow-update-not-required")
 		return nil
-	}(args...)
+	}
+
+	// Ensure the device graph has been setup
+	agent.setupDeviceGraph()
+
+	var groups *ofp.FlowGroups
+	lDevice, _ := agent.getLogicalDeviceWithoutLock()
+	groups = lDevice.FlowGroups
+	log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
+	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
+	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+	for deviceId, value := range deviceRules.GetRules() {
+		agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
+		agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+	}
 
 	return nil
 }
@@ -998,43 +971,35 @@
 func (agent *LogicalDeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
-	// Run this callback in it's own go routine since callbacks are not invoked in their own
-	// go routine
-	go func(args ...interface{}) interface{} {
-		//agent.lockLogicalDevice.Lock()
-		//defer agent.lockLogicalDevice.Unlock()
+	var previousData *ofp.FlowGroups
+	var latestData *ofp.FlowGroups
 
-		var previousData *ofp.FlowGroups
-		var latestData *ofp.FlowGroups
+	var ok bool
+	if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
+		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+	}
+	if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
+		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+	}
 
-		var ok bool
-		if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
-			log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-		}
-		if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
-			log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-		}
-
-		if reflect.DeepEqual(previousData.Items, latestData.Items) {
-			log.Debug("flow-update-not-required")
-			return nil
-		}
-
-		// Ensure the device graph has been setup
-		agent.setupDeviceGraph()
-
-		var flows *ofp.Flows
-		lDevice, _ := agent.getLogicalDeviceWithoutLock()
-		flows = lDevice.Flows
-		log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
-		deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
-		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-		for deviceId, value := range deviceRules.GetRules() {
-			agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
-			agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
-		}
+	if reflect.DeepEqual(previousData.Items, latestData.Items) {
+		log.Debug("flow-update-not-required")
 		return nil
-	}(args...)
+	}
+
+	// Ensure the device graph has been setup
+	agent.setupDeviceGraph()
+
+	var flows *ofp.Flows
+	lDevice, _ := agent.getLogicalDeviceWithoutLock()
+	flows = lDevice.Flows
+	log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
+	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
+	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+	for deviceId, value := range deviceRules.GetRules() {
+		agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
+		agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+	}
 
 	return nil
 }
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 64743cc..5f9cea2 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -106,18 +106,40 @@
 }
 
 func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
-	log.Debug("listLogicalDevices")
+	log.Debug("ListAllLogicalDevices")
 	result := &voltha.LogicalDevices{}
-	ldMgr.lockLogicalDeviceAgentsMap.Lock()
-	defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
-	for _, agent := range ldMgr.logicalDeviceAgents {
-		if lDevice, err := agent.GetLogicalDevice(); err == nil {
-			result.Items = append(result.Items, lDevice)
+	if logicalDevices := ldMgr.clusterDataProxy.Get("/logical_devices", 0, false, ""); logicalDevices != nil {
+		for _, logicalDevice := range logicalDevices.([]interface{}) {
+			if agent := ldMgr.getLogicalDeviceAgent(logicalDevice.(*voltha.LogicalDevice).Id); agent == nil {
+				agent = newLogicalDeviceAgent(
+					logicalDevice.(*voltha.LogicalDevice).Id,
+					logicalDevice.(*voltha.LogicalDevice).RootDeviceId,
+					ldMgr,
+					ldMgr.deviceMgr,
+					ldMgr.clusterDataProxy,
+				)
+				ldMgr.addLogicalDeviceAgentToMap(agent)
+				go agent.start(nil)
+			}
+			result.Items = append(result.Items, logicalDevice.(*voltha.LogicalDevice))
 		}
 	}
 	return result, nil
 }
 
+//func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
+//	log.Debug("listLogicalDevices")
+//	result := &voltha.LogicalDevices{}
+//	ldMgr.lockLogicalDeviceAgentsMap.Lock()
+//	defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+//	for _, agent := range ldMgr.logicalDeviceAgents {
+//		if lDevice, err := agent.GetLogicalDevice(); err == nil {
+//			result.Items = append(result.Items, lDevice)
+//		}
+//	}
+//	return result, nil
+//}
+
 func (ldMgr *LogicalDeviceManager) createLogicalDevice(ctx context.Context, device *voltha.Device) (*string, error) {
 	log.Debugw("creating-logical-device", log.Fields{"deviceId": device.Id})
 	// Sanity check
@@ -137,7 +159,7 @@
 	}
 	log.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
 
-	agent := newLogicalDeviceAgent(id, device, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
+	agent := newLogicalDeviceAgent(id, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
 	ldMgr.addLogicalDeviceAgentToMap(agent)
 	go agent.start(ctx)