[VOL-1036] Initial implementation of device lifecycle management

Change-Id: I5aa58fdcbcd852f6f5eef35d48f25f76e20c0418
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 805dd21..d9dacbc 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -17,6 +17,9 @@
 
 import (
 	"context"
+	"reflect"
+	"sync"
+
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
@@ -24,7 +27,6 @@
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	"reflect"
 )
 
 type DeviceAgent struct {
@@ -33,64 +35,160 @@
 	adapterProxy     *AdapterProxy
 	deviceMgr        *DeviceManager
 	clusterDataProxy *model.Proxy
+	deviceProxy      *model.Proxy
 	exitChannel      chan int
+	lockDevice       sync.RWMutex
 }
 
 func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
 	var agent DeviceAgent
-	device.Id = CreateDeviceId()
-	agent.deviceId = device.Id
 	agent.adapterProxy = ap
-	agent.lastData = device
+	cloned := (proto.Clone(device)).(*voltha.Device)
+	cloned.Id = CreateDeviceId()
+	cloned.AdminState = voltha.AdminState_PREPROVISIONED
+	agent.deviceId = cloned.Id
+	agent.lastData = cloned
 	agent.deviceMgr = deviceMgr
 	agent.exitChannel = make(chan int, 1)
 	agent.clusterDataProxy = cdProxy
+	agent.lockDevice = sync.RWMutex{}
 	return &agent
 }
 
 func (agent *DeviceAgent) start(ctx context.Context) {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
 	log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
 	// Add the initial device to the local model
 	if added := agent.clusterDataProxy.Add("/devices", agent.lastData, ""); added == nil {
 		log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
 	}
+	agent.deviceProxy = agent.clusterDataProxy.Root.GetProxy("/devices/"+agent.deviceId, false)
+	//agent.deviceProxy = agent.clusterDataProxy.Root.Node.GetProxy("/", false)
+	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate, nil)
 	log.Debug("device-agent-started")
 }
 
 func (agent *DeviceAgent) Stop(ctx context.Context) {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
 	log.Debug("stopping-device-agent")
 	agent.exitChannel <- 1
 	log.Debug("device-agent-stopped")
 }
 
+func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+		if d, ok := device.(*voltha.Device); ok {
+			cloned := proto.Clone(d).(*voltha.Device)
+			return cloned, nil
+		}
+	}
+	return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+//getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
+// This function is meant so that we do not have duplicate code all over the device agent functions
+func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
+	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+		if d, ok := device.(*voltha.Device); ok {
+			cloned := proto.Clone(d).(*voltha.Device)
+			return cloned, nil
+		}
+	}
+	return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
 func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
-	log.Debugw("enableDevice", log.Fields{"id": agent.lastData.Id, "device": agent.lastData})
-	// Update the device status
-	if device, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("enableDevice", log.Fields{"id": agent.deviceId})
+	if device, err := agent.getDeviceWithoutLock(); err != nil {
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
-		cloned := reflect.ValueOf(device).Elem().Interface().(voltha.Device)
-		cloned.AdminState = voltha.AdminState_ENABLED
-		cloned.OperStatus = voltha.OperStatus_ACTIVATING
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
-			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
-		} else {
-			if err := agent.adapterProxy.AdoptDevice(ctx, &cloned); err != nil {
-				log.Debugw("enableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+		if device.AdminState == voltha.AdminState_ENABLED {
+			log.Debugw("device-already-enabled", log.Fields{"id": agent.deviceId})
+			//TODO:  Needs customized error message
+			return nil
+		}
+		// Verify whether we need to adopt the device the first time
+		// TODO: A state machine for these state transitions would be better (we just have to handle
+		// a limited set of states now or it may be an overkill)
+		if device.AdminState == voltha.AdminState_PREPROVISIONED {
+			// First send the request to an Adapter and wait for a response
+			if err := agent.adapterProxy.AdoptDevice(ctx, device); err != nil {
+				log.Debugw("adoptDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
 				return err
 			}
-			agent.lastData = &cloned
+		} else {
+			// First send the request to an Adapter and wait for a response
+			if err := agent.adapterProxy.ReEnableDevice(ctx, device); err != nil {
+				log.Debugw("renableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+				return err
+			}
+		}
+		// Received an Ack (no error found above).  Now update the device in the model to the expected state
+		cloned := proto.Clone(device).(*voltha.Device)
+		cloned.AdminState = voltha.AdminState_ENABLED
+		cloned.OperStatus = voltha.OperStatus_ACTIVATING
+		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 	}
 	return nil
 }
 
-func (agent *DeviceAgent) getNNIPorts(ctx context.Context) *voltha.Ports {
-	log.Debugw("getNNIPorts", log.Fields{"id": agent.deviceId})
+func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
+	agent.lockDevice.Lock()
+	//defer agent.lockDevice.Unlock()
+	log.Debugw("disableDevice", log.Fields{"id": agent.deviceId})
+	// Get the most up to date the device info
+	if device, err := agent.getDeviceWithoutLock(); err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	} else {
+		if device.AdminState == voltha.AdminState_DISABLED {
+			log.Debugw("device-already-disabled", log.Fields{"id": agent.deviceId})
+			//TODO:  Needs customized error message
+			agent.lockDevice.Unlock()
+			return nil
+		}
+		// First send the request to an Adapter and wait for a response
+		if err := agent.adapterProxy.DisableDevice(ctx, device); err != nil {
+			log.Debugw("disableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+			agent.lockDevice.Unlock()
+			return err
+		}
+		// Received an Ack (no error found above).  Now update the device in the model to the expected state
+		cloned := proto.Clone(device).(*voltha.Device)
+		cloned.AdminState = voltha.AdminState_DISABLED
+		// Set the state of all ports on that device to disable
+		for _, port := range cloned.Ports {
+			port.AdminState = voltha.AdminState_DISABLED
+			port.OperStatus = voltha.OperStatus_UNKNOWN
+		}
+		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+			agent.lockDevice.Unlock()
+			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+		}
+		agent.lockDevice.Unlock()
+		//TODO: callback will be invoked to handle this state change
+		//For now force the state transition to happen
+		if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
+			log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
+			return err
+		}
+	}
+	return nil
+}
+
+func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
+	log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
 	ports := &voltha.Ports{}
 	if device, _ := agent.deviceMgr.getDevice(agent.deviceId); device != nil {
 		for _, port := range device.Ports {
-			if port.Type == voltha.Port_ETHERNET_NNI {
+			if port.Type == portType {
 				ports.Items = append(ports.Items, port)
 			}
 		}
@@ -128,15 +226,25 @@
 	}
 }
 
+func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
+	log.Debug("!!!!!!!!!!!!!!!!!!!!!!!!!")
+	log.Debugw("processUpdate", log.Fields{"deviceId": agent.deviceId, "args": args})
+	return nil
+}
+
 func (agent *DeviceAgent) updateDevice(device *voltha.Device) error {
+	agent.lockDevice.Lock()
+	//defer agent.lockDevice.Unlock()
 	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
 	// Get the dev info from the model
-	if storedData, err := agent.deviceMgr.getDevice(device.Id); err != nil {
+	if storedData, err := agent.getDeviceWithoutLock(); err != nil {
+		agent.lockDevice.Unlock()
 		return status.Errorf(codes.NotFound, "%s", device.Id)
 	} else {
 		// store the changed data
-		cloned := (proto.Clone(device)).(*voltha.Device)
+		cloned := proto.Clone(device).(*voltha.Device)
 		afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+		agent.lockDevice.Unlock()
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", device.Id)
 		}
@@ -149,26 +257,77 @@
 	}
 }
 
-func (agent *DeviceAgent) updateDeviceState(operState *core_adapter.IntType, connState *core_adapter.IntType) error {
+func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
+	agent.lockDevice.Lock()
+	//defer agent.lockDevice.Unlock()
 	// Work only on latest data
-	if storeDevice, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+		agent.lockDevice.Unlock()
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
 		// clone the device
-		cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
-		if operState != nil {
-			cloned.OperStatus = voltha.OperStatus_OperStatus(operState.Val)
+		cloned := proto.Clone(storeDevice).(*voltha.Device)
+		// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
+		if s, ok := voltha.ConnectStatus_ConnectStatus_value[connStatus.String()]; ok {
+			log.Debugw("updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
+			cloned.ConnectStatus = connStatus
 		}
-		if connState != nil {
-			cloned.ConnectStatus = voltha.ConnectStatus_ConnectStatus(connState.Val)
+		if s, ok := voltha.OperStatus_OperStatus_value[operStatus.String()]; ok {
+			log.Debugw("updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
+			cloned.OperStatus = operStatus
 		}
-		log.Debugw("DeviceStateUpdate-device", log.Fields{"device": cloned})
+		log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
 		// Store the device
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
+		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+			agent.lockDevice.Unlock()
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
+		agent.lockDevice.Unlock()
 		// Perform the state transition
-		if err := agent.deviceMgr.processTransition(storeDevice, &cloned); err != nil {
+		if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
+			log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
+			return err
+		}
+		return nil
+	}
+}
+
+func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
+	agent.lockDevice.Lock()
+	//defer agent.lockDevice.Unlock()
+	// Work only on latest data
+	// TODO: Get list of ports from device directly instead of the entire device
+	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+		agent.lockDevice.Unlock()
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	} else {
+		// clone the device
+		cloned := proto.Clone(storeDevice).(*voltha.Device)
+		// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
+		if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
+			agent.lockDevice.Unlock()
+			return status.Errorf(codes.InvalidArgument, "%s", portType)
+		}
+		for _, port := range cloned.Ports {
+			if port.Type == portType && port.PortNo == portNo {
+				port.OperStatus = operStatus
+				// Set the admin status to ENABLED if the operational status is ACTIVE
+				// TODO: Set by northbound system?
+				if operStatus == voltha.OperStatus_ACTIVE {
+					port.AdminState = voltha.AdminState_ENABLED
+				}
+				break
+			}
+		}
+		log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
+		// Store the device
+		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+			agent.lockDevice.Unlock()
+			return status.Errorf(codes.Internal, "%s", agent.deviceId)
+		}
+		agent.lockDevice.Unlock()
+		// Perform the state transition
+		if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
 			log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
 			return err
 		}
@@ -177,17 +336,18 @@
 }
 
 func (agent *DeviceAgent) updatePmConfigs(pmConfigs *voltha.PmConfigs) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
 	log.Debug("updatePmConfigs")
 	// Work only on latest data
-	if storeDevice, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
 		// clone the device
-		cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
-		cp := proto.Clone(pmConfigs)
-		cloned.PmConfigs = cp.(*voltha.PmConfigs)
+		cloned := proto.Clone(storeDevice).(*voltha.Device)
+		cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
 		// Store the device
-		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, "")
+		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
@@ -196,21 +356,57 @@
 }
 
 func (agent *DeviceAgent) addPort(port *voltha.Port) error {
-	log.Debug("addPort")
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("addPort", log.Fields{"deviceId": agent.deviceId})
 	// Work only on latest data
-	if storeDevice, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
 		// clone the device
-		cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
+		cloned := proto.Clone(storeDevice).(*voltha.Device)
 		if cloned.Ports == nil {
 			//	First port
+			log.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
 			cloned.Ports = make([]*voltha.Port, 0)
 		}
-		cp := proto.Clone(port)
-		cloned.Ports = append(cloned.Ports, cp.(*voltha.Port))
+		cp := proto.Clone(port).(*voltha.Port)
+		// Set the admin state of the port to ENABLE if the operational state is ACTIVE
+		// TODO: Set by northbound system?
+		if cp.OperStatus == voltha.OperStatus_ACTIVE {
+			cp.AdminState = voltha.AdminState_ENABLED
+		}
+		cloned.Ports = append(cloned.Ports, cp)
 		// Store the device
-		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, "")
+		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+		if afterUpdate == nil {
+			return status.Errorf(codes.Internal, "%s", agent.deviceId)
+		}
+		return nil
+	}
+}
+
+func (agent *DeviceAgent) addPeerPort(port *voltha.Port_PeerPort) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debug("addPeerPort")
+	// Work only on latest data
+	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	} else {
+		// clone the device
+		cloned := proto.Clone(storeDevice).(*voltha.Device)
+		// Get the peer port on the device based on the port no
+		for _, peerPort := range cloned.Ports {
+			if peerPort.PortNo == port.PortNo { // found port
+				cp := proto.Clone(port).(*voltha.Port_PeerPort)
+				peerPort.Peers = append(peerPort.Peers, cp)
+				log.Debugw("found-peer", log.Fields{"portNo": port.PortNo, "deviceId": agent.deviceId})
+				break
+			}
+		}
+		// Store the device
+		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
@@ -220,12 +416,14 @@
 
 // TODO: A generic device update by attribute
 func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
 	if value == nil {
 		return
 	}
 	var storeDevice *voltha.Device
 	var err error
-	if storeDevice, err = agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+	if storeDevice, err = agent.getDeviceWithoutLock(); err != nil {
 		return
 	}
 	updated := false
@@ -247,10 +445,10 @@
 			}
 		}
 	}
-	log.Debugw("update-field-status", log.Fields{"device": storeDevice, "name": name, "updated": updated})
+	log.Debugw("update-field-status", log.Fields{"deviceId": storeDevice.Id, "name": name, "updated": updated})
 	//	Save the data
-	cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
-	if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
+	cloned := proto.Clone(storeDevice).(*voltha.Device)
+	if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 		log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
 	}
 	return