[VOL-1605] Update disable/reenable device management logic
This is the initial commit that updates the device management
logic for disabling and re-enabling a device.
Change-Id: If6d40a0055e5e1ab61503b9ae9c5a4070ec53f35
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index ed9f761..9eae89e 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -677,6 +677,56 @@
return new(empty.Empty), nil
}
+func (rhp *AdapterRequestHandlerProxy) PortsStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
+ if len(args) < 2 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ deviceId := &voltha.ID{}
+ operStatus := &ic.IntType{}
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+ log.Warnw("cannot-unmarshal-device-id", log.Fields{"error": err})
+ return nil, err
+ }
+ case "oper_status":
+ if err := ptypes.UnmarshalAny(arg.Value, operStatus); err != nil {
+ log.Warnw("cannot-unmarshal-operStatus", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ log.Debugw("PortsStateUpdate", log.Fields{"deviceId": deviceId.Id, "operStatus": operStatus, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the caller to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
+
+ if rhp.TestMode { // Execute only for test cases
+ return nil, nil
+ }
+
+ go rhp.deviceMgr.updatePortsState(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val))
+
+ return new(empty.Empty), nil
+}
+
func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 836269e..fcac69c 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -17,6 +17,7 @@
import (
"context"
+ "fmt"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
@@ -140,7 +141,7 @@
return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
}
-// enableDevice activates a preprovisioned or disable device
+// enableDevice activates a preprovisioned or a disabled device
func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -161,13 +162,24 @@
if device.AdminState == voltha.AdminState_ENABLED {
log.Debugw("device-already-enabled", log.Fields{"id": agent.deviceId})
- //TODO: Needs customized error message
return nil
}
- //TODO: if parent device is disabled then do not enable device
- // Verify whether we need to adopt the device the first time
- // TODO: A state machine for these state transitions would be better (we just have to handle
- // a limited set of states now or it may be an overkill)
+ // If this is a child device then verify the parent state before proceeding
+ if !agent.isRootdevice {
+ if parent := agent.deviceMgr.getParentDevice(device); parent != nil {
+ if parent.AdminState == voltha.AdminState_DISABLED ||
+ parent.AdminState == voltha.AdminState_DELETED ||
+ parent.AdminState == voltha.AdminState_UNKNOWN {
+ err = status.Error(codes.FailedPrecondition, fmt.Sprintf("incorrect-parent-state: %s %d", parent.Id, parent.AdminState))
+ log.Warnw("incorrect-parent-state", log.Fields{"id": agent.deviceId, "error": err})
+ return err
+ }
+ } else {
+ err = status.Error(codes.Unavailable, fmt.Sprintf("parent-not-existent: %s ", device.Id))
+ log.Warnw("parent-not-existent", log.Fields{"id": agent.deviceId, "error": err})
+ return err
+ }
+ }
if device.AdminState == voltha.AdminState_PREPROVISIONED {
// First send the request to an Adapter and wait for a response
if err := agent.adapterProxy.AdoptDevice(ctx, device); err != nil {
@@ -312,15 +324,14 @@
//disableDevice disable a device
func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
agent.lockDevice.Lock()
- //defer agent.lockDevice.Unlock()
log.Debugw("disableDevice", log.Fields{"id": agent.deviceId})
// Get the most up to date the device info
if device, err := agent.getDeviceWithoutLock(); err != nil {
+ agent.lockDevice.Unlock()
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
if device.AdminState == voltha.AdminState_DISABLED {
log.Debugw("device-already-disabled", log.Fields{"id": agent.deviceId})
- //TODO: Needs customized error message
agent.lockDevice.Unlock()
return nil
}
@@ -333,11 +344,6 @@
// Received an Ack (no error found above). Now update the device in the model to the expected state
cloned := proto.Clone(device).(*voltha.Device)
cloned.AdminState = voltha.AdminState_DISABLED
- // Set the state of all ports on that device to disable
- for _, port := range cloned.Ports {
- port.AdminState = voltha.AdminState_DISABLED
- port.OperStatus = voltha.OperStatus_UNKNOWN
- }
if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
agent.lockDevice.Unlock()
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
@@ -774,6 +780,46 @@
}
}
+func (agent *DeviceAgent) enablePorts() error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // clone the device
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ for _, port := range cloned.Ports {
+ port.AdminState = voltha.AdminState_ENABLED
+ port.OperStatus = voltha.OperStatus_ACTIVE
+ }
+ // Store the device
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ }
+ return nil
+ }
+}
+
+func (agent *DeviceAgent) disablePorts() error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // clone the device
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ for _, port := range cloned.Ports {
+ port.AdminState = voltha.AdminState_DISABLED
+ port.OperStatus = voltha.OperStatus_UNKNOWN
+ }
+ // Store the device
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ }
+ return nil
+ }
+}
+
func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
agent.lockDevice.Lock()
// Work only on latest data
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index f6540e4..d173308 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -608,6 +608,43 @@
return status.Errorf(codes.NotFound, "%s", deviceId)
}
+//updatePortsState updates all ports on the device
+func (dMgr *DeviceManager) updatePortsState(deviceId string, state voltha.OperStatus_OperStatus) error {
+ log.Debugw("updatePortsState", log.Fields{"deviceid": deviceId})
+
+ var adminState voltha.AdminState_AdminState
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ switch state {
+ case voltha.OperStatus_ACTIVE:
+ adminState = voltha.AdminState_ENABLED
+ if err := agent.enablePorts(); err != nil {
+ log.Warnw("enable-all-ports-failed", log.Fields{"deviceId": deviceId, "error": err})
+ return err
+ }
+ case voltha.OperStatus_UNKNOWN:
+ adminState = voltha.AdminState_DISABLED
+ if err := agent.disablePorts(); err != nil {
+ log.Warnw("disable-all-ports-failed", log.Fields{"deviceId": deviceId, "error": err})
+ return err
+ }
+ default:
+ return status.Error(codes.Unimplemented, "state-change-not-implemented")
+ }
+ // Notify the logical device about the state change
+ if device, err := dMgr.GetDevice(deviceId); err != nil {
+ log.Warnw("non-existent-device", log.Fields{"deviceId": deviceId, "error": err})
+ return err
+ } else {
+ if err := dMgr.logicalDeviceMgr.updatePortsState(device, adminState); err != nil {
+ log.Warnw("failed-updating-ports-state", log.Fields{"deviceId": deviceId, "error": err})
+ return err
+ }
+ return nil
+ }
+ }
+ return status.Errorf(codes.NotFound, "%s", deviceId)
+}
+
func (dMgr *DeviceManager) childDeviceDetected(parentDeviceId string, parentPortNo int64, deviceType string,
channelId int64, vendorId string, serialNumber string, onuId int64) error {
log.Debugw("childDeviceDetected", log.Fields{"parentDeviceId": parentDeviceId})
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index a2b4494..c07335e 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -313,6 +313,40 @@
return err
}
+// updatePortsState updates the ports state related to the device
+func (agent *LogicalDeviceAgent) updatePortsState(device *voltha.Device, state voltha.AdminState_AdminState) error {
+ log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ // Get the latest logical device info
+ if ld, err := agent.getLogicalDeviceWithoutLock(); err != nil {
+ log.Warnw("logical-device-unknown", log.Fields{"ldeviceId": agent.logicalDeviceId, "error": err})
+ return err
+ } else {
+ cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
+ for _, lport := range cloned.Ports {
+ if lport.DeviceId == device.Id {
+ switch state {
+ case voltha.AdminState_ENABLED:
+ lport.OfpPort.Config = lport.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ lport.OfpPort.State = lport.OfpPort.State & ^uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+ case voltha.AdminState_DISABLED:
+ lport.OfpPort.Config = lport.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ lport.OfpPort.State = lport.OfpPort.State | uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+ default:
+ log.Warnw("unsupported-state-change", log.Fields{"deviceId": device.Id, "state": state})
+ }
+ }
+ }
+ // Updating the logical device will trigger the port change events to be propagated to the controller
+ if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+ log.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceId, "error": err})
+ return err
+ }
+ }
+ return nil
+}
+
// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 2191745..e9b5a4f 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -230,6 +230,16 @@
return nil, status.Errorf(codes.NotFound, "%s", device.Id)
}
+func (ldMgr *LogicalDeviceManager) getLogicalDeviceIdFromDeviceId(deviceId string) (*string, error) {
+ // Get the device
+ var device *voltha.Device
+ var err error
+ if device, err = ldMgr.deviceMgr.GetDevice(deviceId); err != nil {
+ return nil, err
+ }
+ return ldMgr.getLogicalDeviceId(device)
+}
+
func (ldMgr *LogicalDeviceManager) getLogicalPortId(device *voltha.Device) (*voltha.LogicalPortId, error) {
// Get the logical device where this device is attached
var lDeviceId *string
@@ -356,6 +366,24 @@
return nil
}
+func (ldMgr *LogicalDeviceManager) updatePortsState(device *voltha.Device, state voltha.AdminState_AdminState) error {
+ log.Debugw("updatePortsState", log.Fields{"deviceId": device.Id, "state": state})
+
+ var ldId *string
+ var err error
+ //Get the logical device Id for this device
+ if ldId, err = ldMgr.getLogicalDeviceId(device); err != nil {
+ log.Warnw("no-logical-device-found", log.Fields{"deviceId": device.Id, "error": err})
+ return err
+ }
+ if agent := ldMgr.getLogicalDeviceAgent(*ldId); agent != nil {
+ if err := agent.updatePortsState(device, state); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
func (ldMgr *LogicalDeviceManager) updateFlowTable(ctx context.Context, id string, flow *openflow_13.OfpFlowMod, ch chan interface{}) {
log.Debugw("updateFlowTable", log.Fields{"logicalDeviceId": id})
var res interface{}