VOL-3735 Retrieve port counters using the GetExtValue(SingleGetValue) rpc from the openolt adapter

Change-Id: I195f7c9fdc4b677c333ae9e4bb3547bb95312fdf
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index fa8dd52..ef1b179 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -45,6 +45,7 @@
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
 	rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
 	"github.com/opencord/voltha-protos/v4/go/common"
+	"github.com/opencord/voltha-protos/v4/go/extension"
 	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
 	of "github.com/opencord/voltha-protos/v4/go/openflow_13"
 	oop "github.com/opencord/voltha-protos/v4/go/openolt"
@@ -62,6 +63,7 @@
 	McastFlowOrGroupAdd    = "McastFlowOrGroupAdd"
 	McastFlowOrGroupModify = "McastFlowOrGroupModify"
 	McastFlowOrGroupRemove = "McastFlowOrGroupRemove"
+	oltPortInfoTimeout     = 3
 )
 
 //DeviceHandler will interact with the OLT device.
@@ -2479,3 +2481,79 @@
 		}
 	}
 }
+
+func (dh *DeviceHandler) getOltPortCounters(ctx context.Context, oltPortInfo *extension.GetOltPortCounters) *extension.SingleGetValueResponse {
+
+	singleValResp := extension.SingleGetValueResponse{
+		Response: &extension.GetValueResponse{
+			Response: &extension.GetValueResponse_PortCoutners{
+				PortCoutners: &extension.GetOltPortCountersResponse{},
+			},
+		},
+	}
+
+	errResp := func(status extension.GetValueResponse_Status,
+		reason extension.GetValueResponse_ErrorReason) *extension.SingleGetValueResponse {
+		return &extension.SingleGetValueResponse{
+			Response: &extension.GetValueResponse{
+				Status:    status,
+				ErrReason: reason,
+			},
+		}
+	}
+
+	if oltPortInfo.PortType != extension.GetOltPortCounters_Port_ETHERNET_NNI &&
+		oltPortInfo.PortType != extension.GetOltPortCounters_Port_PON_OLT {
+		//send error response
+		logger.Debugw(ctx, "getOltPortCounters invalid portType", log.Fields{"oltPortInfo": oltPortInfo.PortType})
+		return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_INVALID_PORT_TYPE)
+	}
+	statIndChn := make(chan bool, 1)
+	dh.portStats.RegisterForStatIndication(ctx, portStatsType, statIndChn, oltPortInfo.PortNo, oltPortInfo.PortType)
+	defer dh.portStats.DeRegisterFromStatIndication(ctx, portStatsType, statIndChn)
+	//request openOlt agent to send the the port statistics indication
+
+	go func() {
+		_, err := dh.Client.CollectStatistics(ctx, new(oop.Empty))
+		if err != nil {
+			logger.Errorw(ctx, "getOltPortCounters CollectStatistics failed ", log.Fields{"err": err})
+		}
+	}()
+	select {
+	case <-statIndChn:
+		//indication received for ports stats
+		logger.Debugw(ctx, "getOltPortCounters recvd statIndChn", log.Fields{"oltPortInfo": oltPortInfo})
+	case <-time.After(oltPortInfoTimeout * time.Second):
+		logger.Debugw(ctx, "getOltPortCounters timeout happened", log.Fields{"oltPortInfo": oltPortInfo})
+		return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_TIMEOUT)
+	case <-ctx.Done():
+		logger.Debugw(ctx, "getOltPortCounters ctx Done ", log.Fields{"oltPortInfo": oltPortInfo})
+		return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_TIMEOUT)
+	}
+	if oltPortInfo.PortType == extension.GetOltPortCounters_Port_ETHERNET_NNI {
+		//get nni stats
+		intfID := PortNoToIntfID(oltPortInfo.PortNo, voltha.Port_ETHERNET_NNI)
+		logger.Debugw(ctx, "getOltPortCounters intfID  ", log.Fields{"intfID": intfID})
+		cmnni := dh.portStats.collectNNIMetrics(intfID)
+		if cmnni == nil {
+			//TODO define the error reason
+			return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_INTERNAL_ERROR)
+		}
+		dh.portStats.updateGetOltPortCountersResponse(ctx, &singleValResp, cmnni)
+		return &singleValResp
+
+	} else if oltPortInfo.PortType == extension.GetOltPortCounters_Port_PON_OLT {
+		// get pon stats
+		intfID := PortNoToIntfID(oltPortInfo.PortNo, voltha.Port_PON_OLT)
+		if val, ok := dh.activePorts.Load(intfID); ok && val == true {
+			cmpon := dh.portStats.collectPONMetrics(intfID)
+			if cmpon == nil {
+				//TODO define the error reason
+				return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_INTERNAL_ERROR)
+			}
+			dh.portStats.updateGetOltPortCountersResponse(ctx, &singleValResp, cmpon)
+			return &singleValResp
+		}
+	}
+	return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_INTERNAL_ERROR)
+}