[VOL-5291] - On demand onu stats from olt

Change-Id: I65e7b5a8bd93ec862726a7302dcc18be16855b4f
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index b12dca1..2c18980 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -57,6 +57,7 @@
 	"github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
 	of "github.com/opencord/voltha-protos/v5/go/openflow_13"
 	oop "github.com/opencord/voltha-protos/v5/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/v5/go/tech_profile"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc"
 	codes "google.golang.org/grpc/codes"
@@ -3679,6 +3680,133 @@
 	return &singleValResp
 }
 
+// CheckOnuGemInfo returns the ONU's OnuGemInfo, or an error if the lookup fails, the entry is nil, or it has no UNI ports.
+func (dh *DeviceHandler) CheckOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32) (*rsrcMgr.OnuGemInfo, error) {
+	onuGemInfo, err := dh.resourceMgr[intfID].GetOnuGemInfo(ctx, onuID)
+	switch {
+	case err != nil:
+		logger.Errorw(ctx, "Unable to find onuGemInfo", log.Fields{"onuID": onuID})
+		return nil, err
+	case onuGemInfo == nil:
+		logger.Errorw(ctx, "Received nil onuGemInfo", log.Fields{"onuID": onuID})
+		return nil, fmt.Errorf("onuGemInfo received as null, onuID : %d", onuID)
+	case len(onuGemInfo.UniPorts) == 0:
+		logger.Errorw(ctx, "No uniports present for the ONU", log.Fields{"onuID": onuID})
+		return nil, fmt.Errorf("no uniports present for the ONU, onuID : %d", onuID) // err is nil here; returning it would signal success
+	}
+	logger.Debugw(ctx, "Received onuGemInfo", log.Fields{"onuID": onuID, "onuGemInfo": onuGemInfo})
+	return onuGemInfo, nil
+}
+
+func (dh *DeviceHandler) getAllocGemStats(ctx context.Context, intfID uint32, onuID uint32, onuGemInfo *rsrcMgr.OnuGemInfo, singleValResp *extension.SingleGetValueResponse) error {
+	var err error
+	var allocStats *oop.OnuAllocIdStatistics
+	var onuGemStats *oop.GemPortStatistics
+	for _, uni := range onuGemInfo.UniPorts {
+		uniID := plt.UniIDFromPortNum(uni)
+		uniPath := getUniPortPath(dh.device.Id, intfID, int32(onuID), int32(uniID))
+		tpIDs := dh.resourceMgr[intfID].GetTechProfileIDForOnu(ctx, onuID, uniID)
+		if len(tpIDs) == 0 {
+			logger.Errorw(ctx, "No TPIDs present for the Uni", log.Fields{"onuID": onuID, "uniPort": uni})
+			continue
+		}
+		for _, tpId := range tpIDs {
+			tpPath := dh.flowMgr[intfID].getTPpath(ctx, uniPath, uint32(tpId))
+			techProfileInstance, _ := dh.flowMgr[intfID].techprofile.GetTPInstance(ctx, tpPath)
+			onuStatsFromOlt := extension.OnuAllocGemStatsFromOltResponse{}
+			if techProfileInstance != nil {
+				switch tpInst := techProfileInstance.(type) {
+				case *tp_pb.TechProfileInstance:
+					allocId := tpInst.UsScheduler.AllocId
+					onuAllocPkt := oop.OnuPacket{IntfId: intfID, OnuId: onuID, AllocId: allocId}
+					allocStats, err = dh.Client.GetAllocIdStatistics(ctx, &onuAllocPkt)
+					if err != nil {
+						logger.Errorw(ctx, "Error received from openolt for GetAllocIdStatistics", log.Fields{"onuID": onuID, "AllodId": allocId, "Error": err})
+						return err
+					}
+					allocIdInfo := extension.OnuAllocIdInfoFromOlt{}
+					allocIdInfo.AllocId = allocStats.AllocId
+					allocIdInfo.RxBytes = allocStats.RxBytes
+
+					onuStatsFromOlt.AllocIdInfo = &allocIdInfo
+
+					gemPorts := tpInst.UpstreamGemPortAttributeList
+					for _, gem := range gemPorts {
+						onuGemPkt := oop.OnuPacket{IntfId: intfID, OnuId: onuID, GemportId: gem.GemportId}
+						onuGemStats, err = dh.Client.GetGemPortStatistics(ctx, &onuGemPkt)
+						if err != nil {
+							logger.Errorw(ctx, "Error received from openolt for GetGemPortStatistics", log.Fields{"onuID": onuID, "GemPortId": gem.GemportId, "Error": err})
+							return err
+						}
+						gemStatsInfo := extension.OnuGemPortInfoFromOlt{}
+						gemStatsInfo.GemId = onuGemStats.GemportId
+						gemStatsInfo.RxBytes = onuGemStats.RxBytes
+						gemStatsInfo.RxPackets = onuGemStats.RxPackets
+						gemStatsInfo.TxBytes = onuGemStats.TxBytes
+						gemStatsInfo.TxPackets = onuGemStats.TxPackets
+
+						onuStatsFromOlt.GemPortInfo = append(onuStatsFromOlt.GemPortInfo, &gemStatsInfo)
+					}
+					singleValResp.Response.GetOnuStatsFromOltResponse().AllocGemStatsInfo = append(singleValResp.Response.GetOnuStatsFromOltResponse().AllocGemStatsInfo, &onuStatsFromOlt)
+
+				default:
+					logger.Errorw(ctx, "Invalid Techprofile type received", log.Fields{"onuID": onuID, "TpId": tpId})
+					return err
+				}
+			} else {
+				logger.Errorw(ctx, "No TpInstance found for the TpID", log.Fields{"onuID": onuID, "TpId": tpId})
+				continue
+			}
+		}
+	}
+	return err
+}
+
+//nolint:unparam
+func (dh *DeviceHandler) getOnuStatsFromOlt(ctx context.Context, onuInfo *extension.GetOnuStatsFromOltRequest, onuDevice *voltha.Device) *extension.SingleGetValueResponse {
+	singleValResp := extension.SingleGetValueResponse{
+		Response: &extension.GetValueResponse{
+			Status: extension.GetValueResponse_OK,
+			Response: &extension.GetValueResponse_OnuStatsFromOltResponse{
+				OnuStatsFromOltResponse: &extension.GetOnuStatsFromOltResponse{},
+			},
+		},
+	}
+	errResp := func(status extension.GetValueResponse_Status,
+		reason extension.GetValueResponse_ErrorReason) *extension.SingleGetValueResponse {
+		return &extension.SingleGetValueResponse{
+			Response: &extension.GetValueResponse{
+				Status:    status,
+				ErrReason: reason,
+			},
+		}
+	}
+
+	var intfID, onuID uint32
+	if onuDevice != nil {
+		onuID = onuDevice.ProxyAddress.OnuId
+		intfID = plt.PortNoToIntfID(onuDevice.ParentPortNo, voltha.Port_PON_OLT)
+	}
+
+	onuKey := dh.formOnuKey(intfID, onuID)
+	if _, ok := dh.onus.Load(onuKey); !ok {
+		logger.Errorw(ctx, "getOnuStatsFromOlt request received for invalid device", log.Fields{"deviceId": onuDevice.Id, "intfID": intfID, "onuID": onuID})
+		return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_INVALID_DEVICE)
+	}
+	logger.Debugw(ctx, "getOnuStatsFromOlt request received", log.Fields{"intfID": intfID, "onuID": onuID})
+
+	onuGemInfo, err := dh.CheckOnuGemInfo(ctx, intfID, onuID)
+	if err == nil {
+		err = dh.getAllocGemStats(ctx, intfID, onuID, onuGemInfo, &singleValResp)
+		if err != nil {
+			return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_INTERNAL_ERROR)
+		}
+	} else {
+		return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_INVALID_DEVICE)
+	}
+	return &singleValResp
+}
+
 func (dh *DeviceHandler) getOnuInfo(ctx context.Context, intfID uint32, onuID *uint32) (*oop.OnuInfo, error) {
 	Onu := oop.Onu{IntfId: intfID, OnuId: *onuID}
 	OnuInfo, err := dh.Client.GetOnuInfo(ctx, &Onu)
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 834c560..bca0287 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -37,6 +37,7 @@
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
 	"github.com/opencord/voltha-openolt-adapter/pkg/mocks"
+	cmn "github.com/opencord/voltha-protos/v5/go/common"
 	ia "github.com/opencord/voltha-protos/v5/go/inter_adapter"
 	of "github.com/opencord/voltha-protos/v5/go/openflow_13"
 	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
@@ -856,7 +857,7 @@
 		{"handleIndication-11", dh1, args{indication: &oop.Indication{Data: &oop.Indication_OnuInd{OnuInd: &oop.OnuIndication{IntfId: 1, OnuId: 1, OperState: "down", AdminState: "down"}}}}},
 		{"handleIndication-12", dh1, args{indication: &oop.Indication{Data: &oop.Indication_OmciInd{OmciInd: &oop.OmciIndication{IntfId: 1, OnuId: 1, Pkt: []byte("onu123-random value")}}}}},
 		{"handleIndication-13", dh1, args{indication: &oop.Indication{Data: &oop.Indication_PktInd{PktInd: &oop.PacketIndication{IntfType: "nni", IntfId: 1, GemportId: 1, FlowId: 1234, PortNo: 1}}}}},
-		{"handleIndication-14", dh1, args{indication: &oop.Indication{Data: &oop.Indication_PortStats{PortStats: &oop.PortStatistics{IntfId: 1, RxBytes: 100, RxPackets: 100, RxUcastPackets: 100, RxMcastPackets: 100, RxBcastPackets: 100, RxErrorPackets: 100, TxBytes: 100, TxPackets: 100, TxUcastPackets: 100, TxMcastPackets: 100, TxBcastPackets: 100, TxErrorPackets: 100, RxCrcErrors: 100, BipErrors: 100, Timestamp: 1000}}}}},
+		{"handleIndication-14", dh1, args{indication: &oop.Indication{Data: &oop.Indication_PortStats{PortStats: &cmn.PortStatistics{IntfId: 1, RxBytes: 100, RxPackets: 100, RxUcastPackets: 100, RxMcastPackets: 100, RxBcastPackets: 100, RxErrorPackets: 100, TxBytes: 100, TxPackets: 100, TxUcastPackets: 100, TxMcastPackets: 100, TxBcastPackets: 100, TxErrorPackets: 100, RxCrcErrors: 100, BipErrors: 100, Timestamp: 1000}}}}},
 		{"handleIndication-15", dh1, args{indication: &oop.Indication{Data: &oop.Indication_FlowStats{FlowStats: &oop.FlowStatistics{RxBytes: 100, RxPackets: 100, TxBytes: 100, TxPackets: 100, Timestamp: 1000}}}}},
 		{"handleIndication-16", dh1, args{indication: &oop.Indication{Data: &oop.Indication_AlarmInd{AlarmInd: &oop.AlarmIndication{}}}}},
 		{"handleIndication-17", dh1, args{indication: &oop.Indication{Data: &oop.Indication_PktInd{PktInd: &oop.PacketIndication{IntfType: "nni", FlowId: 1234, PortNo: 1}}}}},
@@ -876,7 +877,7 @@
 		{"handleIndication-29", dh2, args{indication: &oop.Indication{Data: &oop.Indication_OnuInd{OnuInd: &oop.OnuIndication{IntfId: 1, OnuId: 1, OperState: "down", AdminState: "down"}}}}},
 		{"handleIndication-30", dh2, args{indication: &oop.Indication{Data: &oop.Indication_OmciInd{OmciInd: &oop.OmciIndication{IntfId: 1, OnuId: 1, Pkt: []byte("onu123-random value")}}}}},
 		{"handleIndication-31", dh2, args{indication: &oop.Indication{Data: &oop.Indication_PktInd{PktInd: &oop.PacketIndication{IntfType: "nni", IntfId: 1, GemportId: 1, FlowId: 1234, PortNo: 1}}}}},
-		{"handleIndication-32", dh2, args{indication: &oop.Indication{Data: &oop.Indication_PortStats{PortStats: &oop.PortStatistics{IntfId: 1, RxBytes: 100, RxPackets: 100, RxUcastPackets: 100, RxMcastPackets: 100, RxBcastPackets: 100, RxErrorPackets: 100, TxBytes: 100, TxPackets: 100, TxUcastPackets: 100, TxMcastPackets: 100, TxBcastPackets: 100, TxErrorPackets: 100, RxCrcErrors: 100, BipErrors: 100, Timestamp: 1000}}}}},
+		{"handleIndication-32", dh2, args{indication: &oop.Indication{Data: &oop.Indication_PortStats{PortStats: &cmn.PortStatistics{IntfId: 1, RxBytes: 100, RxPackets: 100, RxUcastPackets: 100, RxMcastPackets: 100, RxBcastPackets: 100, RxErrorPackets: 100, TxBytes: 100, TxPackets: 100, TxUcastPackets: 100, TxMcastPackets: 100, TxBcastPackets: 100, TxErrorPackets: 100, RxCrcErrors: 100, BipErrors: 100, Timestamp: 1000}}}}},
 		{"handleIndication-33", dh2, args{indication: &oop.Indication{Data: &oop.Indication_FlowStats{FlowStats: &oop.FlowStatistics{RxBytes: 100, RxPackets: 100, TxBytes: 100, TxPackets: 100, Timestamp: 1000}}}}},
 		{"handleIndication-34", dh2, args{indication: &oop.Indication{Data: &oop.Indication_AlarmInd{AlarmInd: &oop.AlarmIndication{}}}}},
 		//
diff --git a/internal/pkg/core/openolt.go b/internal/pkg/core/openolt.go
index 44ac313..aa63387 100644
--- a/internal/pkg/core/openolt.go
+++ b/internal/pkg/core/openolt.go
@@ -406,6 +406,8 @@
 // GetSingleValue handles get uni status on ONU and ondemand metric on OLT
 func (oo *OpenOLT) GetSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
 	logger.Infow(ctx, "single_get_value_request", log.Fields{"request": request})
+	var handler *DeviceHandler
+	var onuDevice *voltha.Device
 
 	errResp := func(status extension.GetValueResponse_Status,
 		reason extension.GetValueResponse_ErrorReason) *extension.SingleGetValueResponse {
@@ -416,7 +418,15 @@
 			},
 		}
 	}
-	if handler := oo.getDeviceHandler(request.TargetId); handler != nil {
+
+	switch request.GetRequest().GetRequest().(type) {
+	case *extension.GetValueRequest_OnuStatsFromOlt:
+		handler, onuDevice = oo.GetDeviceHandlerFromChild(ctx, request.TargetId)
+	default:
+		handler = oo.getDeviceHandler(request.TargetId)
+	}
+
+	if handler != nil {
 		switch reqType := request.GetRequest().GetRequest().(type) {
 		case *extension.GetValueRequest_OltPortInfo:
 			return handler.getOltPortCounters(ctx, reqType.OltPortInfo), nil
@@ -428,6 +438,8 @@
 			return handler.getPONRxPower(ctx, reqType.OltRxPower), nil
 		case *extension.GetValueRequest_OffloadedAppsStats:
 			return handler.getOltOffloadStats(ctx, reqType.OffloadedAppsStats), nil
+		case *extension.GetValueRequest_OnuStatsFromOlt:
+			return handler.getOnuStatsFromOlt(ctx, reqType.OnuStatsFromOlt, onuDevice), nil
 		default:
 			return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_UNSUPPORTED), nil
 		}
@@ -465,6 +477,23 @@
 	return errResp(extension.SetValueResponse_ERROR, extension.SetValueResponse_INVALID_DEVICE_ID), nil
 }
 
+// GetDeviceHandlerFromChild returns the OLT handler whose child devices include deviceId, together with
+// that child; (nil, nil) if none. NOTE(review): the lock stays held across getChildDevicesFromCore calls.
+func (oo *OpenOLT) GetDeviceHandlerFromChild(ctx context.Context, deviceId string) (*DeviceHandler, *voltha.Device) {
+	oo.lockDeviceHandlersMap.Lock()
+	defer oo.lockDeviceHandlersMap.Unlock()
+	for oltID, oltHandler := range oo.deviceHandlers {
+		if children, _ := oltHandler.getChildDevicesFromCore(ctx, oltID); children != nil { // lookup errors are ignored: keep scanning
+			for _, child := range children.Items {
+				if child.Id == deviceId {
+					return oltHandler, child
+				}
+			}
+		}
+	}
+	return nil, nil
+}
+
 /*
  *  OLT Inter-adapter service
  */
diff --git a/internal/pkg/core/statsmanager.go b/internal/pkg/core/statsmanager.go
index 1f9d7f9..8cbc243 100644
--- a/internal/pkg/core/statsmanager.go
+++ b/internal/pkg/core/statsmanager.go
@@ -29,6 +29,7 @@
 
 	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
+	"github.com/opencord/voltha-protos/v5/go/common"
 	"github.com/opencord/voltha-protos/v5/go/extension"
 	"github.com/opencord/voltha-protos/v5/go/openolt"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
@@ -612,7 +613,7 @@
 }
 
 // PortStatisticsIndication handles the port statistics indication
-func (StatMgr *OpenOltStatisticsMgr) PortStatisticsIndication(ctx context.Context, PortStats *openolt.PortStatistics, NumPonPorts uint32) {
+func (StatMgr *OpenOltStatisticsMgr) PortStatisticsIndication(ctx context.Context, PortStats *common.PortStatistics, NumPonPorts uint32) {
 	StatMgr.PortsStatisticsKpis(ctx, PortStats, NumPonPorts)
 	logger.Debugw(ctx, "received-port-stats-indication", log.Fields{"port-stats": PortStats})
 	// Indicate that PortStatisticsIndication is handled
@@ -628,7 +629,7 @@
 }
 
 // PortsStatisticsKpis map the port stats values into a dictionary, creates the kpiEvent and then publish to Kafka
-func (StatMgr *OpenOltStatisticsMgr) PortsStatisticsKpis(ctx context.Context, PortStats *openolt.PortStatistics, NumPonPorts uint32) {
+func (StatMgr *OpenOltStatisticsMgr) PortsStatisticsKpis(ctx context.Context, PortStats *common.PortStatistics, NumPonPorts uint32) {
 	/*map the port stats values into a dictionary
 	  Create a kpoEvent and publish to Kafka
 
diff --git a/internal/pkg/core/statsmanager_test.go b/internal/pkg/core/statsmanager_test.go
index 1cec548..fe845d6 100644
--- a/internal/pkg/core/statsmanager_test.go
+++ b/internal/pkg/core/statsmanager_test.go
@@ -23,6 +23,7 @@
 	"reflect"
 	"testing"
 
+	"github.com/opencord/voltha-protos/v5/go/common"
 	"github.com/opencord/voltha-protos/v5/go/openolt"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
@@ -45,14 +46,14 @@
 	StatMgr := NewOpenOltStatsMgr(context.Background(), dh)
 
 	type args struct {
-		PortStats *openolt.PortStatistics
+		PortStats *common.PortStatistics
 	}
 	tests := []struct {
 		name string
 		args args
 	}{
 		// TODO: Add test cases.
-		{"PortStatisticsIndication", args{PortStats: &openolt.PortStatistics{}}},
+		{"PortStatisticsIndication", args{PortStats: &common.PortStatistics{}}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {