VOL-2463 : Adding enable/disable PON port.
2. Raising and clearing alarm on disable and enable PON port.
3. Avoiding statistics collection for disabled pon.

Change-Id: I794d002a9fbf906cfb6aacbee0c6fea758617a61
diff --git a/.golangci.yml b/.golangci.yml
index 3c1a75a..6da1be5 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -77,6 +77,8 @@
     - "don't use underscores in Go names; method Cancel_image_download"
     - "don't use underscores in Go names; method Activate_image_update"
     - "don't use underscores in Go names; method Revert_image_update"
+    - "don't use underscores in Go names; method Disable_port"
+    - "don't use underscores in Go names; method Enable_port"
   exclude-use-default: false
 
 # golangci.com configuration
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 3def7bc..2fe1175 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -53,10 +53,6 @@
 	MaxTimeOutInMs = 500
 )
 
-func init() {
-	_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
 //DeviceHandler will interact with the OLT device.
 type DeviceHandler struct {
 	deviceID      string
@@ -82,6 +78,7 @@
 	metrics            *pmmetrics.PmMetrics
 	stopCollector      chan bool
 	stopHeartbeatCheck chan bool
+	activePorts        map[uint32]bool
 }
 
 //OnuDevice represents ONU related info
@@ -136,6 +133,7 @@
 	dh.stopCollector = make(chan bool, 2)
 	dh.stopHeartbeatCheck = make(chan bool, 2)
 	dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
+	dh.activePorts = make(map[uint32]bool)
 	//TODO initialize the support classes.
 	return &dh
 }
@@ -231,8 +229,11 @@
 	var operStatus common.OperStatus_Types
 	if state == "up" {
 		operStatus = voltha.OperStatus_ACTIVE
+		//populating the intfStatus map
+		dh.activePorts[intfID] = true
 	} else {
 		operStatus = voltha.OperStatus_DISCOVERED
+		dh.activePorts[intfID] = false
 	}
 	portNum := IntfIDToPortNo(intfID, portType)
 	label := GetportLabel(portNum, portType)
@@ -384,6 +385,7 @@
 			// TODO: Check what needs to be handled here for When PON PORT down, ONU will be down
 			// Handle pon port update
 			go dh.addPort(intfOperInd.GetIntfId(), voltha.Port_PON_OLT, intfOperInd.GetOperState())
+			go dh.eventMgr.oltIntfOperIndication(indication.GetIntfOperInd(), dh.deviceID, raisedTs)
 		}
 		log.Infow("Received interface oper indication ", log.Fields{"InterfaceOperInd": intfOperInd})
 	case *oop.Indication_OnuDiscInd:
@@ -545,11 +547,9 @@
 		log.Errorw("Failed to fetch device device", log.Fields{"err": err})
 		return err
 	}
-	cloned := proto.Clone(device).(*voltha.Device)
-	// Update the all ports (if available) on that device to ACTIVE.
-	// The ports do not normally exist, unless the device is coming back from a reboot
-	if err := dh.coreProxy.PortsStateUpdate(context.TODO(), cloned.Id, voltha.OperStatus_ACTIVE); err != nil {
-		log.Errorw("updating-ports-failed", log.Fields{"deviceID": device.Id, "error": err})
+	dh.populateActivePorts(device)
+	if err := dh.updatePortAdminState(device); err != nil {
+		log.Errorw("Error-on-updating-port-status", log.Fields{"device": device})
 		return err
 	}
 
@@ -642,7 +642,7 @@
 			log.Debugf("Publish-NNI-Metrics")
 			// PON Stats
 			NumPonPORTS := dh.resourceMgr.DevInfo.GetPonPorts()
-			for i := uint32(0); i < NumPonPORTS; i++ {
+			for i := uint32(0); i < NumPonPORTS && (dh.activePorts[i]); i++ {
 				cmpon := dh.portStats.collectPONMetrics(i)
 				log.Debugf("Collect-PON-Metrics %v", cmpon)
 
@@ -1224,11 +1224,11 @@
 
 	cloned := proto.Clone(device).(*voltha.Device)
 	// Update the all ports state on that device to enable
-	if err := dh.coreProxy.PortsStateUpdate(context.TODO(), cloned.Id, voltha.OperStatus_ACTIVE); err != nil {
-		log.Errorw("updating-ports-failed", log.Fields{"deviceID": device.Id, "error": err})
+
+	if err := dh.updatePortAdminState(device); err != nil {
+		log.Errorw("Error-on-updating-port-status-after-reenabling-olt", log.Fields{"device": device})
 		return err
 	}
-
 	//Update the device oper status as ACTIVE
 	cloned.OperStatus = voltha.OperStatus_ACTIVE
 	dh.device = cloned
@@ -1237,6 +1237,7 @@
 		log.Errorw("error-updating-device-state", log.Fields{"deviceID": device.Id, "error": err})
 		return err
 	}
+
 	log.Debugw("ReEnableDevice-end", log.Fields{"deviceID": device.Id})
 
 	return nil
@@ -1571,3 +1572,93 @@
 		return
 	}
 }
+
+// EnablePort to enable Pon interface
+func (dh *DeviceHandler) EnablePort(port *voltha.Port) error {
+	log.Debugw("enable-port", log.Fields{"Device": dh.device, "port": port})
+	return dh.invokeDisableorEnablePort(port, true)
+}
+
+// DisablePort to disable pon interface
+func (dh *DeviceHandler) DisablePort(port *voltha.Port) error {
+	log.Debugw("disable-port", log.Fields{"Device": dh.device, "port": port})
+	return dh.invokeDisableorEnablePort(port, false)
+}
+
+func (dh *DeviceHandler) invokeDisableorEnablePort(port *voltha.Port, enablePort bool) error {
+	log.Infow("invokeDisableorEnablePort", log.Fields{"port": port, "Enable": enablePort})
+	if port.GetType() == voltha.Port_ETHERNET_NNI {
+		// Bug is opened for VOL-2505 to support NNI disable feature.
+		log.Infow("voltha-supports-single-nni-hence-disable-of-nni-not-allowed",
+			log.Fields{"Device": dh.device, "port": port})
+		return fmt.Errorf("received-disable-enable-nni-port-request, received-port %s", port.GetType())
+	}
+	// fetch interfaceid from PortNo
+	ponID := PortNoToIntfID(port.GetPortNo(), voltha.Port_PON_OLT)
+	ponIntf := &oop.Interface{IntfId: ponID}
+	var operStatus voltha.OperStatus_Types
+	if enablePort {
+		operStatus = voltha.OperStatus_ACTIVE
+		out, err := dh.Client.EnablePonIf(context.Background(), ponIntf)
+
+		if err != nil {
+			log.Errorw("error-while-enable-Pon-port", log.Fields{"DeviceID": dh.device, "Port": port, "error": err})
+			return err
+		}
+		// updating interface local cache for collecting stats
+		dh.activePorts[ponID] = true
+		log.Infow("enabled-pon-port", log.Fields{"out": out, "DeviceID": dh.device, "Port": port})
+	} else {
+		operStatus = voltha.OperStatus_UNKNOWN
+		out, err := dh.Client.DisablePonIf(context.Background(), ponIntf)
+		if err != nil {
+			log.Errorw("error-while-disabling-interface", log.Fields{"DeviceID": dh.device, "Port": port})
+			return err
+		}
+		// updating interface local cache for collecting stats
+		dh.activePorts[ponID] = false
+		log.Infow("disabled-pon-port", log.Fields{"out": out, "DeviceID": dh.device, "Port": port})
+	}
+	if errs := dh.coreProxy.PortStateUpdate(context.Background(), dh.deviceID, voltha.Port_PON_OLT, port.PortNo, operStatus); errs != nil {
+		log.Errorw("portstate-update-failed", log.Fields{"Device": dh.deviceID, "port": port.PortNo, "error": errs})
+		return errs
+	}
+	return nil
+}
+
+//updatePortAdminState update the ports on reboot and re-enable device.
+func (dh *DeviceHandler) updatePortAdminState(device *voltha.Device) error {
+	cloned := proto.Clone(device).(*voltha.Device)
+	// Disable the port and update the oper_port_status to core
+	// if the Admin state of the port is disabled on reboot and re-enable device.
+	for _, port := range cloned.Ports {
+		if port.AdminState == common.AdminState_DISABLED {
+			if err := dh.invokeDisableorEnablePort(port, false); err != nil {
+				log.Errorw("error-occurred-while-disabling-port", log.Fields{"DeviceId": dh.deviceID, "port": port, "error": err})
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+//populateActivePorts to populate activePorts map
+func (dh *DeviceHandler) populateActivePorts(device *voltha.Device) {
+	log.Info("populateActiveports", log.Fields{"Device": device})
+	for _, port := range device.Ports {
+		if port.Type == voltha.Port_ETHERNET_NNI {
+			if port.OperStatus == voltha.OperStatus_ACTIVE {
+				dh.activePorts[PortNoToIntfID(port.PortNo, voltha.Port_ETHERNET_NNI)] = true
+			} else {
+				dh.activePorts[PortNoToIntfID(port.PortNo, voltha.Port_ETHERNET_NNI)] = false
+			}
+		}
+		if port.Type == voltha.Port_PON_OLT {
+			if port.OperStatus == voltha.OperStatus_ACTIVE {
+				dh.activePorts[PortNoToIntfID(port.PortNo, voltha.Port_PON_OLT)] = true
+			} else {
+				dh.activePorts[PortNoToIntfID(port.PortNo, voltha.Port_PON_OLT)] = false
+			}
+		}
+	}
+}
diff --git a/adaptercore/device_handler_test.go b/adaptercore/device_handler_test.go
index 82cd862..7fbfea8 100644
--- a/adaptercore/device_handler_test.go
+++ b/adaptercore/device_handler_test.go
@@ -186,7 +186,7 @@
 	dh.resourceMgr.ResourceMgrs[2] = ponmgr
 	dh.flowMgr = NewFlowManager(dh, dh.resourceMgr)
 	dh.Client = &mocks.MockOpenoltClient{}
-	dh.eventMgr = &OpenOltEventMgr{eventProxy: &mocks.MockEventProxy{}}
+	dh.eventMgr = &OpenOltEventMgr{eventProxy: &mocks.MockEventProxy{}, handler: dh}
 	dh.transitionMap = &TransitionMap{}
 	dh.portStats = &OpenOltStatisticsMgr{}
 
diff --git a/adaptercore/openolt.go b/adaptercore/openolt.go
index 7a53524..cfa3299 100644
--- a/adaptercore/openolt.go
+++ b/adaptercore/openolt.go
@@ -345,3 +345,39 @@
 func (oo *OpenOLT) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
 	return nil, errors.New("unImplemented")
 }
+
+// Enable_port to Enable PON/NNI interface
+func (oo *OpenOLT) Enable_port(deviceID string, port *voltha.Port) error {
+	log.Infow("Enable_port", log.Fields{"deviceId": deviceID, "port": port})
+	return oo.enableDisablePort(deviceID, port, true)
+}
+
+// Disable_port to Disable pon/nni interface
+func (oo *OpenOLT) Disable_port(deviceID string, port *voltha.Port) error {
+	log.Infow("Disable_port", log.Fields{"deviceId": deviceID, "port": port})
+	return oo.enableDisablePort(deviceID, port, false)
+}
+
+// enableDisablePort to Disable pon or Enable PON interface
+func (oo *OpenOLT) enableDisablePort(deviceID string, port *voltha.Port, enablePort bool) error {
+	log.Infow("enableDisablePort", log.Fields{"deviceId": deviceID, "port": port})
+	if port == nil {
+		log.Errorw("port-cannot-be-nil", log.Fields{"Device": deviceID, "port": port})
+		return errors.New("sent-port-cannot-be-nil")
+	}
+	if handler := oo.getDeviceHandler(deviceID); handler != nil {
+		log.Debugw("Enable_Disable_Port", log.Fields{"deviceId": deviceID, "port": port})
+		if enablePort {
+			if err := handler.EnablePort(port); err != nil {
+				log.Errorw("error-occurred-during-enable-port", log.Fields{"deviceID": deviceID, "port": port, "error": err})
+				return err
+			}
+		} else {
+			if err := handler.DisablePort(port); err != nil {
+				log.Errorw("error-occurred-during-disable-port", log.Fields{"Device": deviceID, "port": port})
+				return err
+			}
+		}
+	}
+	return nil
+}
diff --git a/adaptercore/openolt_eventmgr.go b/adaptercore/openolt_eventmgr.go
index 75086dc..835abe6 100644
--- a/adaptercore/openolt_eventmgr.go
+++ b/adaptercore/openolt_eventmgr.go
@@ -18,11 +18,13 @@
 package adaptercore
 
 import (
+	ctx "context"
 	"fmt"
 	"strconv"
 
 	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-protos/v3/go/common"
 	oop "github.com/opencord/voltha-protos/v3/go/openolt"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 )
@@ -47,6 +49,7 @@
 	onuLossOfKeySyncEvent   = "ONU_LOSS_OF_KEY_SYNC"
 	onuLossOfFrameEvent     = "ONU_LOSS_OF_FRAME"
 	onuLossOfPloamEvent     = "ONU_LOSS_OF_PLOAM"
+	ponIntfDownIndiction    = "OLT_PON_INTERFACE_DOWN"
 )
 
 const (
@@ -446,3 +449,40 @@
 	log.Infow("ONU loss of key sync event sent to KAFKA", log.Fields{"onu-id": onuLOKI.OnuId, "intf-id": onuLOKI.IntfId})
 	return nil
 }
+
+// oltIntfOperIndication handles Up and Down state of an OLT PON ports
+func (em *OpenOltEventMgr) oltIntfOperIndication(ifindication *oop.IntfOperIndication, deviceID string, raisedTs int64) {
+	var de voltha.DeviceEvent
+	context := make(map[string]string)
+	portID := IntfIDToPortNo(ifindication.IntfId, voltha.Port_PON_OLT)
+	device, err := em.handler.coreProxy.GetDevice(ctx.Background(), deviceID, deviceID)
+	if err != nil {
+		log.Errorw("Error while fetching Device object", log.Fields{"DeviceId": deviceID})
+	}
+	for _, port := range device.Ports {
+		if port.PortNo == portID {
+			// Events are suppressed if the Port Adminstate is not enabled.
+			if port.AdminState != common.AdminState_ENABLED {
+				log.Infow("Port disable/enable event not generated because, The port is not enabled by operator", log.Fields{"deviceId": deviceID, "port": port})
+				return
+			}
+			break
+		}
+	}
+	/* Populating event context */
+	context["oper-state"] = ifindication.GetOperState()
+	/* Populating device event body */
+	de.Context = context
+	de.ResourceId = deviceID
+
+	if ifindication.GetOperState() == operationStateDown {
+		de.DeviceEventName = fmt.Sprintf("%s_%s", ponIntfDownIndiction, "RAISE_EVENT")
+	} else if ifindication.OperState == operationStateUp {
+		de.DeviceEventName = fmt.Sprintf("%s_%s", ponIntfDownIndiction, "CLEAR_EVENT")
+	}
+	/* Send event to KAFKA */
+	if err := em.eventProxy.SendDeviceEvent(&de, communication, olt, raisedTs); err != nil {
+		log.Errorw("failed-to-send-olt-intf-oper-status-event", log.Fields{"err": err})
+	}
+	log.Info("sent-olt-intf-oper-status-event-to-kafka")
+}
diff --git a/adaptercore/openolt_test.go b/adaptercore/openolt_test.go
index aefd862..7bdfdcd 100644
--- a/adaptercore/openolt_test.go
+++ b/adaptercore/openolt_test.go
@@ -25,10 +25,6 @@
 import (
 	"context"
 	"errors"
-	"reflect"
-	"sync"
-	"testing"
-
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
@@ -37,6 +33,9 @@
 	"github.com/opencord/voltha-protos/v3/go/openflow_13"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"reflect"
+	"sync"
+	"testing"
 )
 
 // mocks the OpenOLT struct.
@@ -205,7 +204,7 @@
 		device *voltha.Device
 	}
 	var device = mockDevice()
-	device.Id = "openolt"
+	device.Id = "olt"
 	tests := []struct {
 		name    string
 		fields  *fields
@@ -900,3 +899,53 @@
 		})
 	}
 }
+
+func TestOpenOLT_Enable_port(t *testing.T) {
+	type args struct {
+		deviceID string
+		port     *voltha.Port
+	}
+	tests := []struct {
+		name    string
+		fields  *fields
+		args    args
+		wantErr bool
+	}{
+		// TODO: Add test cases.
+		{"Enable_port-1", mockOlt(), args{deviceID: "olt", port: &voltha.Port{Type: voltha.Port_PON_OLT, PortNo: 1}}, false},
+		{"Enable_port-2", mockOlt(), args{deviceID: "olt", port: &voltha.Port{Type: voltha.Port_ETHERNET_NNI, PortNo: 1}}, true},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			oo := testOltObject(tt.fields)
+			if err := oo.Enable_port(tt.args.deviceID, tt.args.port); (err != nil) != tt.wantErr {
+				t.Errorf("OpenOLT.Enable_port() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+func TestOpenOLT_Disable_port(t *testing.T) {
+	type args struct {
+		deviceID string
+		port     *voltha.Port
+	}
+	tests := []struct {
+		name    string
+		fields  *fields
+		args    args
+		wantErr bool
+	}{
+		// TODO: Add test cases.
+		{"Disable_port-1", mockOlt(), args{deviceID: "olt", port: &voltha.Port{Type: voltha.Port_PON_OLT, PortNo: 1}}, false},
+		{"Disable_port-2", mockOlt(), args{deviceID: "olt", port: &voltha.Port{Type: voltha.Port_ETHERNET_NNI, PortNo: 1}}, true},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			oo := testOltObject(tt.fields)
+			if err := oo.Disable_port(tt.args.deviceID, tt.args.port); (err != nil) != tt.wantErr {
+				t.Errorf("OpenOLT.Disable_port() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}