VOL-2463 : Enable and disable pon/NNI port Core changes

Change-Id: I7671daf47bad2e2a1ba183d458941e033d529ced
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index ef767bd..b7798ad 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -564,3 +564,51 @@
 	log.Debugw("SimulateAlarm-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
+
+// disablePort invokes the "disable_port" RPC for the given device and port on
+// the topic returned by getAdapterTopic(device.Adapter); the reply is addressed
+// to the core topic. The RPC outcome is turned into an error by unPackResponse.
+func (ap *AdapterProxy) disablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+	log.Debugw("disablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
+	rpc := "disable_port"
+
+	// The request carries two keyed arguments: the device id and the port itself.
+	args := []*kafka.KVArg{
+		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
+		{Key: "port", Value: port},
+	}
+
+	// The request goes out on the adapter topic and names the core topic for the reply.
+	toTopic := ap.getAdapterTopic(device.Adapter)
+	replyToTopic := ap.getCoreTopic()
+
+	// NOTE(review): `true` and device.Id presumably mean wait-for-response and message key - confirm against InvokeRPC.
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	log.Debugw("disablePort-response", log.Fields{"device-id": device.Id, "port-no": port.PortNo, "success": success})
+
+	return unPackResponse(rpc, device.Id, success, result)
+}
+
+// enablePort invokes the "enable_port" RPC for the given device and port on
+// the topic returned by getAdapterTopic(device.Adapter); the reply is addressed
+// to the core topic. The RPC outcome is turned into an error by unPackResponse.
+func (ap *AdapterProxy) enablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+	log.Debugw("enablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
+	rpc := "enable_port"
+
+	// The request carries two keyed arguments: the device id and the port itself.
+	args := []*kafka.KVArg{
+		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
+		{Key: "port", Value: port},
+	}
+
+	// The request goes out on the adapter topic and names the core topic for the reply.
+	toTopic := ap.getAdapterTopic(device.Adapter)
+	replyToTopic := ap.getCoreTopic()
+
+	// NOTE(review): `true` and device.Id presumably mean wait-for-response and message key - confirm against InvokeRPC.
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	log.Debugw("enablePort-response", log.Fields{"device-id": device.Id, "port-no": port.PortNo, "success": success})
+
+	return unPackResponse(rpc, device.Id, success, result)
+}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 913f9e4..7dd5062 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -1167,28 +1167,13 @@
 	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *DeviceAgent) enablePorts(ctx context.Context) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-
-	cloned := agent.getDeviceWithoutLock()
-
-	for _, port := range cloned.Ports {
-		port.AdminState = voltha.AdminState_ENABLED
-		port.OperStatus = voltha.OperStatus_ACTIVE
-	}
-	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *DeviceAgent) disablePorts(ctx context.Context) error {
-	log.Debugw("disablePorts", log.Fields{"deviceid": agent.deviceID})
+func (agent *DeviceAgent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
+	log.Debugw("updatePortsOperState", log.Fields{"device-id": agent.deviceID})
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	cloned := agent.getDeviceWithoutLock()
 	for _, port := range cloned.Ports {
-		port.AdminState = voltha.AdminState_DISABLED
-		port.OperStatus = voltha.OperStatus_UNKNOWN
+		port.OperStatus = operStatus
 	}
 	// Store the device
 	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
@@ -1208,12 +1193,6 @@
 	for _, port := range cloned.Ports {
 		if port.Type == portType && port.PortNo == portNo {
 			port.OperStatus = operStatus
-			// Set the admin status to ENABLED if the operational status is ACTIVE
-			// TODO: Set by northbound system?
-			if operStatus == voltha.OperStatus_ACTIVE {
-				port.AdminState = voltha.AdminState_ENABLED
-			}
-			break
 		}
 	}
 	log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
@@ -1263,11 +1242,8 @@
 		}
 	}
 	cp := proto.Clone(port).(*voltha.Port)
-	// Set the admin state of the port to ENABLE if the operational state is ACTIVE
-	// TODO: Set by northbound system?
-	if cp.OperStatus == voltha.OperStatus_ACTIVE {
-		cp.AdminState = voltha.AdminState_ENABLED
-	}
+	// Set the admin state of the port to ENABLE
+	cp.AdminState = voltha.AdminState_ENABLED
 	cloned.Ports = append(cloned.Ports, cp)
 	// Store the device
 	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
@@ -1394,3 +1370,73 @@
 	// Store the device
 	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
+
+// disablePort marks the port matching Port.PortNo as admin DISABLED, stores the device and
+// then forwards the request to the adapter, all while holding the device lock. An unknown
+// port number or a port type other than PON_OLT is rejected with codes.InvalidArgument.
+func (agent *DeviceAgent) disablePort(ctx context.Context, Port *voltha.Port) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+	device := agent.getDeviceWithoutLock()
+	var target *voltha.Port
+	for _, candidate := range device.Ports {
+		if candidate.PortNo != Port.PortNo {
+			continue
+		}
+		candidate.AdminState = voltha.AdminState_DISABLED
+		target = proto.Clone(candidate).(*voltha.Port)
+		break
+	}
+	switch {
+	case target == nil:
+		return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
+	case target.Type != voltha.Port_PON_OLT:
+		return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", target.Type)
+	}
+	// Persist the new admin state first; the adapter is only contacted once the store succeeded.
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
+		log.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+		return err
+	}
+	err := agent.adapterProxy.disablePort(ctx, device, target)
+	if err != nil {
+		log.Debugw("DisablePort-error", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+	}
+	return err
+}
+
+// enablePort marks the port matching Port.PortNo as admin ENABLED, stores the device and
+// then forwards the request to the adapter, all while holding the device lock. An unknown
+// port number or a port type other than PON_OLT is rejected with codes.InvalidArgument.
+func (agent *DeviceAgent) enablePort(ctx context.Context, Port *voltha.Port) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+	device := agent.getDeviceWithoutLock()
+	var target *voltha.Port
+	for _, candidate := range device.Ports {
+		if candidate.PortNo != Port.PortNo {
+			continue
+		}
+		candidate.AdminState = voltha.AdminState_ENABLED
+		target = proto.Clone(candidate).(*voltha.Port)
+		break
+	}
+	switch {
+	case target == nil:
+		return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
+	case target.Type != voltha.Port_PON_OLT:
+		return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", target.Type)
+	}
+	// Persist the new admin state first; the adapter is only contacted once the store succeeded.
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
+		log.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+		return err
+	}
+	err := agent.adapterProxy.enablePort(ctx, device, target)
+	if err != nil {
+		log.Debugw("EnablePort-error", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+	}
+	return err
+}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 6d87143..df94772 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -918,19 +918,16 @@
 func (dMgr *DeviceManager) updatePortsState(ctx context.Context, deviceID string, state voltha.OperStatus_Types) error {
 	log.Debugw("updatePortsState", log.Fields{"deviceid": deviceID})
 
-	var adminState voltha.AdminState_Types
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
 		switch state {
 		case voltha.OperStatus_ACTIVE:
-			adminState = voltha.AdminState_ENABLED
-			if err := agent.enablePorts(ctx); err != nil {
-				log.Warnw("enable-all-ports-failed", log.Fields{"deviceId": deviceID, "error": err})
+			if err := agent.updatePortsOperState(ctx, state); err != nil {
+				log.Warnw("updatePortsOperState-failed", log.Fields{"deviceId": deviceID, "error": err})
 				return err
 			}
 		case voltha.OperStatus_UNKNOWN:
-			adminState = voltha.AdminState_DISABLED
-			if err := agent.disablePorts(ctx); err != nil {
-				log.Warnw("disable-all-ports-failed", log.Fields{"deviceId": deviceID, "error": err})
+			if err := agent.updatePortsOperState(ctx, state); err != nil {
+				log.Warnw("updatePortsOperState-failed", log.Fields{"deviceId": deviceID, "error": err})
 				return err
 			}
 		default:
@@ -942,7 +939,7 @@
 			log.Warnw("non-existent-device", log.Fields{"deviceId": deviceID, "error": err})
 			return err
 		}
-		if err := dMgr.logicalDeviceMgr.updatePortsState(ctx, device, adminState); err != nil {
+		if err := dMgr.logicalDeviceMgr.updatePortsState(ctx, device, state); err != nil {
 			log.Warnw("failed-updating-ports-state", log.Fields{"deviceId": deviceID, "error": err})
 			return err
 		}
@@ -1486,3 +1483,29 @@
 	}
 	return status.Errorf(codes.NotFound, "%s", deviceID)
 }
+
+// enablePort runs enablePort on the agent of port.DeviceId and sends the result on ch; a missing agent yields codes.NotFound.
+func (dMgr *DeviceManager) enablePort(ctx context.Context, port *voltha.Port, ch chan interface{}) {
+	log.Debugw("enablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+	agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
+	if agent == nil {
+		sendResponse(ctx, ch, status.Errorf(codes.NotFound, "%s", port.DeviceId))
+		return
+	}
+	err := agent.enablePort(ctx, port)
+	log.Debugw("enablePort-result", log.Fields{"result": err})
+	sendResponse(ctx, ch, err)
+}
+
+// disablePort runs disablePort on the agent of port.DeviceId and sends the result on ch; a missing agent yields codes.NotFound.
+func (dMgr *DeviceManager) disablePort(ctx context.Context, port *voltha.Port, ch chan interface{}) {
+	log.Debugw("disablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+	agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
+	if agent == nil {
+		sendResponse(ctx, ch, status.Errorf(codes.NotFound, "%s", port.DeviceId))
+		return
+	}
+	err := agent.disablePort(ctx, port)
+	log.Debugw("disablePort-result", log.Fields{"result": err})
+	sendResponse(ctx, ch, err)
+}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index fd54a50..c0678a3 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -1216,3 +1216,44 @@
 func (handler *APIHandler) UpdateMembership(context.Context, *voltha.Membership) (*empty.Empty, error) {
 	return &empty.Empty{}, errors.New("UnImplemented")
 }
+
+// EnablePort is the northbound entry point for enabling a port. In test mode it returns an empty reply at once;
+// otherwise it hands the request to the device manager and returns what waitForNilResponseOnSuccess reports.
+func (handler *APIHandler) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+	log.Debugw("EnablePort-request", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+	if isTestMode(ctx) {
+		return &empty.Empty{}, nil
+	}
+	if handler.competeForTransaction() {
+		owner, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: port.DeviceId})
+		if err != nil {
+			return nil, err
+		}
+		defer owner.Close(ctx)
+	}
+	respCh := make(chan interface{})
+	defer close(respCh)
+	go handler.deviceMgr.enablePort(ctx, port, respCh)
+	return waitForNilResponseOnSuccess(ctx, respCh)
+}
+
+// DisablePort is the northbound entry point for disabling a port. In test mode it returns an empty reply at once;
+// otherwise it hands the request to the device manager and returns what waitForNilResponseOnSuccess reports.
+func (handler *APIHandler) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+	log.Debugw("DisablePort-request", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+	if isTestMode(ctx) {
+		return &empty.Empty{}, nil
+	}
+	if handler.competeForTransaction() {
+		owner, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: port.DeviceId})
+		if err != nil {
+			return nil, err
+		}
+		defer owner.Close(ctx)
+	}
+
+	respCh := make(chan interface{})
+	defer close(respCh)
+	go handler.deviceMgr.disablePort(ctx, port, respCh)
+	return waitForNilResponseOnSuccess(ctx, respCh)
+}
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index b0404c4..c143f42 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -517,6 +517,92 @@
 	err = waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
 	assert.Nil(t, err)
 }
+func (nb *NBTest) testDisableAndEnablePort(t *testing.T, nbi *APIHandler) {
+	// Get an OLT device and pick one of its PON ports; the test disables and then re-enables that port.
+	var cp *voltha.Port
+	oltDevice, err := nb.getADevice(true, nbi)
+	assert.Nil(t, err)
+	assert.NotNil(t, oltDevice)
+
+	for _, cp = range oltDevice.Ports { // NOTE(review): when no port matches, cp is left pointing at the last port
+		if cp.Type == voltha.Port_PON_OLT {
+			break
+		}
+
+	}
+	assert.NotNil(t, cp)
+	cp.DeviceId = oltDevice.Id
+
+	// Disable the PON port of oltDevice
+	_, err = nbi.DisablePort(getContext(), cp)
+	assert.Nil(t, err)
+	// Wait for the admin state of the olt device port to become DISABLED
+	var vdFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+		for _, port := range device.Ports {
+			if port.PortNo == cp.PortNo {
+				return port.AdminState == voltha.AdminState_DISABLED
+			}
+		}
+		return false
+	}
+	err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+	assert.Nil(t, err)
+	// Logical device condition: every logical port has PORT_DOWN cleared and is in the LIVE state
+	var vlFunction = func(ld *voltha.LogicalDevice) bool {
+		for _, lp := range ld.Ports {
+			if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
+				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
+				return false
+			}
+		}
+		return true
+	}
+	err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+	assert.Nil(t, err)
+
+	// Enable the PON port of oltDevice
+	_, err = nbi.EnablePort(getContext(), cp)
+	assert.Nil(t, err)
+
+	// Wait for the admin state of the olt device port to become ENABLED
+	vdFunction = func(device *voltha.Device) bool {
+		for _, port := range device.Ports {
+			if port.PortNo == cp.PortNo {
+				return port.AdminState == voltha.AdminState_ENABLED
+			}
+		}
+		return false
+	}
+	err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+	assert.Nil(t, err)
+	// Logical device condition: every logical port has PORT_DOWN cleared and is in the LIVE state
+	vlFunction = func(ld *voltha.LogicalDevice) bool {
+		for _, lp := range ld.Ports {
+			if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
+				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
+				return false
+			}
+		}
+		return true
+	}
+	err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+	assert.Nil(t, err)
+
+	// Pick a port that is not of type PON_OLT
+	for _, cp = range oltDevice.Ports {
+		if cp.Type != voltha.Port_PON_OLT {
+			break
+		}
+
+	}
+	assert.NotNil(t, cp)
+	cp.DeviceId = oltDevice.Id
+
+	// Disabling a port that is not of type PON_OLT is expected to return an error
+	_, err = nbi.DisablePort(getContext(), cp)
+	assert.NotNil(t, err)
+
+}
 
 func TestSuite1(t *testing.T) {
 	nb := newNBTest()
@@ -549,6 +635,8 @@
 
 		// 5. Test disable and ReEnable a root device
 		nb.testDisableAndReEnableRootDevice(t, nbi)
+		// 6. Test disable and Enable pon port of OLT device
+		nb.testDisableAndEnablePort(t, nbi)
 
 		// 6. Test disable and delete all devices
 		nb.testDisableAndDeleteAllDevice(t, nbi)
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 80dcfd7..7274e9f 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -441,7 +441,7 @@
 }
 
 // updatePortsState updates the ports state related to the device
-func (agent *LogicalDeviceAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.AdminState_Types) error {
+func (agent *LogicalDeviceAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
 	log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
@@ -449,16 +449,14 @@
 	cloned := agent.getLogicalDeviceWithoutLock()
 	for _, lport := range cloned.Ports {
 		if lport.DeviceId == device.Id {
-			switch state {
-			case voltha.AdminState_ENABLED:
+			if state == voltha.OperStatus_ACTIVE {
 				lport.OfpPort.Config = lport.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
 				lport.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
-			case voltha.AdminState_DISABLED:
+			} else {
 				lport.OfpPort.Config = lport.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
 				lport.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
-			default:
-				log.Warnw("unsupported-state-change", log.Fields{"deviceId": device.Id, "state": state})
 			}
+
 		}
 	}
 	// Updating the logical device will trigger the poprt change events to be populated to the controller
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 4026ba1..853404e 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -521,7 +521,7 @@
 	return nil
 }
 
-func (ldMgr *LogicalDeviceManager) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.AdminState_Types) error {
+func (ldMgr *LogicalDeviceManager) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
 	log.Debugw("updatePortsState", log.Fields{"deviceId": device.Id, "state": state, "current-data": device})
 
 	var ldID *string
diff --git a/rw_core/mocks/adapter.go b/rw_core/mocks/adapter.go
index 876ebea..28d06da 100644
--- a/rw_core/mocks/adapter.go
+++ b/rw_core/mocks/adapter.go
@@ -239,3 +239,13 @@
 func (ta *Adapter) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) { // nolint
 	return nil, nil
 }
+
+// Enable_port is a stub: it ignores deviceId and port and always returns nil.
+func (ta *Adapter) Enable_port(deviceId string, port *voltha.Port) error { //nolint
+	return nil
+}
+
+// Disable_port is a stub: it ignores deviceId and port and always returns nil.
+func (ta *Adapter) Disable_port(deviceId string, port *voltha.Port) error { //nolint
+	return nil
+}
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index e7fed22..b9cef0c 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -229,3 +229,30 @@
 	}()
 	return nil
 }
+
+// Enable_port starts a goroutine that, for a port of type PON_OLT, reports the port as
+// OperStatus_ACTIVE to the core via PortStateUpdate. It returns nil without waiting.
+func (oltA *OLTAdapter) Enable_port(deviceId string, Port *voltha.Port) error { //nolint
+	go func() {
+		if Port.Type == voltha.Port_PON_OLT {
+			if err := oltA.coreProxy.PortStateUpdate(context.TODO(), deviceId, voltha.Port_PON_OLT, Port.PortNo, voltha.OperStatus_ACTIVE); err != nil {
+				// Fatalw, not Fatalf: the message has no format verbs and the details are structured fields.
+				log.Fatalw("updating-ports-failed", log.Fields{"device-id": deviceId, "error": err})
+			}
+		}
+	}()
+	return nil
+}
+
+// Disable_port asynchronously reports a PON_OLT port as OperStatus_DISCOVERED to the core; it returns nil at once.
+func (oltA *OLTAdapter) Disable_port(deviceId string, Port *voltha.Port) error { //nolint
+	go func() {
+		if Port.Type == voltha.Port_PON_OLT {
+			if err := oltA.coreProxy.PortStateUpdate(context.TODO(), deviceId, voltha.Port_PON_OLT, Port.PortNo, voltha.OperStatus_DISCOVERED); err != nil {
+				// Fatalw, not Fatalf: the message has no format verbs and the details are structured fields.
+				log.Fatalw("updating-ports-failed", log.Fields{"device-id": deviceId, "error": err})
+			}
+		}
+	}()
+	return nil
+}