[VOL-5291] - On demand onu stats from olt
Change-Id: I65e7b5a8bd93ec862726a7302dcc18be16855b4f
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index b12dca1..2c18980 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -57,6 +57,7 @@
"github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
of "github.com/opencord/voltha-protos/v5/go/openflow_13"
oop "github.com/opencord/voltha-protos/v5/go/openolt"
+ tp_pb "github.com/opencord/voltha-protos/v5/go/tech_profile"
"github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
@@ -3679,6 +3680,133 @@
return &singleValResp
}
+func (dh *DeviceHandler) CheckOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32) (*rsrcMgr.OnuGemInfo, error) {
+ onuGemInfo, err := dh.resourceMgr[intfID].GetOnuGemInfo(ctx, onuID)
+ if err != nil {
+ logger.Errorw(ctx, "Unable to find onuGemInfo", log.Fields{"onuID": onuID})
+ return nil, err
+ }
+ if onuGemInfo != nil {
+ if len(onuGemInfo.UniPorts) == 0 {
+ logger.Errorw(ctx, "No uniports present for the ONU", log.Fields{"onuID": onuID})
+ return nil, err
+ }
+ logger.Debugw(ctx, "Received onuGemInfo", log.Fields{"onuID": onuID, "onuGemInfo": onuGemInfo})
+ return onuGemInfo, nil
+ }
+ logger.Errorw(ctx, "Received nil onuGemInfo", log.Fields{"onuID": onuID})
+ return nil, fmt.Errorf("onuGemInfo received as null, onuID : %d", onuID)
+}
+
+func (dh *DeviceHandler) getAllocGemStats(ctx context.Context, intfID uint32, onuID uint32, onuGemInfo *rsrcMgr.OnuGemInfo, singleValResp *extension.SingleGetValueResponse) error {
+ var err error
+ var allocStats *oop.OnuAllocIdStatistics
+ var onuGemStats *oop.GemPortStatistics
+ for _, uni := range onuGemInfo.UniPorts {
+ uniID := plt.UniIDFromPortNum(uni)
+ uniPath := getUniPortPath(dh.device.Id, intfID, int32(onuID), int32(uniID))
+ tpIDs := dh.resourceMgr[intfID].GetTechProfileIDForOnu(ctx, onuID, uniID)
+ if len(tpIDs) == 0 {
+ logger.Errorw(ctx, "No TPIDs present for the Uni", log.Fields{"onuID": onuID, "uniPort": uni})
+ continue
+ }
+ for _, tpId := range tpIDs {
+ tpPath := dh.flowMgr[intfID].getTPpath(ctx, uniPath, uint32(tpId))
+ techProfileInstance, _ := dh.flowMgr[intfID].techprofile.GetTPInstance(ctx, tpPath)
+ onuStatsFromOlt := extension.OnuAllocGemStatsFromOltResponse{}
+ if techProfileInstance != nil {
+ switch tpInst := techProfileInstance.(type) {
+ case *tp_pb.TechProfileInstance:
+ allocId := tpInst.UsScheduler.AllocId
+ onuAllocPkt := oop.OnuPacket{IntfId: intfID, OnuId: onuID, AllocId: allocId}
+ allocStats, err = dh.Client.GetAllocIdStatistics(ctx, &onuAllocPkt)
+ if err != nil {
+ logger.Errorw(ctx, "Error received from openolt for GetAllocIdStatistics", log.Fields{"onuID": onuID, "AllodId": allocId, "Error": err})
+ return err
+ }
+ allocIdInfo := extension.OnuAllocIdInfoFromOlt{}
+ allocIdInfo.AllocId = allocStats.AllocId
+ allocIdInfo.RxBytes = allocStats.RxBytes
+
+ onuStatsFromOlt.AllocIdInfo = &allocIdInfo
+
+ gemPorts := tpInst.UpstreamGemPortAttributeList
+ for _, gem := range gemPorts {
+ onuGemPkt := oop.OnuPacket{IntfId: intfID, OnuId: onuID, GemportId: gem.GemportId}
+ onuGemStats, err = dh.Client.GetGemPortStatistics(ctx, &onuGemPkt)
+ if err != nil {
+ logger.Errorw(ctx, "Error received from openolt for GetGemPortStatistics", log.Fields{"onuID": onuID, "GemPortId": gem.GemportId, "Error": err})
+ return err
+ }
+ gemStatsInfo := extension.OnuGemPortInfoFromOlt{}
+ gemStatsInfo.GemId = onuGemStats.GemportId
+ gemStatsInfo.RxBytes = onuGemStats.RxBytes
+ gemStatsInfo.RxPackets = onuGemStats.RxPackets
+ gemStatsInfo.TxBytes = onuGemStats.TxBytes
+ gemStatsInfo.TxPackets = onuGemStats.TxPackets
+
+ onuStatsFromOlt.GemPortInfo = append(onuStatsFromOlt.GemPortInfo, &gemStatsInfo)
+ }
+ singleValResp.Response.GetOnuStatsFromOltResponse().AllocGemStatsInfo = append(singleValResp.Response.GetOnuStatsFromOltResponse().AllocGemStatsInfo, &onuStatsFromOlt)
+
+ default:
+ logger.Errorw(ctx, "Invalid Techprofile type received", log.Fields{"onuID": onuID, "TpId": tpId})
+ return err
+ }
+ } else {
+ logger.Errorw(ctx, "No TpInstance found for the TpID", log.Fields{"onuID": onuID, "TpId": tpId})
+ continue
+ }
+ }
+ }
+ return err
+}
+
+//nolint:unparam
+func (dh *DeviceHandler) getOnuStatsFromOlt(ctx context.Context, onuInfo *extension.GetOnuStatsFromOltRequest, onuDevice *voltha.Device) *extension.SingleGetValueResponse {
+ singleValResp := extension.SingleGetValueResponse{
+ Response: &extension.GetValueResponse{
+ Status: extension.GetValueResponse_OK,
+ Response: &extension.GetValueResponse_OnuStatsFromOltResponse{
+ OnuStatsFromOltResponse: &extension.GetOnuStatsFromOltResponse{},
+ },
+ },
+ }
+ errResp := func(status extension.GetValueResponse_Status,
+ reason extension.GetValueResponse_ErrorReason) *extension.SingleGetValueResponse {
+ return &extension.SingleGetValueResponse{
+ Response: &extension.GetValueResponse{
+ Status: status,
+ ErrReason: reason,
+ },
+ }
+ }
+
+ var intfID, onuID uint32
+ if onuDevice != nil {
+ onuID = onuDevice.ProxyAddress.OnuId
+ intfID = plt.PortNoToIntfID(onuDevice.ParentPortNo, voltha.Port_PON_OLT)
+ }
+
+ onuKey := dh.formOnuKey(intfID, onuID)
+ if _, ok := dh.onus.Load(onuKey); !ok {
+ logger.Errorw(ctx, "getOnuStatsFromOlt request received for invalid device", log.Fields{"deviceId": onuDevice.Id, "intfID": intfID, "onuID": onuID})
+ return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_INVALID_DEVICE)
+ }
+ logger.Debugw(ctx, "getOnuStatsFromOlt request received", log.Fields{"intfID": intfID, "onuID": onuID})
+
+ onuGemInfo, err := dh.CheckOnuGemInfo(ctx, intfID, onuID)
+ if err == nil {
+ err = dh.getAllocGemStats(ctx, intfID, onuID, onuGemInfo, &singleValResp)
+ if err != nil {
+ return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_INTERNAL_ERROR)
+ }
+ } else {
+ return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_INVALID_DEVICE)
+ }
+ return &singleValResp
+}
+
func (dh *DeviceHandler) getOnuInfo(ctx context.Context, intfID uint32, onuID *uint32) (*oop.OnuInfo, error) {
Onu := oop.Onu{IntfId: intfID, OnuId: *onuID}
OnuInfo, err := dh.Client.GetOnuInfo(ctx, &Onu)
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 834c560..bca0287 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -37,6 +37,7 @@
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
"github.com/opencord/voltha-openolt-adapter/pkg/mocks"
+ cmn "github.com/opencord/voltha-protos/v5/go/common"
ia "github.com/opencord/voltha-protos/v5/go/inter_adapter"
of "github.com/opencord/voltha-protos/v5/go/openflow_13"
ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
@@ -856,7 +857,7 @@
{"handleIndication-11", dh1, args{indication: &oop.Indication{Data: &oop.Indication_OnuInd{OnuInd: &oop.OnuIndication{IntfId: 1, OnuId: 1, OperState: "down", AdminState: "down"}}}}},
{"handleIndication-12", dh1, args{indication: &oop.Indication{Data: &oop.Indication_OmciInd{OmciInd: &oop.OmciIndication{IntfId: 1, OnuId: 1, Pkt: []byte("onu123-random value")}}}}},
{"handleIndication-13", dh1, args{indication: &oop.Indication{Data: &oop.Indication_PktInd{PktInd: &oop.PacketIndication{IntfType: "nni", IntfId: 1, GemportId: 1, FlowId: 1234, PortNo: 1}}}}},
- {"handleIndication-14", dh1, args{indication: &oop.Indication{Data: &oop.Indication_PortStats{PortStats: &oop.PortStatistics{IntfId: 1, RxBytes: 100, RxPackets: 100, RxUcastPackets: 100, RxMcastPackets: 100, RxBcastPackets: 100, RxErrorPackets: 100, TxBytes: 100, TxPackets: 100, TxUcastPackets: 100, TxMcastPackets: 100, TxBcastPackets: 100, TxErrorPackets: 100, RxCrcErrors: 100, BipErrors: 100, Timestamp: 1000}}}}},
+ {"handleIndication-14", dh1, args{indication: &oop.Indication{Data: &oop.Indication_PortStats{PortStats: &cmn.PortStatistics{IntfId: 1, RxBytes: 100, RxPackets: 100, RxUcastPackets: 100, RxMcastPackets: 100, RxBcastPackets: 100, RxErrorPackets: 100, TxBytes: 100, TxPackets: 100, TxUcastPackets: 100, TxMcastPackets: 100, TxBcastPackets: 100, TxErrorPackets: 100, RxCrcErrors: 100, BipErrors: 100, Timestamp: 1000}}}}},
{"handleIndication-15", dh1, args{indication: &oop.Indication{Data: &oop.Indication_FlowStats{FlowStats: &oop.FlowStatistics{RxBytes: 100, RxPackets: 100, TxBytes: 100, TxPackets: 100, Timestamp: 1000}}}}},
{"handleIndication-16", dh1, args{indication: &oop.Indication{Data: &oop.Indication_AlarmInd{AlarmInd: &oop.AlarmIndication{}}}}},
{"handleIndication-17", dh1, args{indication: &oop.Indication{Data: &oop.Indication_PktInd{PktInd: &oop.PacketIndication{IntfType: "nni", FlowId: 1234, PortNo: 1}}}}},
@@ -876,7 +877,7 @@
{"handleIndication-29", dh2, args{indication: &oop.Indication{Data: &oop.Indication_OnuInd{OnuInd: &oop.OnuIndication{IntfId: 1, OnuId: 1, OperState: "down", AdminState: "down"}}}}},
{"handleIndication-30", dh2, args{indication: &oop.Indication{Data: &oop.Indication_OmciInd{OmciInd: &oop.OmciIndication{IntfId: 1, OnuId: 1, Pkt: []byte("onu123-random value")}}}}},
{"handleIndication-31", dh2, args{indication: &oop.Indication{Data: &oop.Indication_PktInd{PktInd: &oop.PacketIndication{IntfType: "nni", IntfId: 1, GemportId: 1, FlowId: 1234, PortNo: 1}}}}},
- {"handleIndication-32", dh2, args{indication: &oop.Indication{Data: &oop.Indication_PortStats{PortStats: &oop.PortStatistics{IntfId: 1, RxBytes: 100, RxPackets: 100, RxUcastPackets: 100, RxMcastPackets: 100, RxBcastPackets: 100, RxErrorPackets: 100, TxBytes: 100, TxPackets: 100, TxUcastPackets: 100, TxMcastPackets: 100, TxBcastPackets: 100, TxErrorPackets: 100, RxCrcErrors: 100, BipErrors: 100, Timestamp: 1000}}}}},
+ {"handleIndication-32", dh2, args{indication: &oop.Indication{Data: &oop.Indication_PortStats{PortStats: &cmn.PortStatistics{IntfId: 1, RxBytes: 100, RxPackets: 100, RxUcastPackets: 100, RxMcastPackets: 100, RxBcastPackets: 100, RxErrorPackets: 100, TxBytes: 100, TxPackets: 100, TxUcastPackets: 100, TxMcastPackets: 100, TxBcastPackets: 100, TxErrorPackets: 100, RxCrcErrors: 100, BipErrors: 100, Timestamp: 1000}}}}},
{"handleIndication-33", dh2, args{indication: &oop.Indication{Data: &oop.Indication_FlowStats{FlowStats: &oop.FlowStatistics{RxBytes: 100, RxPackets: 100, TxBytes: 100, TxPackets: 100, Timestamp: 1000}}}}},
{"handleIndication-34", dh2, args{indication: &oop.Indication{Data: &oop.Indication_AlarmInd{AlarmInd: &oop.AlarmIndication{}}}}},
//
diff --git a/internal/pkg/core/openolt.go b/internal/pkg/core/openolt.go
index 44ac313..aa63387 100644
--- a/internal/pkg/core/openolt.go
+++ b/internal/pkg/core/openolt.go
@@ -406,6 +406,8 @@
// GetSingleValue handles get uni status on ONU and ondemand metric on OLT
func (oo *OpenOLT) GetSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
logger.Infow(ctx, "single_get_value_request", log.Fields{"request": request})
+ var handler *DeviceHandler
+ var onuDevice *voltha.Device
errResp := func(status extension.GetValueResponse_Status,
reason extension.GetValueResponse_ErrorReason) *extension.SingleGetValueResponse {
@@ -416,7 +418,15 @@
},
}
}
- if handler := oo.getDeviceHandler(request.TargetId); handler != nil {
+
+ switch request.GetRequest().GetRequest().(type) {
+ case *extension.GetValueRequest_OnuStatsFromOlt:
+ handler, onuDevice = oo.GetDeviceHandlerFromChild(ctx, request.TargetId)
+ default:
+ handler = oo.getDeviceHandler(request.TargetId)
+ }
+
+ if handler != nil {
switch reqType := request.GetRequest().GetRequest().(type) {
case *extension.GetValueRequest_OltPortInfo:
return handler.getOltPortCounters(ctx, reqType.OltPortInfo), nil
@@ -428,6 +438,8 @@
return handler.getPONRxPower(ctx, reqType.OltRxPower), nil
case *extension.GetValueRequest_OffloadedAppsStats:
return handler.getOltOffloadStats(ctx, reqType.OffloadedAppsStats), nil
+ case *extension.GetValueRequest_OnuStatsFromOlt:
+ return handler.getOnuStatsFromOlt(ctx, reqType.OnuStatsFromOlt, onuDevice), nil
default:
return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_UNSUPPORTED), nil
}
@@ -465,6 +477,23 @@
return errResp(extension.SetValueResponse_ERROR, extension.SetValueResponse_INVALID_DEVICE_ID), nil
}
+func (oo *OpenOLT) GetDeviceHandlerFromChild(ctx context.Context, deviceId string) (*DeviceHandler, *voltha.Device) {
+ oo.lockDeviceHandlersMap.Lock()
+ defer oo.lockDeviceHandlersMap.Unlock()
+
+ for parentId, handler := range oo.deviceHandlers {
+ devices, _ := handler.getChildDevicesFromCore(ctx, parentId)
+ if devices != nil {
+ for _, onuDevice := range devices.Items {
+ if onuDevice.Id == deviceId {
+ return handler, onuDevice
+ }
+ }
+ }
+ }
+ return nil, nil
+}
+
/*
* OLT Inter-adapter service
*/
diff --git a/internal/pkg/core/statsmanager.go b/internal/pkg/core/statsmanager.go
index 1f9d7f9..8cbc243 100644
--- a/internal/pkg/core/statsmanager.go
+++ b/internal/pkg/core/statsmanager.go
@@ -29,6 +29,7 @@
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
+ "github.com/opencord/voltha-protos/v5/go/common"
"github.com/opencord/voltha-protos/v5/go/extension"
"github.com/opencord/voltha-protos/v5/go/openolt"
"github.com/opencord/voltha-protos/v5/go/voltha"
@@ -612,7 +613,7 @@
}
// PortStatisticsIndication handles the port statistics indication
-func (StatMgr *OpenOltStatisticsMgr) PortStatisticsIndication(ctx context.Context, PortStats *openolt.PortStatistics, NumPonPorts uint32) {
+func (StatMgr *OpenOltStatisticsMgr) PortStatisticsIndication(ctx context.Context, PortStats *common.PortStatistics, NumPonPorts uint32) {
StatMgr.PortsStatisticsKpis(ctx, PortStats, NumPonPorts)
logger.Debugw(ctx, "received-port-stats-indication", log.Fields{"port-stats": PortStats})
// Indicate that PortStatisticsIndication is handled
@@ -628,7 +629,7 @@
}
// PortsStatisticsKpis map the port stats values into a dictionary, creates the kpiEvent and then publish to Kafka
-func (StatMgr *OpenOltStatisticsMgr) PortsStatisticsKpis(ctx context.Context, PortStats *openolt.PortStatistics, NumPonPorts uint32) {
+func (StatMgr *OpenOltStatisticsMgr) PortsStatisticsKpis(ctx context.Context, PortStats *common.PortStatistics, NumPonPorts uint32) {
/*map the port stats values into a dictionary
Create a kpoEvent and publish to Kafka
diff --git a/internal/pkg/core/statsmanager_test.go b/internal/pkg/core/statsmanager_test.go
index 1cec548..fe845d6 100644
--- a/internal/pkg/core/statsmanager_test.go
+++ b/internal/pkg/core/statsmanager_test.go
@@ -23,6 +23,7 @@
"reflect"
"testing"
+ "github.com/opencord/voltha-protos/v5/go/common"
"github.com/opencord/voltha-protos/v5/go/openolt"
"github.com/opencord/voltha-protos/v5/go/voltha"
)
@@ -45,14 +46,14 @@
StatMgr := NewOpenOltStatsMgr(context.Background(), dh)
type args struct {
- PortStats *openolt.PortStatistics
+ PortStats *common.PortStatistics
}
tests := []struct {
name string
args args
}{
// TODO: Add test cases.
- {"PortStatisticsIndication", args{PortStats: &openolt.PortStatistics{}}},
+ {"PortStatisticsIndication", args{PortStats: &common.PortStatistics{}}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {