VOL-2463 : Enable and disable pon/NNI port Core changes
Change-Id: I7671daf47bad2e2a1ba183d458941e033d529ced
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index ef767bd..b7798ad 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -564,3 +564,51 @@
log.Debugw("SimulateAlarm-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
+
+func (ap *AdapterProxy) disablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+ log.Debugw("disablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
+ rpc := "disable_port"
+ deviceID := &ic.StrType{Val: device.Id}
+ toTopic := ap.getAdapterTopic(device.Adapter)
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ args := make([]*kafka.KVArg, 2)
+ args[0] = &kafka.KVArg{
+ Key: "deviceId",
+ Value: deviceID,
+ }
+
+ args[1] = &kafka.KVArg{
+ Key: "port",
+ Value: port,
+ }
+
+ replyToTopic := ap.getCoreTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ log.Debugw("disablePort-response", log.Fields{"device-id": device.Id, "port-no": port.PortNo, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
+}
+
+func (ap *AdapterProxy) enablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+ log.Debugw("enablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
+ rpc := "enable_port"
+ deviceID := &ic.StrType{Val: device.Id}
+ toTopic := ap.getAdapterTopic(device.Adapter)
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ args := make([]*kafka.KVArg, 2)
+ args[0] = &kafka.KVArg{
+ Key: "deviceId",
+ Value: deviceID,
+ }
+
+ args[1] = &kafka.KVArg{
+ Key: "port",
+ Value: port,
+ }
+
+ replyToTopic := ap.getCoreTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ log.Debugw("enablePort-response", log.Fields{"device-id": device.Id, "port-no": port.PortNo, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
+}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 913f9e4..7dd5062 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -1167,28 +1167,13 @@
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) enablePorts(ctx context.Context) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
-
- cloned := agent.getDeviceWithoutLock()
-
- for _, port := range cloned.Ports {
- port.AdminState = voltha.AdminState_ENABLED
- port.OperStatus = voltha.OperStatus_ACTIVE
- }
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *DeviceAgent) disablePorts(ctx context.Context) error {
- log.Debugw("disablePorts", log.Fields{"deviceid": agent.deviceID})
+func (agent *DeviceAgent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
+ log.Debugw("updatePortsOperState", log.Fields{"device-id": agent.deviceID})
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
cloned := agent.getDeviceWithoutLock()
for _, port := range cloned.Ports {
- port.AdminState = voltha.AdminState_DISABLED
- port.OperStatus = voltha.OperStatus_UNKNOWN
+ port.OperStatus = operStatus
}
// Store the device
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
@@ -1208,12 +1193,6 @@
for _, port := range cloned.Ports {
if port.Type == portType && port.PortNo == portNo {
port.OperStatus = operStatus
- // Set the admin status to ENABLED if the operational status is ACTIVE
- // TODO: Set by northbound system?
- if operStatus == voltha.OperStatus_ACTIVE {
- port.AdminState = voltha.AdminState_ENABLED
- }
- break
}
}
log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
@@ -1263,11 +1242,8 @@
}
}
cp := proto.Clone(port).(*voltha.Port)
- // Set the admin state of the port to ENABLE if the operational state is ACTIVE
- // TODO: Set by northbound system?
- if cp.OperStatus == voltha.OperStatus_ACTIVE {
- cp.AdminState = voltha.AdminState_ENABLED
- }
+ // Set the admin state of the port to ENABLE
+ cp.AdminState = voltha.AdminState_ENABLED
cloned.Ports = append(cloned.Ports, cp)
// Store the device
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
@@ -1394,3 +1370,73 @@
// Store the device
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
+
+func (agent *DeviceAgent) disablePort(ctx context.Context, Port *voltha.Port) error {
+ var cp *voltha.Port
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+ // Get the most up to date the device info
+ device := agent.getDeviceWithoutLock()
+ for _, port := range device.Ports {
+ if port.PortNo == Port.PortNo {
+ port.AdminState = voltha.AdminState_DISABLED
+ cp = proto.Clone(port).(*voltha.Port)
+ break
+
+ }
+ }
+ if cp == nil {
+ return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
+ }
+
+ if cp.Type != voltha.Port_PON_OLT {
+ return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", cp.Type)
+ }
+ // Store the device
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
+ log.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+ return err
+ }
+ //send request to adapter
+ if err := agent.adapterProxy.disablePort(ctx, device, cp); err != nil {
+ log.Debugw("DisablePort-error", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+ return err
+ }
+ return nil
+}
+
+func (agent *DeviceAgent) enablePort(ctx context.Context, Port *voltha.Port) error {
+ var cp *voltha.Port
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+ // Get the most up to date the device info
+ device := agent.getDeviceWithoutLock()
+ for _, port := range device.Ports {
+ if port.PortNo == Port.PortNo {
+ port.AdminState = voltha.AdminState_ENABLED
+ cp = proto.Clone(port).(*voltha.Port)
+ break
+ }
+ }
+
+ if cp == nil {
+ return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
+ }
+
+ if cp.Type != voltha.Port_PON_OLT {
+ return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", cp.Type)
+ }
+ // Store the device
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
+ log.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+ return err
+ }
+ //send request to adapter
+ if err := agent.adapterProxy.enablePort(ctx, device, cp); err != nil {
+ log.Debugw("EnablePort-error", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+ return err
+ }
+ return nil
+}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 6d87143..df94772 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -918,19 +918,16 @@
func (dMgr *DeviceManager) updatePortsState(ctx context.Context, deviceID string, state voltha.OperStatus_Types) error {
log.Debugw("updatePortsState", log.Fields{"deviceid": deviceID})
- var adminState voltha.AdminState_Types
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
switch state {
case voltha.OperStatus_ACTIVE:
- adminState = voltha.AdminState_ENABLED
- if err := agent.enablePorts(ctx); err != nil {
- log.Warnw("enable-all-ports-failed", log.Fields{"deviceId": deviceID, "error": err})
+ if err := agent.updatePortsOperState(ctx, state); err != nil {
+ log.Warnw("updatePortsOperState-failed", log.Fields{"deviceId": deviceID, "error": err})
return err
}
case voltha.OperStatus_UNKNOWN:
- adminState = voltha.AdminState_DISABLED
- if err := agent.disablePorts(ctx); err != nil {
- log.Warnw("disable-all-ports-failed", log.Fields{"deviceId": deviceID, "error": err})
+ if err := agent.updatePortsOperState(ctx, state); err != nil {
+ log.Warnw("updatePortsOperState-failed", log.Fields{"deviceId": deviceID, "error": err})
return err
}
default:
@@ -942,7 +939,7 @@
log.Warnw("non-existent-device", log.Fields{"deviceId": deviceID, "error": err})
return err
}
- if err := dMgr.logicalDeviceMgr.updatePortsState(ctx, device, adminState); err != nil {
+ if err := dMgr.logicalDeviceMgr.updatePortsState(ctx, device, state); err != nil {
log.Warnw("failed-updating-ports-state", log.Fields{"deviceId": deviceID, "error": err})
return err
}
@@ -1486,3 +1483,29 @@
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
+
+func (dMgr *DeviceManager) enablePort(ctx context.Context, port *voltha.Port, ch chan interface{}) {
+ log.Debugw("enablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+ var res interface{}
+ if agent := dMgr.getDeviceAgent(ctx, port.DeviceId); agent != nil {
+ res = agent.enablePort(ctx, port)
+ log.Debugw("enablePort-result", log.Fields{"result": res})
+ } else {
+ res = status.Errorf(codes.NotFound, "%s", port.DeviceId)
+ }
+
+ sendResponse(ctx, ch, res)
+}
+
+func (dMgr *DeviceManager) disablePort(ctx context.Context, port *voltha.Port, ch chan interface{}) {
+ log.Debugw("disablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+ var res interface{}
+ if agent := dMgr.getDeviceAgent(ctx, port.DeviceId); agent != nil {
+ res = agent.disablePort(ctx, port)
+ log.Debugw("disablePort-result", log.Fields{"result": res})
+ } else {
+ res = status.Errorf(codes.NotFound, "%s", port.DeviceId)
+ }
+
+ sendResponse(ctx, ch, res)
+}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index fd54a50..c0678a3 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -1216,3 +1216,44 @@
func (handler *APIHandler) UpdateMembership(context.Context, *voltha.Membership) (*empty.Empty, error) {
return &empty.Empty{}, errors.New("UnImplemented")
}
+
+func (handler *APIHandler) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+ log.Debugw("EnablePort-request", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+ if isTestMode(ctx) {
+ return &empty.Empty{}, nil
+ }
+
+ if handler.competeForTransaction() {
+ txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: port.DeviceId})
+ if err != nil {
+ return nil, err
+ }
+ defer txn.Close(ctx)
+ }
+
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.enablePort(ctx, port, ch)
+ return waitForNilResponseOnSuccess(ctx, ch)
+}
+
+func (handler *APIHandler) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+
+ log.Debugw("DisablePort-request", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+ if isTestMode(ctx) {
+ return &empty.Empty{}, nil
+ }
+
+ if handler.competeForTransaction() {
+ txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: port.DeviceId})
+ if err != nil {
+ return nil, err
+ }
+ defer txn.Close(ctx)
+ }
+
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.disablePort(ctx, port, ch)
+ return waitForNilResponseOnSuccess(ctx, ch)
+}
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index b0404c4..c143f42 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -517,6 +517,92 @@
err = waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
assert.Nil(t, err)
}
+func (nb *NBTest) testDisableAndEnablePort(t *testing.T, nbi *APIHandler) {
+ //Get an OLT device
+ var cp *voltha.Port
+ oltDevice, err := nb.getADevice(true, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ for _, cp = range oltDevice.Ports {
+ if cp.Type == voltha.Port_PON_OLT {
+ break
+ }
+
+ }
+ assert.NotNil(t, cp)
+ cp.DeviceId = oltDevice.Id
+
+ // Disable the NW Port of oltDevice
+ _, err = nbi.DisablePort(getContext(), cp)
+ assert.Nil(t, err)
+ // Wait for the olt device Port to be disabled
+ var vdFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+ for _, port := range device.Ports {
+ if port.PortNo == cp.PortNo {
+ return port.AdminState == voltha.AdminState_DISABLED
+ }
+ }
+ return false
+ }
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+ // Wait for the logical device to satisfy the expected condition
+ var vlFunction = func(ld *voltha.LogicalDevice) bool {
+ for _, lp := range ld.Ports {
+ if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
+ lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
+ return false
+ }
+ }
+ return true
+ }
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+
+ // Enable the NW Port of oltDevice
+ _, err = nbi.EnablePort(getContext(), cp)
+ assert.Nil(t, err)
+
+ // Wait for the olt device Port to be enabled
+ vdFunction = func(device *voltha.Device) bool {
+ for _, port := range device.Ports {
+ if port.PortNo == cp.PortNo {
+ return port.AdminState == voltha.AdminState_ENABLED
+ }
+ }
+ return false
+ }
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+ // Wait for the logical device to satisfy the expected condition
+ vlFunction = func(ld *voltha.LogicalDevice) bool {
+ for _, lp := range ld.Ports {
+ if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
+ lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
+ return false
+ }
+ }
+ return true
+ }
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+
+ // Disable a non-PON port
+ for _, cp = range oltDevice.Ports {
+ if cp.Type != voltha.Port_PON_OLT {
+ break
+ }
+
+ }
+ assert.NotNil(t, cp)
+ cp.DeviceId = oltDevice.Id
+
+ // Disable the NW Port of oltDevice
+ _, err = nbi.DisablePort(getContext(), cp)
+ assert.NotNil(t, err)
+
+}
func TestSuite1(t *testing.T) {
nb := newNBTest()
@@ -549,6 +635,8 @@
// 5. Test disable and ReEnable a root device
nb.testDisableAndReEnableRootDevice(t, nbi)
+ // 6. Test disable and Enable pon port of OLT device
+ nb.testDisableAndEnablePort(t, nbi)
// 6. Test disable and delete all devices
nb.testDisableAndDeleteAllDevice(t, nbi)
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 80dcfd7..7274e9f 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -441,7 +441,7 @@
}
// updatePortsState updates the ports state related to the device
-func (agent *LogicalDeviceAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.AdminState_Types) error {
+func (agent *LogicalDeviceAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
@@ -449,16 +449,14 @@
cloned := agent.getLogicalDeviceWithoutLock()
for _, lport := range cloned.Ports {
if lport.DeviceId == device.Id {
- switch state {
- case voltha.AdminState_ENABLED:
+ if state == voltha.OperStatus_ACTIVE {
lport.OfpPort.Config = lport.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
lport.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
- case voltha.AdminState_DISABLED:
+ } else {
lport.OfpPort.Config = lport.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
lport.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
- default:
- log.Warnw("unsupported-state-change", log.Fields{"deviceId": device.Id, "state": state})
}
+
}
}
// Updating the logical device will trigger the poprt change events to be populated to the controller
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 4026ba1..853404e 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -521,7 +521,7 @@
return nil
}
-func (ldMgr *LogicalDeviceManager) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.AdminState_Types) error {
+func (ldMgr *LogicalDeviceManager) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
log.Debugw("updatePortsState", log.Fields{"deviceId": device.Id, "state": state, "current-data": device})
var ldID *string