[VOL-1036] Device management implementation. This update includes
the ability to reboot and delete a device. It contains changes
to both the Go Core and the Twisted ponsim adapters.
Change-Id: I15539827c654d7186cdae3300a107ffc8e921756
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index bccb227..6d78aa4 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -81,6 +81,116 @@
return unPackResponse(rpc, device.Id, success, result)
}
+
+
+func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
+ rpc := "reenable_device"
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+ log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
+}
+
+func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
+ rpc := "reboot_device"
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+ log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
+}
+
+func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
+ rpc := "delete_device"
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+ log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
+}
+
+func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
+ log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_device_info", &topic, true, args...)
+ log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
+ if success {
+ unpackResult := &ca.SwitchCapability{}
+ if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ }
+ return unpackResult, nil
+ } else {
+ unpackResult := &ca.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+ }
+}
+
+func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
+ log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 2)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ pNo := &ca.IntType{Val: int64(portNo)}
+ args[1] = &kafka.KVArg{
+ Key: "port_no",
+ Value: pNo,
+ }
+
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_port_info", &topic, true, args...)
+ log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
+ if success {
+ unpackResult := &ca.PortCapability{}
+ if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ }
+ return unpackResult, nil
+ } else {
+ unpackResult := &ca.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+ }
+}
+
+//TODO: Implement the functions below
+
func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
log.Debug("AdapterDescriptor")
return nil, nil
@@ -106,30 +216,6 @@
return nil
}
-func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
- log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
- rpc := "reenable_device"
- topic := kafka.Topic{Name: device.Type}
- args := make([]*kafka.KVArg, 1)
- args[0] = &kafka.KVArg{
- Key: "device",
- Value: device,
- }
- success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
- log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
- return unPackResponse(rpc, device.Id, success, result)
-}
-
-func (ap *AdapterProxy) RebootDevice(device *voltha.Device) error {
- log.Debug("RebootDevice")
- return nil
-}
-
-func (ap *AdapterProxy) DeleteDevice(device *voltha.Device) error {
- log.Debug("DeleteDevice")
- return nil
-}
-
func (ap *AdapterProxy) GetDeviceDetails(device voltha.Device) (*voltha.Device, error) {
log.Debug("GetDeviceDetails")
return nil, nil
@@ -193,68 +279,4 @@
func (ap *AdapterProxy) UnSuppressAlarm(filter voltha.AlarmFilter) error {
log.Debug("UnSuppressAlarm")
return nil
-}
-
-func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
- log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
- topic := kafka.Topic{Name: device.Type}
- args := make([]*kafka.KVArg, 1)
- args[0] = &kafka.KVArg{
- Key: "device",
- Value: device,
- }
- success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_device_info", &topic, true, args...)
- log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
- if success {
- unpackResult := &ca.SwitchCapability{}
- if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
- return unpackResult, nil
- } else {
- unpackResult := &ca.Error{}
- var err error
- if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- }
- log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
- // TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
- }
-}
-
-func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
- log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
- topic := kafka.Topic{Name: device.Type}
- args := make([]*kafka.KVArg, 2)
- args[0] = &kafka.KVArg{
- Key: "device",
- Value: device,
- }
- pNo := &ca.IntType{Val: int64(portNo)}
- args[1] = &kafka.KVArg{
- Key: "port_no",
- Value: pNo,
- }
-
- success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_port_info", &topic, true, args...)
- log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
- if success {
- unpackResult := &ca.PortCapability{}
- if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
- return unpackResult, nil
- } else {
- unpackResult := &ca.Error{}
- var err error
- if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- }
- log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
- // TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
- }
-}
+}
\ No newline at end of file
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 7ae9f1a..0c0609e 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -246,14 +246,12 @@
}
}
}
-
log.Debugw("ChildDeviceDetected", log.Fields{"parentDeviceId": pID.Id, "parentPortNo": portNo.Val,
"deviceType": dt.Val, "channelId": chnlId.Val})
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
-
// Run child detection in it's own go routine as it can be a lengthy process
go rhp.deviceMgr.childDeviceDetected(pID.Id, portNo.Val, dt.Val, chnlId.Val)
@@ -281,29 +279,60 @@
log.Warnw("cannot-unmarshal-operStatus", log.Fields{"error": err})
return nil, err
}
- //if operStatus.Val == -1 {
- // operStatus = nil
- //}
case "connect_status":
if err := ptypes.UnmarshalAny(arg.Value, connStatus); err != nil {
log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
return nil, err
}
- //if connStatus.Val == -1 {
- // connStatus = nil
- //}
}
}
log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
-
// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
go rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val), voltha.ConnectStatus_ConnectStatus(connStatus.Val))
return new(empty.Empty), nil
}
+// ChildrenStateUpdate handles an adapter request to change the operational and
+// connection status of the children of a device. It extracts the "device_id",
+// "oper_status" and "connect_status" arguments and hands them to the device
+// manager's updateChildrenStatus in a separate goroutine, so the outcome of that
+// update is not reported back to the caller.
+//
+// It returns an error when fewer than two arguments are supplied or when an
+// argument cannot be unmarshalled. In test mode it returns (nil, nil) without
+// calling the device manager.
+func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+	if len(args) < 2 {
+		// Warnw (not Warn) so the fields are logged as structured key/values,
+		// like every other log call in this handler.
+		log.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		return nil, errors.New("invalid-number-of-args")
+	}
+	deviceId := &voltha.ID{}
+	operStatus := &ca.IntType{}
+	connStatus := &ca.IntType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device_id":
+			if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+				log.Warnw("cannot-unmarshal-device-id", log.Fields{"error": err})
+				return nil, err
+			}
+		case "oper_status":
+			if err := ptypes.UnmarshalAny(arg.Value, operStatus); err != nil {
+				log.Warnw("cannot-unmarshal-operStatus", log.Fields{"error": err})
+				return nil, err
+			}
+		case "connect_status":
+			if err := ptypes.UnmarshalAny(arg.Value, connStatus); err != nil {
+				log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	log.Debugw("ChildrenStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
+	if rhp.TestMode { // Execute only for test cases
+		return nil, nil
+	}
+
+	// When the enum is not set (i.e. -1), Go still converts to the enum type with the value being -1
+	go rhp.deviceMgr.updateChildrenStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val), voltha.ConnectStatus_ConnectStatus(connStatus.Val))
+	return new(empty.Empty), nil
+}
+
func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
@@ -369,21 +398,14 @@
}
}
}
-
log.Debugw("PortCreated", log.Fields{"deviceId": deviceId.Id, "port": port})
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
-
// Run port creation in its own go routine
go rhp.deviceMgr.addPort(deviceId.Id, port)
- //if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
- // log.Debugw("addport-error", log.Fields{"deviceId": deviceId.Id, "error": err})
- // return nil, status.Errorf(codes.Internal, "%s", err.Error())
- //}
- // Return an Ack
return new(empty.Empty), nil
}
@@ -409,7 +431,6 @@
}
}
}
-
log.Debugw("DevicePMConfigUpdate", log.Fields{"deviceId": pmConfigs.Id, "configs": pmConfigs,
"init": init})
@@ -420,11 +441,5 @@
// Run PM config update in its own go routine
go rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs)
- //if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
- // log.Debugw("update-pmconfigs-error", log.Fields{"deviceId": pmConfigs.Id, "error": err})
- // return nil, status.Errorf(codes.Internal, "%s", err.Error())
- //}
- // Return an Ack
return new(empty.Empty), nil
-
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 480e32f..a8d5abf 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -62,8 +62,8 @@
log.Info("starting-core")
core.startKafkaMessagingProxy(ctx)
log.Info("values", log.Fields{"kmp": core.kmp})
- core.deviceMgr = NewDeviceManager(core.kmp, core.clusterDataProxy)
- core.logicalDeviceMgr = NewLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
+ core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy)
+ core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
core.registerAdapterRequestHandler(ctx, core.deviceMgr, core.logicalDeviceMgr, core.localDataProxy, core.clusterDataProxy)
go core.startDeviceManager(ctx)
go core.startLogicalDeviceManager(ctx)
@@ -137,12 +137,12 @@
// callbacks. For now, until the model is ready, devicemanager will keep a reference to the
// logicaldevicemanager to initiate the creation of logical devices
log.Info("starting-DeviceManager")
- core.deviceMgr.Start(ctx, core.logicalDeviceMgr)
+ core.deviceMgr.start(ctx, core.logicalDeviceMgr)
log.Info("started-DeviceManager")
}
+// startLogicalDeviceManager starts the core's logical device manager with the
+// given context, logging before and after the call.
func (core *Core) startLogicalDeviceManager(ctx context.Context) {
log.Info("starting-Logical-DeviceManager")
- core.logicalDeviceMgr.Start(ctx)
+ core.logicalDeviceMgr.start(ctx)
log.Info("started-Logical-DeviceManager")
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index d9dacbc..52ab584 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -40,6 +40,8 @@
lockDevice sync.RWMutex
}
+// newDeviceAgent creates a new device agent, as well as creating a unique ID for the device and setting the device
+// state to preprovisioning
func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
var agent DeviceAgent
agent.adapterProxy = ap
@@ -55,6 +57,7 @@
return &agent
}
+// start saves the device to the data model and registers for callbacks on that device
func (agent *DeviceAgent) start(ctx context.Context) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -64,12 +67,12 @@
log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
}
agent.deviceProxy = agent.clusterDataProxy.Root.GetProxy("/devices/"+agent.deviceId, false)
- //agent.deviceProxy = agent.clusterDataProxy.Root.Node.GetProxy("/", false)
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate, nil)
log.Debug("device-agent-started")
}
-func (agent *DeviceAgent) Stop(ctx context.Context) {
+// stop stops the device agent. Not much to do for now
+func (agent *DeviceAgent) stop(ctx context.Context) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debug("stopping-device-agent")
@@ -77,6 +80,7 @@
log.Debug("device-agent-stopped")
}
+// getDevice retrieves the latest device information from the data model
func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -89,7 +93,7 @@
return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
}
-//getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
+// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
// This function is meant so that we do not have duplicate code all over the device agent functions
func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
@@ -101,6 +105,7 @@
return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
}
+// enableDevice activates a preprovisioned or disabled device
func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -113,6 +118,7 @@
//TODO: Needs customized error message
return nil
}
+ //TODO: if parent device is disabled then do not enable device
// Verify whether we need to adopt the device the first time
// TODO: A state machine for these state transitions would be better (we just have to handle
// a limited set of states now or it may be an overkill)
@@ -140,6 +146,7 @@
return nil
}
+// disableDevice disables a device
func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
agent.lockDevice.Lock()
//defer agent.lockDevice.Unlock()
@@ -183,6 +190,70 @@
return nil
}
+// rebootDevice asks the adapter to reboot the device managed by this agent.
+// The device lock is held for the whole call. NotFound is returned when the
+// device cannot be read from the model, FailedPrecondition when the device is
+// not administratively DISABLED, and otherwise whatever the adapter proxy
+// returns.
+func (agent *DeviceAgent) rebootDevice(ctx context.Context) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("rebootDevice", log.Fields{"id": agent.deviceId})
+
+	// Work from the latest copy of the device held in the model.
+	device, err := agent.getDeviceWithoutLock()
+	if err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	}
+	if device.AdminState != voltha.AdminState_DISABLED {
+		log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
+		//TODO: Needs customized error message
+		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
+	}
+	// Forward the request to the adapter; its error, if any, is returned as is.
+	if err := agent.adapterProxy.RebootDevice(ctx, device); err != nil {
+		log.Debugw("rebootDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+		return err
+	}
+	return nil
+}
+
+// deleteDevice asks the adapter to delete the device managed by this agent and,
+// once the adapter call succeeds, stores a copy of the device with its admin
+// state set to DELETED in the data model. The device must be administratively
+// DISABLED, otherwise FailedPrecondition is returned. The device lock is held
+// while the model is read and updated and is released before the state
+// transition is processed.
+func (agent *DeviceAgent) deleteDevice(ctx context.Context) error {
+	agent.lockDevice.Lock()
+	log.Debugw("deleteDevice", log.Fields{"id": agent.deviceId})
+
+	// Work from the latest copy of the device held in the model.
+	device, err := agent.getDeviceWithoutLock()
+	if err != nil {
+		agent.lockDevice.Unlock()
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	}
+	if device.AdminState != voltha.AdminState_DISABLED {
+		log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
+		//TODO: Needs customized error message
+		agent.lockDevice.Unlock()
+		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
+	}
+	// Forward the request to the adapter; bail out on any error.
+	if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
+		log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+		agent.lockDevice.Unlock()
+		return err
+	}
+
+	// The adapter accepted the request: move the device to DELETED in the model so
+	// that the transition handling can delete child devices, if any.
+	cloned := proto.Clone(device).(*voltha.Device)
+	cloned.AdminState = voltha.AdminState_DELETED
+	updated := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+	if updated == nil {
+		agent.lockDevice.Unlock()
+		return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+	}
+	agent.lockDevice.Unlock()
+
+	//TODO: callback will be invoked to handle this state change
+	//For now force the state transition to happen
+	if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
+		log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
+		return err
+	}
+	return nil
+}
+
+// getPorts retrieves the ports information of the device based on the port type.
func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
ports := &voltha.Ports{}
@@ -196,6 +267,8 @@
return ports
}
+// getSwitchCapability is a helper method that a logical device agent uses to retrieve the switch capability of a
+// parent device
func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*core_adapter.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceId})
if device, err := agent.deviceMgr.getDevice(agent.deviceId); device == nil {
@@ -211,6 +284,8 @@
}
}
+// getPortCapability is a helper method that a logical device agent uses to retrieve the port capability of a
+// device
func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*core_adapter.PortCapability, error) {
log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceId})
if device, err := agent.deviceMgr.getDevice(agent.deviceId); device == nil {
@@ -226,6 +301,8 @@
}
}
+// TODO: implement when callback from the data model is ready
+// processUpdate is a callback invoked whenever there is a change on the device managed by this device agent
func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
log.Debug("!!!!!!!!!!!!!!!!!!!!!!!!!")
log.Debugw("processUpdate", log.Fields{"deviceId": agent.deviceId, "args": args})
@@ -405,11 +482,20 @@
break
}
}
+ //To track an issue when adding peer-port.
+ log.Debugw("before-peer-added", log.Fields{"device": cloned})
// Store the device
afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
+ //To track an issue when adding peer-port.
+ if d, ok := afterUpdate.(*voltha.Device); ok {
+ log.Debugw("after-peer-added", log.Fields{"device": d})
+ } else {
+ log.Debug("after-peer-added-incorrect-type", log.Fields{"type": reflect.ValueOf(afterUpdate).Type()})
+ }
+
return nil
}
}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index e3dbed2..1063e29 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -42,7 +42,7 @@
lockDeviceAgentsMap sync.RWMutex
}
-func NewDeviceManager(kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *DeviceManager {
+func newDeviceManager(kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *DeviceManager {
var deviceMgr DeviceManager
deviceMgr.exitChannel = make(chan int, 1)
deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
@@ -53,14 +53,14 @@
return &deviceMgr
}
-func (dMgr *DeviceManager) Start(ctx context.Context, logicalDeviceMgr *LogicalDeviceManager) {
+// start stores the logical device manager reference on the device manager and
+// sets stateTransitions to the map returned by NewTransitionMap. The ctx
+// argument is not used in this body.
+func (dMgr *DeviceManager) start(ctx context.Context, logicalDeviceMgr *LogicalDeviceManager) {
log.Info("starting-device-manager")
dMgr.logicalDeviceMgr = logicalDeviceMgr
dMgr.stateTransitions = NewTransitionMap(dMgr)
log.Info("device-manager-started")
}
-func (dMgr *DeviceManager) Stop(ctx context.Context) {
+func (dMgr *DeviceManager) stop(ctx context.Context) {
log.Info("stopping-device-manager")
dMgr.exitChannel <- 1
log.Info("device-manager-stopped")
@@ -86,7 +86,14 @@
}
}
+// deleteDeviceAgentToMap removes the entry keyed by the agent's device ID from
+// the device-agents map while holding the map lock.
+func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ delete(dMgr.deviceAgents, agent.deviceId)
+}
+
func (dMgr *DeviceManager) getDeviceAgent(deviceId string) *DeviceAgent {
+ // TODO If the device is not in memory it needs to be loaded first
dMgr.lockDeviceAgentsMap.Lock()
defer dMgr.lockDeviceAgentsMap.Unlock()
if agent, ok := dMgr.deviceAgents[deviceId]; ok {
@@ -108,7 +115,6 @@
func (dMgr *DeviceManager) enableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
log.Debugw("enableDevice", log.Fields{"deviceid": id})
-
var res interface{}
if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
res = agent.enableDevice(ctx)
@@ -122,7 +128,6 @@
func (dMgr *DeviceManager) disableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
log.Debugw("disableDevice", log.Fields{"deviceid": id})
-
var res interface{}
if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
res = agent.disableDevice(ctx)
@@ -134,6 +139,34 @@
sendResponse(ctx, ch, res)
}
+// rebootDevice looks up the agent for the given device ID, asks it to reboot the
+// device and sends the outcome on ch via sendResponse. A NotFound status is
+// sent when no agent exists for the ID.
+func (dMgr *DeviceManager) rebootDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
+	log.Debugw("rebootDevice", log.Fields{"deviceid": id})
+	agent := dMgr.getDeviceAgent(id.Id)
+	if agent == nil {
+		sendResponse(ctx, ch, status.Errorf(codes.NotFound, "%s", id.Id))
+		return
+	}
+	// Keep the result as interface{} so a nil error stays a nil response.
+	var res interface{} = agent.rebootDevice(ctx)
+	log.Debugw("rebootDevice-result", log.Fields{"result": res})
+	sendResponse(ctx, ch, res)
+}
+
+// deleteDevice looks up the agent for the given device ID and asks it to delete
+// the device. When that succeeds the agent is stopped and removed from the
+// device-agents map. The outcome is sent on ch via sendResponse; a NotFound
+// status is sent when no agent exists for the ID.
+func (dMgr *DeviceManager) deleteDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
+	log.Debugw("deleteDevice", log.Fields{"deviceid": id})
+	agent := dMgr.getDeviceAgent(id.Id)
+	if agent == nil {
+		sendResponse(ctx, ch, status.Errorf(codes.NotFound, "%s", id.Id))
+		return
+	}
+	// Keep the result as interface{} so a nil error stays a nil response.
+	var res interface{} = agent.deleteDevice(ctx)
+	if res == nil { //Success
+		agent.stop(ctx)
+		dMgr.deleteDeviceAgentToMap(agent)
+	}
+	log.Debugw("deleteDevice-result", log.Fields{"result": res})
+	sendResponse(ctx, ch, res)
+}
+
func (dMgr *DeviceManager) getDevice(id string) (*voltha.Device, error) {
log.Debugw("getDevice", log.Fields{"deviceid": id})
if agent := dMgr.getDeviceAgent(id); agent != nil {
@@ -158,7 +191,6 @@
func (dMgr *DeviceManager) updateDevice(device *voltha.Device) error {
log.Debugw("updateDevice", log.Fields{"deviceid": device.Id, "device": device})
-
if agent := dMgr.getDeviceAgent(device.Id); agent != nil {
return agent.updateDevice(device)
}
@@ -195,7 +227,6 @@
func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*core_adapter.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceId})
-
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getSwitchCapability(ctx)
}
@@ -204,7 +235,6 @@
func (dMgr *DeviceManager) getPorts(ctx context.Context, deviceId string, portType voltha.Port_PortType) (*voltha.Ports, error) {
log.Debugw("getPorts", log.Fields{"deviceid": deviceId, "portType": portType})
-
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getPorts(ctx, portType), nil
}
@@ -214,7 +244,6 @@
func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*core_adapter.PortCapability, error) {
log.Debugw("getPortCapability", log.Fields{"deviceid": deviceId})
-
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getPortCapability(ctx, portNo)
}
@@ -229,6 +258,30 @@
return status.Errorf(codes.NotFound, "%s", deviceId)
}
+func (dMgr *DeviceManager) updateChildrenStatus(deviceId string, operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
+ log.Debugw("updateChildrenStatus", log.Fields{"parentDeviceid": deviceId, "operStatus": operStatus, "connStatus": connStatus})
+ var parentDevice *voltha.Device
+ var err error
+ if parentDevice, err = dMgr.getDevice(deviceId); err != nil {
+ return status.Errorf(codes.Aborted, "%s", err.Error())
+ }
+ var childDeviceIds []string
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+ return status.Errorf(codes.Aborted, "%s", err.Error())
+ }
+ if len(childDeviceIds) == 0 {
+ log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
+ }
+ for _, childDeviceId := range childDeviceIds {
+ if agent := dMgr.getDeviceAgent(childDeviceId); agent != nil {
+ if err = agent.updateDeviceStatus(operStatus, connStatus); err != nil {
+ return status.Errorf(codes.Aborted, "childDevice:%s, error:%s", childDeviceId, err.Error())
+ }
+ }
+ }
+ return nil
+}
+
func (dMgr *DeviceManager) updatePortState(deviceId string, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
log.Debugw("updatePortState", log.Fields{"deviceid": deviceId, "portType": portType, "portNo": portNo, "operStatus": operStatus})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
@@ -274,10 +327,6 @@
return err
}
}
- //if handler != nil {
- // log.Debugw("found-handlers", log.Fields{"handlers": funcName(handler)})
- // return handler(current)
- //}
return nil
}
@@ -285,7 +334,7 @@
log.Info("createLogicalDevice")
var logicalId *string
var err error
- if logicalId, err = dMgr.logicalDeviceMgr.CreateLogicalDevice(nil, cDevice); err != nil {
+ if logicalId, err = dMgr.logicalDeviceMgr.createLogicalDevice(nil, cDevice); err != nil {
log.Warnw("createlogical-device-error", log.Fields{"device": cDevice})
return err
}
@@ -297,7 +346,7 @@
func (dMgr *DeviceManager) deleteLogicalDevice(cDevice *voltha.Device) error {
log.Info("deleteLogicalDevice")
var err error
- if err = dMgr.logicalDeviceMgr.DeleteLogicalDevice(nil, cDevice); err != nil {
+ if err = dMgr.logicalDeviceMgr.deleteLogicalDevice(nil, cDevice); err != nil {
log.Warnw("deleteLogical-device-error", log.Fields{"deviceId": cDevice.Id})
return err
}
@@ -310,13 +359,10 @@
+// deleteLogicalPort asks the logical device manager to delete the logical port
+// for cDevice (with a nil context). On failure it logs a warning and returns
+// the error.
func (dMgr *DeviceManager) deleteLogicalPort(cDevice *voltha.Device) error {
log.Info("deleteLogicalPort")
var err error
- if err = dMgr.logicalDeviceMgr.DeleteLogicalPort(nil, cDevice); err != nil {
+ if err = dMgr.logicalDeviceMgr.deleteLogicalPort(nil, cDevice); err != nil {
log.Warnw("deleteLogical-port-error", log.Fields{"deviceId": cDevice.Id})
return err
}
- //// Remove the logical device Id from the parent device
- //logicalId := ""
- //dMgr.UpdateDeviceAttribute(cDevice.Id, "ParentId", logicalId)
return nil
}
@@ -330,38 +376,75 @@
return parentDevice
}
-func (dMgr *DeviceManager) disableAllChildDevices(cDevice *voltha.Device) error {
+/*
+All the functions below are callback functions where they are invoked with the latest and previous data. We can
+therefore use the data as is without trying to get the latest from the model.
+*/
+
+// disableAllChildDevices is invoked as a callback when the parent device is disabled.
+// It disables every child device listed as a peer on the parent's ports. A failing
+// child does not stop the walk: all children are attempted and the first failure is
+// returned afterwards (nil when every child was disabled). Children without an agent
+// are skipped.
+func (dMgr *DeviceManager) disableAllChildDevices(parentDevice *voltha.Device) error {
log.Debug("disableAllChildDevices")
var childDeviceIds []string
var err error
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(cDevice); err != nil {
- return status.Errorf(codes.NotFound, "%s", cDevice.Id)
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+ return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
}
if len(childDeviceIds) == 0 {
- log.Debugw("no-child-device", log.Fields{"deviceId": cDevice.Id})
+ log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
}
+ // err is overwritten on every iteration, so a later success would hide an earlier
+ // failure; remember the first failure separately.
+ var firstErr error
for _, childDeviceId := range childDeviceIds {
if agent := dMgr.getDeviceAgent(childDeviceId); agent != nil {
if err = agent.disableDevice(nil); err != nil {
log.Errorw("failure-disable-device", log.Fields{"deviceId": childDeviceId, "error": err.Error()})
+ if firstErr == nil {
+ firstErr = err
+ }
}
}
}
+ if firstErr != nil {
+ return firstErr
+ }
return nil
}
-func (dMgr *DeviceManager) getAllChildDeviceIds(cDevice *voltha.Device) ([]string, error) {
- log.Info("getAllChildDeviceIds")
- // Get latest device info
- var device *voltha.Device
+// deleteAllChildDevices is invoked as a callback when the parent device is deleted.
+// It deletes every child device listed as a peer on the parent's ports; each child
+// that is deleted successfully has its agent stopped and removed from the agents map.
+// A failing child does not stop the walk: all children are attempted and the first
+// failure is returned afterwards (nil when every child was deleted). Children without
+// an agent are skipped.
+func (dMgr *DeviceManager) deleteAllChildDevices(parentDevice *voltha.Device) error {
+ log.Debug("deleteAllChildDevices")
+ var childDeviceIds []string
var err error
- if device, err = dMgr.getDevice(cDevice.Id); err != nil {
- return nil, status.Errorf(codes.NotFound, "%s", cDevice.Id)
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+ return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
}
+ if len(childDeviceIds) == 0 {
+ log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
+ }
+ // err is overwritten on every iteration, so a later success would hide an earlier
+ // failure; remember the first failure separately.
+ var firstErr error
+ for _, childDeviceId := range childDeviceIds {
+ if agent := dMgr.getDeviceAgent(childDeviceId); agent != nil {
+ if err = agent.deleteDevice(nil); err != nil {
+ log.Errorw("failure-delete-device", log.Fields{"deviceId": childDeviceId, "error": err.Error()})
+ if firstErr == nil {
+ firstErr = err
+ }
+ } else {
+ agent.stop(nil)
+ dMgr.deleteDeviceAgentToMap(agent)
+ }
+ }
+ }
+ if firstErr != nil {
+ return firstErr
+ }
+ return nil
+}
+
+//getAllChildDeviceIds is a helper method to get all the child device IDs from the device passed as parameter
+func (dMgr *DeviceManager) getAllChildDeviceIds(parentDevice *voltha.Device) ([]string, error) {
+ log.Debugw("getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id})
childDeviceIds := make([]string, 0)
- for _, port := range device.Ports {
- for _, peer := range port.Peers {
- childDeviceIds = append(childDeviceIds, peer.DeviceId)
+ if parentDevice != nil {
+ for _, port := range parentDevice.Ports {
+ for _, peer := range port.Peers {
+ childDeviceIds = append(childDeviceIds, peer.DeviceId)
+ }
}
}
return childDeviceIds, nil
@@ -369,7 +452,7 @@
func (dMgr *DeviceManager) addUNILogicalPort(cDevice *voltha.Device) error {
log.Info("addUNILogicalPort")
- if err := dMgr.logicalDeviceMgr.AddUNILogicalPort(nil, cDevice); err != nil {
+ if err := dMgr.logicalDeviceMgr.addUNILogicalPort(nil, cDevice); err != nil {
log.Warnw("addUNILogicalPort-error", log.Fields{"device": cDevice, "err": err})
return err
}
diff --git a/rw_core/core/device_state_transitions.go b/rw_core/core/device_state_transitions.go
index a84f03a..b3fecc0 100644
--- a/rw_core/core/device_state_transitions.go
+++ b/rw_core/core/device_state_transitions.go
@@ -112,7 +112,7 @@
handlers: []TransitionHandler{dMgr.abandonDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
- deviceType: any,
+ deviceType: child,
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.notAllowed}})
@@ -128,14 +128,12 @@
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.reEnableDevice}})
-
transitionMap.transitions = append(transitionMap.transitions,
Transition{
- deviceType: any,
+ deviceType: parent,
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handlers: []TransitionHandler{dMgr.notAllowed}})
-
+ currentState: DeviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ handlers: []TransitionHandler{dMgr.deleteAllChildDevices}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index d446438..e036998 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -40,12 +40,34 @@
logicalDeviceMgr: lDeviceMgr}
return handler
}
+
+// isTestMode is a helper function to determine whether a function is invoked for testing only
func isTestMode(ctx context.Context) bool {
md, _ := metadata.FromIncomingContext(ctx)
_, exist := md[common.TestModeKeys_api_test.String()]
return exist
}
+// waitForNilResponseOnSuccess waits for a single result on ch, or for ctx to be done. A nil result means success;
+// an error result is returned as is, and any other value is logged and reported as a codes.Internal error.
+func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
+	select {
+	case res := <-ch:
+		switch v := res.(type) {
+		case nil:
+			return new(empty.Empty), nil
+		case error:
+			return new(empty.Empty), v
+		default:
+			log.Warnw("unexpected-return-type", log.Fields{"result": res})
+			return new(empty.Empty), status.Errorf(codes.Internal, "%v", res)
+		}
+	case <-ctx.Done():
+		log.Debug("client-timeout")
+		return nil, ctx.Err()
+	}
+}
+
func (handler *APIHandler) UpdateLogLevel(ctx context.Context, logging *voltha.Logging) (*empty.Empty, error) {
log.Debugw("UpdateLogLevel-request", log.Fields{"newloglevel": logging.Level, "intval": int(logging.Level)})
out := new(empty.Empty)
@@ -53,7 +75,7 @@
return out, nil
}
-func processEnableDevicePort(ctx context.Context, id *voltha.LogicalPortId, ch chan error) {
+func processEnableDevicePort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
log.Debugw("processEnableDevicePort", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
ch <- status.Errorf(100, "%d-%s", 100, "erreur")
}
@@ -64,15 +86,10 @@
out := new(empty.Empty)
return out, nil
}
- ch := make(chan error)
+ ch := make(chan interface{})
+ defer close(ch)
go processEnableDevicePort(ctx, id, ch)
- select {
- case resp := <-ch:
- close(ch)
- return new(empty.Empty), resp
- case <-ctx.Done():
- return nil, ctx.Err()
- }
+ return waitForNilResponseOnSuccess(ctx, ch)
}
func (handler *APIHandler) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
@@ -126,6 +143,7 @@
return handler.logicalDeviceMgr.listLogicalDevices()
}
+// CreateDevice creates a new parent device in the data model
func (handler *APIHandler) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
log.Debugw("createdevice", log.Fields{"device": *device})
if isTestMode(ctx) {
@@ -153,75 +171,52 @@
}
}
+// EnableDevice activates a device by invoking the adopt_device API on the appropriate adapter
func (handler *APIHandler) EnableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
log.Debugw("enabledevice", log.Fields{"id": id})
if isTestMode(ctx) {
- out := new(empty.Empty)
- return out, nil
+ return new(empty.Empty), nil
}
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.enableDevice(ctx, id, ch)
- select {
- case res := <-ch:
- if res == nil {
- return new(empty.Empty), nil
- } else if err, ok := res.(error); ok {
- return new(empty.Empty), err
- } else {
- log.Warnw("enable-device-unexpected-return-type", log.Fields{"result": res})
- err = status.Errorf(codes.Internal, "%s", res)
- return new(empty.Empty), err
- }
- case <-ctx.Done():
- log.Debug("enabledevice-client-timeout")
- return nil, ctx.Err()
- }
+ return waitForNilResponseOnSuccess(ctx, ch)
}
+// DisableDevice disables a device along with any child device it may have
func (handler *APIHandler) DisableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
log.Debugw("disabledevice-request", log.Fields{"id": id})
if isTestMode(ctx) {
- out := new(empty.Empty)
- return out, nil
+ return new(empty.Empty), nil
}
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.disableDevice(ctx, id, ch)
- select {
- case res := <-ch:
- if res == nil {
- return new(empty.Empty), nil
- } else if err, ok := res.(error); ok {
- return new(empty.Empty), err
- } else {
- log.Warnw("disable-device-unexpected-return-type", log.Fields{"result": res})
- err = status.Errorf(codes.Internal, "%s", res)
- return new(empty.Empty), err
- }
- case <-ctx.Done():
- log.Debug("enabledevice-client-timeout")
- return nil, ctx.Err()
- }
- return nil, errors.New("Unimplemented")
+ return waitForNilResponseOnSuccess(ctx, ch)
}
+// RebootDevice invokes the reboot API on the corresponding adapter
func (handler *APIHandler) RebootDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
- log.Debugw("disabledevice-request", log.Fields{"id": id})
+ log.Debugw("rebootDevice-request", log.Fields{"id": id})
if isTestMode(ctx) {
- out := new(empty.Empty)
- return out, nil
+ return new(empty.Empty), nil
}
- return nil, errors.New("Unimplemented")
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.rebootDevice(ctx, id, ch)
+ return waitForNilResponseOnSuccess(ctx, ch)
}
+// DeleteDevice removes a device from the data model
func (handler *APIHandler) DeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
log.Debugw("deletedevice-request", log.Fields{"id": id})
if isTestMode(ctx) {
- out := new(empty.Empty)
- return out, nil
+ return new(empty.Empty), nil
}
- return nil, errors.New("Unimplemented")
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.deleteDevice(ctx, id, ch)
+ return waitForNilResponseOnSuccess(ctx, ch)
}
func (handler *APIHandler) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 218478f..dce2db7 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -39,7 +39,7 @@
lockLogicalDevice sync.RWMutex
}
-func NewLogicalDeviceAgent(id string, device *voltha.Device, ldeviceMgr *LogicalDeviceManager, deviceMgr *DeviceManager,
+func newLogicalDeviceAgent(id string, device *voltha.Device, ldeviceMgr *LogicalDeviceManager, deviceMgr *DeviceManager,
cdProxy *model.Proxy) *LogicalDeviceAgent {
var agent LogicalDeviceAgent
agent.exitChannel = make(chan int, 1)
@@ -52,7 +52,8 @@
return &agent
}
-func (agent *LogicalDeviceAgent) Start(ctx context.Context) error {
+// start creates the logical device and adds it to the data model
+func (agent *LogicalDeviceAgent) start(ctx context.Context) error {
log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
//Build the logical device based on information retrieved from the device adapter
var switchCap *ca.SwitchCapability
@@ -97,6 +98,22 @@
return nil
}
+// stop stops the logical device agent. This removes the logical device from the data model.
+func (agent *LogicalDeviceAgent) stop(ctx context.Context) {
+ log.Info("stopping-logical_device-agent")
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ //Remove the logical device from the model
+ if removed := agent.clusterDataProxy.Remove("/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
+ log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ } else {
+ log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ }
+ agent.exitChannel <- 1
+ log.Info("logical_device-agent-stopped")
+}
+
+// getLogicalDevice locks the logical device model and then retrieves the latest logical device information
func (agent *LogicalDeviceAgent) getLogicalDevice() (*voltha.LogicalDevice, error) {
log.Debug("getLogicalDevice")
agent.lockLogicalDevice.Lock()
@@ -109,6 +126,8 @@
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
+// getLogicalDeviceWithoutLock retrieves a logical device from the model without locking it. This is used only by
+// functions that have already acquired the logical device lock to the model
func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
log.Debug("getLogicalDeviceWithoutLock")
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
@@ -119,6 +138,7 @@
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
+// addUNILogicalPort creates a UNI port on the logical device that represents a child device
func (agent *LogicalDeviceAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, portNo uint32) error {
log.Infow("addUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
@@ -179,16 +199,4 @@
return nil
}
-func (agent *LogicalDeviceAgent) Stop(ctx context.Context) {
- log.Info("stopping-logical_device-agent")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
- //Remove the logical device from the model
- if removed := agent.clusterDataProxy.Remove("/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
- log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
- } else {
- log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
- }
- agent.exitChannel <- 1
- log.Info("logical_device-agent-stopped")
-}
+
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index bef078c..8f8548a 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -38,7 +38,7 @@
lockLogicalDeviceAgentsMap sync.RWMutex
}
-func NewLogicalDeviceManager(deviceMgr *DeviceManager, kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
+func newLogicalDeviceManager(deviceMgr *DeviceManager, kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
var logicalDeviceMgr LogicalDeviceManager
logicalDeviceMgr.exitChannel = make(chan int, 1)
logicalDeviceMgr.logicalDeviceAgents = make(map[string]*LogicalDeviceAgent)
@@ -49,12 +49,12 @@
return &logicalDeviceMgr
}
-func (ldMgr *LogicalDeviceManager) Start(ctx context.Context) {
+func (ldMgr *LogicalDeviceManager) start(ctx context.Context) {
log.Info("starting-logical-device-manager")
log.Info("logical-device-manager-started")
}
-func (ldMgr *LogicalDeviceManager) Stop(ctx context.Context) {
+func (ldMgr *LogicalDeviceManager) stop(ctx context.Context) {
log.Info("stopping-logical-device-manager")
ldMgr.exitChannel <- 1
log.Info("logical-device-manager-stopped")
@@ -105,7 +105,7 @@
return result, nil
}
-func (ldMgr *LogicalDeviceManager) CreateLogicalDevice(ctx context.Context, device *voltha.Device) (*string, error) {
+func (ldMgr *LogicalDeviceManager) createLogicalDevice(ctx context.Context, device *voltha.Device) (*string, error) {
log.Debugw("creating-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
if !device.Root {
@@ -124,15 +124,15 @@
}
log.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
- agent := NewLogicalDeviceAgent(id, device, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
+ agent := newLogicalDeviceAgent(id, device, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
ldMgr.addLogicalDeviceAgentToMap(agent)
- go agent.Start(ctx)
+ go agent.start(ctx)
log.Debug("creating-logical-device-ends")
return &id, nil
}
-func (ldMgr *LogicalDeviceManager) DeleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
+func (ldMgr *LogicalDeviceManager) deleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("deleting-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
if !device.Root {
@@ -141,7 +141,7 @@
logDeviceId := device.ParentId
if agent := ldMgr.getLogicalDeviceAgent(logDeviceId); agent != nil {
// Stop the logical device agent
- agent.Stop(ctx)
+ agent.stop(ctx)
//Remove the logical device agent from the Map
ldMgr.deleteLogicalDeviceAgent(logDeviceId)
}
@@ -165,7 +165,7 @@
}
// DeleteLogicalDevice removes the logical port associated with a child device
-func (ldMgr *LogicalDeviceManager) DeleteLogicalPort(ctx context.Context, device *voltha.Device) error {
+func (ldMgr *LogicalDeviceManager) deleteLogicalPort(ctx context.Context, device *voltha.Device) error {
log.Debugw("deleting-logical-port", log.Fields{"deviceId": device.Id})
// Sanity check
if device.Root {
@@ -184,7 +184,7 @@
return nil
}
-func (ldMgr *LogicalDeviceManager) AddUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
+func (ldMgr *LogicalDeviceManager) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
log.Debugw("AddUNILogicalPort", log.Fields{"deviceId": childDevice.Id})
// Sanity check
if childDevice.Root {