[VOL-1800] Implement Performance configuration in Voltha Core.
This is a port of the exisiting voltha 1.x funtionality into
the Voltha 2.0 Core.
Change-Id: I87bf8836fd392c1c7f4a2c45e85323d1cbe0079f
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index 8371e09..bb03c22 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -499,3 +499,21 @@
log.Debugw("SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
+
+func (ap *CoreProxy) DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+ log.Debugw("DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
+ rpc := "DevicePMConfigUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(pmConfigs.Id)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device_pm_config",
+ Value: pmConfigs,
+ }
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
+ log.Debugw("DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
+ return unPackResponse(rpc, pmConfigs.Id, success, result)
+}
diff --git a/adapters/common/performance_metrics.go b/adapters/common/performance_metrics.go
new file mode 100644
index 0000000..8f74439
--- /dev/null
+++ b/adapters/common/performance_metrics.go
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "github.com/opencord/voltha-protos/go/voltha"
+)
+
+type PmMetrics struct {
+ deviceId string
+ frequency uint32
+ grouped bool
+ frequencyOverride bool
+ metrics map[string]*voltha.PmConfig
+}
+
+type PmMetricsOption func(*PmMetrics)
+
+func Frequency(frequency uint32) PmMetricsOption {
+ return func(args *PmMetrics) {
+ args.frequency = frequency
+ }
+}
+
+func Grouped(grouped bool) PmMetricsOption {
+ return func(args *PmMetrics) {
+ args.grouped = grouped
+ }
+}
+
+func FrequencyOverride(frequencyOverride bool) PmMetricsOption {
+ return func(args *PmMetrics) {
+ args.frequencyOverride = frequencyOverride
+ }
+}
+
+func Metrics(pmNames []string) PmMetricsOption {
+ return func(args *PmMetrics) {
+ args.metrics = make(map[string]*voltha.PmConfig)
+ for _, name := range pmNames {
+ args.metrics[name] = &voltha.PmConfig{
+ Name: name,
+ Type: voltha.PmConfig_COUNTER,
+ Enabled: true,
+ }
+ }
+ }
+}
+
+func NewPmMetrics(deviceId string, opts ...PmMetricsOption) *PmMetrics {
+ pm := &PmMetrics{deviceId: deviceId}
+ for _, option := range opts {
+ option(pm)
+ }
+ return pm
+}
+
+func (pm *PmMetrics) ToPmConfigs() *voltha.PmConfigs {
+ pmConfigs := &voltha.PmConfigs{
+ Id: pm.deviceId,
+ DefaultFreq: pm.frequency,
+ Grouped: pm.grouped,
+ FreqOverride: pm.frequencyOverride,
+ }
+ for _, v := range pm.metrics {
+ pmConfigs.Metrics = append(pmConfigs.Metrics, &voltha.PmConfig{Name: v.Name, Type: v.Type, Enabled: v.Enabled})
+ }
+ return pmConfigs
+}
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index 7ce4414..45d8b72 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -256,7 +256,7 @@
}
//Update the core reference for that device
rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
- //Invoke the Disable_device API on the adapter
+ //Invoke the delete_device API on the adapter
if err := rhp.adapter.Delete_device(device); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
@@ -303,7 +303,7 @@
}
}
log.Debugw("Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
- //Invoke the adopt device on the adapter
+ //Invoke the bulk flow update API of the adapter
if err := rhp.adapter.Update_flows_bulk(device, flows, groups); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
@@ -346,7 +346,7 @@
}
}
log.Debugw("Update_flows_incrementally", log.Fields{"flows": flows, "groups": groups})
- //Invoke the adopt device on the adapter
+ //Invoke the incremental flow update API of the adapter
if err := rhp.adapter.Update_flows_incrementally(device, flows, groups); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
@@ -354,6 +354,39 @@
}
func (rhp *RequestHandlerProxy) Update_pm_config(args []*ic.Argument) (*empty.Empty, error) {
+ log.Debug("Update_pm_config")
+ if len(args) < 2 {
+ log.Warn("Update_pm_config-invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ device := &voltha.Device{}
+ transactionID := &ic.StrType{}
+ pmConfigs := &voltha.PmConfigs{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case "pm_configs":
+ if err := ptypes.UnmarshalAny(arg.Value, pmConfigs); err != nil {
+ log.Warnw("cannot-unmarshal-pm-configs", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ log.Debugw("Update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+ //Invoke the pm config update API of the adapter
+ if err := rhp.adapter.Update_pm_config(device, pmConfigs); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
return new(empty.Empty), nil
}
diff --git a/adapters/simulated_olt/adaptercore/device_handler.go b/adapters/simulated_olt/adaptercore/device_handler.go
index 2c1abbf..dd9b2b6 100644
--- a/adapters/simulated_olt/adaptercore/device_handler.go
+++ b/adapters/simulated_olt/adaptercore/device_handler.go
@@ -29,6 +29,24 @@
"sync"
)
+// A set of pm names to create the initial pm config. This is used only for testing in this simulated adapter
+var pmNames = []string{
+ "tx_64_pkts",
+ "tx_65_127_pkts",
+ "tx_128_255_pkts",
+ "tx_256_511_pkts",
+ "tx_512_1023_pkts",
+ "tx_1024_1518_pkts",
+ "tx_1519_9k_pkts",
+ "rx_64_pkts",
+ "rx_65_127_pkts",
+ "rx_128_255_pkts",
+ "rx_256_511_pkts",
+ "rx_512_1023_pkts",
+ "rx_1024_1518_pkts",
+ "rx_1519_9k_pkts",
+}
+
//DeviceHandler follows the same patterns as ponsim_olt. The only difference is that it does not
// interact with an OLT device.
type DeviceHandler struct {
@@ -41,6 +59,7 @@
ponPort *voltha.Port
exitChannel chan int
lockDevice sync.RWMutex
+ metrics *com.PmMetrics
}
//NewDeviceHandler creates a new device handler
@@ -54,6 +73,14 @@
dh.simulatedOLT = adapter
dh.exitChannel = make(chan int, 1)
dh.lockDevice = sync.RWMutex{}
+ // Set up PON metrics
+ dh.metrics = com.NewPmMetrics(
+ cloned.Id,
+ com.Frequency(150),
+ com.Grouped(false),
+ com.FrequencyOverride(false),
+ com.Metrics(pmNames),
+ )
return &dh
}
@@ -105,6 +132,11 @@
log.Errorw("error-updating-device", log.Fields{"deviceId": device.Id, "error": err})
}
+ // Now, set the initial PM configuration for that device
+ if err := dh.coreProxy.DevicePMConfigUpdate(nil, dh.metrics.ToPmConfigs()); err != nil {
+ log.Errorw("error-updating-PMs", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
// Now create the NNI Port
dh.nniPort = &voltha.Port{
PortNo: 2,
@@ -278,3 +310,9 @@
// For now we do nothing with it
return
}
+
+func (dh *DeviceHandler) UpdatePmConfigs(device *voltha.Device, pmConfigs *voltha.PmConfigs) {
+ log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+ // For now we do nothing with it
+ return
+}
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index 4349a1b..8b6d47e 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -241,8 +241,17 @@
return nil
}
-func (so *SimulatedOLT) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
- return errors.New("UnImplemented")
+func (so *SimulatedOLT) Update_pm_config(device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
+ if device == nil {
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Debugw("update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+ var handler *DeviceHandler
+ if handler = so.getDeviceHandler(device.Id); handler != nil {
+ go handler.UpdatePmConfigs(device, pmConfigs)
+ }
+ return nil
}
func (so *SimulatedOLT) Receive_packet_out(deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error {
diff --git a/adapters/simulated_onu/adaptercore/device_handler.go b/adapters/simulated_onu/adaptercore/device_handler.go
index 61b1c0b..908fdd5 100644
--- a/adapters/simulated_onu/adaptercore/device_handler.go
+++ b/adapters/simulated_onu/adaptercore/device_handler.go
@@ -30,6 +30,21 @@
"time"
)
+// A set of pm names to create the initial pm config. This is used only for testing in this simulated adapter
+var pmNames = []string{
+ "tx_64_pkts",
+ "tx_65_127_pkts",
+ "tx_128_255_pkts",
+ "tx_1024_1518_pkts",
+ "tx_1519_9k_pkts",
+ "rx_64_pkts",
+ "rx_64_pkts",
+ "rx_65_127_pkts",
+ "rx_128_255_pkts",
+ "rx_1024_1518_pkts",
+ "rx_1519_9k_pkts",
+}
+
//DeviceHandler follows the same patterns as ponsim_olt. The only difference is that it does not
// interact with an OLT device.
type DeviceHandler struct {
@@ -42,6 +57,7 @@
ponPort *voltha.Port
exitChannel chan int
lockDevice sync.RWMutex
+ metrics *com.PmMetrics
}
//NewDeviceHandler creates a new device handler
@@ -55,6 +71,13 @@
dh.simulatedOLT = adapter
dh.exitChannel = make(chan int, 1)
dh.lockDevice = sync.RWMutex{}
+ dh.metrics = com.NewPmMetrics(
+ cloned.Id,
+ com.Frequency(150),
+ com.Grouped(false),
+ com.FrequencyOverride(false),
+ com.Metrics(pmNames),
+ )
return &dh
}
@@ -113,6 +136,11 @@
log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
}
+ // Now, set the initial PM configuration for that device
+ if err := dh.coreProxy.DevicePMConfigUpdate(nil, dh.metrics.ToPmConfigs()); err != nil {
+ log.Errorw("error-updating-PMs", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
// Sleep to mimic the omci management channel creation with the OLT
time.Sleep(10 * time.Millisecond)
@@ -268,3 +296,9 @@
// For now we do nothing with it
return
}
+
+func (dh *DeviceHandler) UpdatePmConfigs(device *voltha.Device, pmConfigs *voltha.PmConfigs) {
+ log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+ // For now we do nothing with it
+ return
+}
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
index d8d4127..02448f5 100644
--- a/adapters/simulated_onu/adaptercore/simulated_onu.go
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -233,8 +233,17 @@
return nil
}
-func (so *SimulatedONU) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
- return errors.New("UnImplemented")
+func (so *SimulatedONU) Update_pm_config(device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
+ if device == nil {
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Debugw("update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+ var handler *DeviceHandler
+ if handler = so.getDeviceHandler(device.Id); handler != nil {
+ go handler.UpdatePmConfigs(device, pmConfigs)
+ }
+ return nil
}
func (so *SimulatedONU) Receive_packet_out(deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error {
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 23b2073..a606a75 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -472,9 +472,24 @@
return unPackResponse(rpc, device.Id, success, result)
}
-func (ap *AdapterProxy) UpdatePmConfig(device voltha.Device, pmConfigs voltha.PmConfigs) error {
- log.Debug("UpdatePmConfig")
- return nil
+func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
+ log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id})
+ toTopic := ap.getAdapterTopic(device.Adapter)
+ rpc := "Update_pm_config"
+ args := make([]*kafka.KVArg, 2)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "pm_configs",
+ Value: pmConfigs,
+ }
+
+ replyToTopic := ap.getCoreTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ log.Debugw("UpdatePmConfigs-response", log.Fields{"deviceid": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
}
func (ap *AdapterProxy) ReceivePacketOut(deviceId voltha.ID, egressPortNo int, msg interface{}) error {
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 0d37255..1a00db8 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -984,13 +984,12 @@
}
func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) < 3 {
+ if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
pmConfigs := &voltha.PmConfigs{}
- init := &ic.BoolType{}
transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
@@ -999,11 +998,6 @@
log.Warnw("cannot-unmarshal-pm-config", log.Fields{"error": err})
return nil, err
}
- case "init":
- if err := ptypes.UnmarshalAny(arg.Value, init); err != nil {
- log.Warnw("cannot-unmarshal-boolean", log.Fields{"error": err})
- return nil, err
- }
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
@@ -1012,7 +1006,7 @@
}
}
log.Debugw("DevicePMConfigUpdate", log.Fields{"deviceId": pmConfigs.Id, "configs": pmConfigs,
- "init": init, "transactionID": transactionID.Val})
+ "transactionID": transactionID.Val})
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
@@ -1029,10 +1023,7 @@
return nil, nil
}
- go rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs)
- //if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
- // return nil, err
- //}
+ go rhp.deviceMgr.initPmConfigs(pmConfigs.Id, pmConfigs)
return new(empty.Empty), nil
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 0198254..a202c82 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -644,6 +644,66 @@
return nil
}
+func (agent *DeviceAgent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("updatePmConfigs", log.Fields{"id": pmConfigs.Id})
+ // Work only on latest data
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // clone the device
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+ // Store the device
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
+ if afterUpdate == nil {
+ return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ }
+ // Send the request to the adapter
+ if err := agent.adapterProxy.UpdatePmConfigs(ctx, cloned, pmConfigs); err != nil {
+ log.Errorw("update-pm-configs-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ return err
+ }
+ return nil
+ }
+}
+
+func (agent *DeviceAgent) initPmConfigs(pmConfigs *voltha.PmConfigs) error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("initPmConfigs", log.Fields{"id": pmConfigs.Id})
+ // Work only on latest data
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // clone the device
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+ // Store the device
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
+ if afterUpdate == nil {
+ return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ }
+ return nil
+ }
+}
+
+func (agent *DeviceAgent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
+ agent.lockDevice.RLock()
+ defer agent.lockDevice.RUnlock()
+ log.Debugw("listPmConfigs", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ cloned := proto.Clone(device).(*voltha.Device)
+ return cloned.PmConfigs, nil
+ }
+}
+
func (agent *DeviceAgent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -1131,27 +1191,6 @@
}
}
-func (agent *DeviceAgent) updatePmConfigs(pmConfigs *voltha.PmConfigs) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debug("updatePmConfigs")
- // Work only on latest data
- if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
- return status.Errorf(codes.NotFound, "%s", agent.deviceId)
- } else {
- // clone the device
- cloned := proto.Clone(storeDevice).(*voltha.Device)
- cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
- // Store the device
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
- }
- return nil
- }
-}
-
func (agent *DeviceAgent) addPort(port *voltha.Port) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 257b707..f92645b 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -596,13 +596,38 @@
return status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) updatePmConfigs(deviceId string, pmConfigs *voltha.PmConfigs) error {
+// updatePmConfigs updates the PM configs. This is executed when the northbound gRPC API is invoked, typically
+// following a user action
+func (dMgr *DeviceManager) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs, ch chan interface{}) {
+ var res interface{}
+ if pmConfigs.Id == "" {
+ res = status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
+ } else if agent := dMgr.getDeviceAgent(pmConfigs.Id); agent != nil {
+ res = agent.updatePmConfigs(ctx, pmConfigs)
+ } else {
+ res = status.Errorf(codes.NotFound, "%s", pmConfigs.Id)
+ }
+ sendResponse(ctx, ch, res)
+}
+
+// initPmConfigs initialize the pm configs as defined by the adapter.
+func (dMgr *DeviceManager) initPmConfigs(deviceId string, pmConfigs *voltha.PmConfigs) error {
+ if pmConfigs.Id == "" {
+ return status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
+ }
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.updatePmConfigs(pmConfigs)
+ return agent.initPmConfigs(pmConfigs)
}
return status.Errorf(codes.NotFound, "%s", deviceId)
}
+func (dMgr *DeviceManager) listPmConfigs(ctx context.Context, deviceId string) (*voltha.PmConfigs, error) {
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.listPmConfigs(ctx)
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+}
+
func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*ic.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
@@ -617,7 +642,6 @@
return agent.getPorts(ctx, portType), nil
}
return nil, status.Errorf(codes.NotFound, "%s", deviceId)
-
}
func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*ic.PortCapability, error) {
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index d5e0305..8180ecd 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -725,7 +725,30 @@
out := new(empty.Empty)
return out, nil
}
- return nil, errors.New("UnImplemented")
+ if handler.competeForTransaction() {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: configs.Id}); err != nil {
+ return new(empty.Empty), err
+ } else {
+ defer txn.Close()
+ }
+ }
+
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.updatePmConfigs(ctx, configs, ch)
+ return waitForNilResponseOnSuccess(ctx, ch)
+}
+
+func (handler *APIHandler) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
+ log.Debugw("ListDevicePmConfigs-request", log.Fields{"deviceId": *id})
+ if handler.competeForTransaction() {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+ return &voltha.PmConfigs{}, err
+ } else {
+ defer txn.Close()
+ }
+ }
+ return handler.deviceMgr.listPmConfigs(ctx, id.Id)
}
func (handler *APIHandler) CreateAlarmFilter(ctx context.Context, filter *voltha.AlarmFilter) (*voltha.AlarmFilter, error) {
diff --git a/tests/core/performance_metrics_test.go b/tests/core/performance_metrics_test.go
new file mode 100644
index 0000000..9e074f5
--- /dev/null
+++ b/tests/core/performance_metrics_test.go
@@ -0,0 +1,313 @@
+// +build integration
+
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "fmt"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-go/common/log"
+ tu "github.com/opencord/voltha-go/tests/utils"
+ "github.com/opencord/voltha-protos/go/common"
+ "github.com/opencord/voltha-protos/go/voltha"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc/metadata"
+ "math"
+ "os"
+ "testing"
+ "time"
+)
+
+var stub voltha.VolthaServiceClient
+var volthaSerialNumberKey string
+
+/*
+ This local "integration" test uses one RW-Core, one simulated_olt and one simulated_onu adapter to test performance
+metrics, in a development environment. It uses docker-compose to set up the local environment. However, it can
+easily be extended to run in k8s environment.
+
+The compose files used are located under %GOPATH/src/github.com/opencord/voltha-go/compose. If the GOPATH is not set
+then you can specify the location of the compose files by using COMPOSE_PATH to set the compose files location.
+
+To run this test: DOCKER_HOST_IP=<local IP> go test -v
+
+*/
+
+var allDevices map[string]*voltha.Device
+var allLogicalDevices map[string]*voltha.LogicalDevice
+
+var composePath string
+
+const (
+ GRPC_PORT = 50057
+ NUM_OLTS = 1
+ NUM_ONUS_PER_OLT = 4 // This should coincide with the number of onus per olt in adapters-simulated.yml file
+)
+
+var parentPmNames = []string{
+ "tx_64_pkts",
+ "tx_65_127_pkts",
+ "tx_128_255_pkts",
+ "tx_256_511_pkts",
+ "tx_512_1023_pkts",
+ "tx_1024_1518_pkts",
+ "tx_1519_9k_pkts",
+ "rx_64_pkts",
+ "rx_65_127_pkts",
+ "rx_128_255_pkts",
+ "rx_256_511_pkts",
+ "rx_512_1023_pkts",
+ "rx_1024_1518_pkts",
+ "rx_1519_9k_pkts",
+}
+
+var childPmNames = []string{
+ "tx_64_pkts",
+ "tx_65_127_pkts",
+ "tx_128_255_pkts",
+ "tx_1024_1518_pkts",
+ "tx_1519_9k_pkts",
+ "rx_64_pkts",
+ "rx_64_pkts",
+ "rx_65_127_pkts",
+ "rx_128_255_pkts",
+ "rx_1024_1518_pkts",
+ "rx_1519_9k_pkts",
+}
+
+func setup() {
+ var err error
+
+ if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+ log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
+ log.SetAllLogLevel(log.ErrorLevel)
+
+ volthaSerialNumberKey = "voltha_serial_number"
+ allDevices = make(map[string]*voltha.Device)
+ allLogicalDevices = make(map[string]*voltha.LogicalDevice)
+
+ grpcHostIP := os.Getenv("DOCKER_HOST_IP")
+ goPath := os.Getenv("GOPATH")
+ if goPath != "" {
+ composePath = fmt.Sprintf("%s/src/github.com/opencord/voltha-go/compose", goPath)
+ } else {
+ composePath = os.Getenv("COMPOSE_PATH")
+ }
+
+ fmt.Println("Using compose path:", composePath)
+
+ //Start the simulated environment
+ if err = tu.StartSimulatedEnv(composePath); err != nil {
+ fmt.Println("Failure starting simulated environment:", err)
+ os.Exit(10)
+ }
+
+ stub, err = tu.SetupGrpcConnectionToCore(grpcHostIP, GRPC_PORT)
+ if err != nil {
+ fmt.Println("Failure connecting to Voltha Core:", err)
+ os.Exit(11)
+ }
+
+ // Wait for the simulated devices to be registered in the Voltha Core
+ adapters := []string{"simulated_olt", "simulated_onu"}
+ if _, err = tu.WaitForAdapterRegistration(stub, adapters, 40); err != nil {
+ fmt.Println("Failure retrieving adapters:", err)
+ os.Exit(12)
+ }
+}
+
+func shutdown() {
+ err := tu.StopSimulatedEnv(composePath)
+ if err != nil {
+ fmt.Println("Failure stop simulated environment:", err)
+ }
+}
+
+func refreshLocalDeviceCache(stub voltha.VolthaServiceClient) error {
+ retrievedDevices, err := tu.ListDevices(stub)
+ if err != nil {
+ return err
+ }
+ for _, d := range retrievedDevices.Items {
+ allDevices[d.Id] = d
+ }
+
+ retrievedLogicalDevices, err := tu.ListLogicalDevices(stub)
+ if err != nil {
+ return err
+ }
+
+ for _, ld := range retrievedLogicalDevices.Items {
+ allLogicalDevices[ld.Id] = ld
+ }
+ return nil
+}
+
+func isPresent(pmName string, pmNames []string) bool {
+ for _, name := range pmNames {
+ if name == pmName {
+ return true
+ }
+ }
+ return false
+}
+
+func verifyDevicePMs(t *testing.T, stub voltha.VolthaServiceClient, device *voltha.Device, allPmNames []string, disabledPmNames []string, frequency uint32) {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+ pmConfigs, err := stub.ListDevicePmConfigs(ctx, &common.ID{Id: device.Id})
+ assert.Nil(t, err)
+ assert.Equal(t, device.Id, pmConfigs.Id)
+ assert.Equal(t, uint32(frequency), pmConfigs.DefaultFreq)
+ assert.Equal(t, false, pmConfigs.FreqOverride)
+ assert.Equal(t, false, pmConfigs.Grouped)
+ assert.Nil(t, pmConfigs.Groups)
+ assert.True(t, len(pmConfigs.Metrics) > 0)
+ metrics := pmConfigs.Metrics
+ for _, m := range metrics {
+ if m.Enabled {
+ assert.True(t, isPresent(m.Name, allPmNames))
+ } else {
+ assert.True(t, isPresent(m.Name, disabledPmNames))
+ }
+ assert.Equal(t, voltha.PmConfig_COUNTER, m.Type)
+ }
+}
+
+func verifyInitialPmConfigs(t *testing.T, stub voltha.VolthaServiceClient) {
+ fmt.Println("Verifying initial PM configs")
+ err := refreshLocalDeviceCache(stub)
+ assert.Nil(t, err)
+ for _, d := range allDevices {
+ if d.Root {
+ verifyDevicePMs(t, stub, d, parentPmNames, []string{}, 150)
+ } else {
+ verifyDevicePMs(t, stub, d, childPmNames, []string{}, 150)
+ }
+ }
+}
+
+func verifyPmFrequencyUpdate(t *testing.T, stub voltha.VolthaServiceClient, device *voltha.Device) {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+ pmConfigs, err := stub.ListDevicePmConfigs(ctx, &common.ID{Id: device.Id})
+ assert.Nil(t, err)
+ pmConfigs.DefaultFreq = 10
+ _, err = stub.UpdateDevicePmConfigs(ctx, pmConfigs)
+ assert.Nil(t, err)
+ if device.Root {
+ verifyDevicePMs(t, stub, device, parentPmNames, []string{}, 10)
+ } else {
+ verifyDevicePMs(t, stub, device, childPmNames, []string{}, 10)
+ }
+}
+
+func updatePmFrequencies(t *testing.T, stub voltha.VolthaServiceClient) {
+ fmt.Println("Verifying update to PMs frequencies")
+ err := refreshLocalDeviceCache(stub)
+ assert.Nil(t, err)
+ for _, d := range allDevices {
+ verifyPmFrequencyUpdate(t, stub, d)
+ }
+}
+
+func verifyDisablingSomePmMetrics(t *testing.T, stub voltha.VolthaServiceClient, device *voltha.Device) {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+ pmConfigs, err := stub.ListDevicePmConfigs(ctx, &common.ID{Id: device.Id})
+ assert.Nil(t, err)
+ metricsToDisable := []string{"tx_64_pkts", "rx_64_pkts", "tx_65_127_pkts", "rx_65_127_pkts"}
+ for _, m := range pmConfigs.Metrics {
+ if isPresent(m.Name, metricsToDisable) {
+ m.Enabled = false
+ }
+ }
+ _, err = stub.UpdateDevicePmConfigs(ctx, pmConfigs)
+ assert.Nil(t, err)
+ if device.Root {
+ verifyDevicePMs(t, stub, device, parentPmNames, metricsToDisable, 10)
+ } else {
+ verifyDevicePMs(t, stub, device, childPmNames, metricsToDisable, 10)
+ }
+}
+
+func disableSomePmMetrics(t *testing.T, stub voltha.VolthaServiceClient) {
+ fmt.Println("Verifying disabling of some PMs")
+ err := refreshLocalDeviceCache(stub)
+ assert.Nil(t, err)
+ for _, d := range allDevices {
+ verifyDisablingSomePmMetrics(t, stub, d)
+ }
+}
+
+func createAndEnableDevices(t *testing.T) {
+ err := tu.SetAllLogLevel(stub, voltha.Logging{Level: common.LogLevel_WARNING})
+ assert.Nil(t, err)
+
+ err = tu.SetLogLevel(stub, voltha.Logging{Level: common.LogLevel_DEBUG, PackageName: "github.com/opencord/voltha-go/rw_core/core"})
+ assert.Nil(t, err)
+
+ startTime := time.Now()
+
+ //Pre-provision the parent device
+ oltDevice, err := tu.PreProvisionDevice(stub)
+ assert.Nil(t, err)
+
+ fmt.Println("Creation of ", NUM_OLTS, " OLT devices took:", time.Since(startTime))
+
+ startTime = time.Now()
+
+ //Enable all parent device - this will enable the child devices as well as validate the child devices
+ err = tu.EnableDevice(stub, oltDevice, NUM_ONUS_PER_OLT)
+ assert.Nil(t, err)
+
+ fmt.Println("Enabling of OLT device took:", time.Since(startTime))
+
+ // Wait until the core and adapters sync up after an enabled
+ time.Sleep(time.Duration(math.Max(10, float64(NUM_OLTS*NUM_ONUS_PER_OLT)/2)) * time.Second)
+
+ err = tu.VerifyDevices(stub, NUM_ONUS_PER_OLT)
+ assert.Nil(t, err)
+
+ lds, err := tu.VerifyLogicalDevices(stub, oltDevice, NUM_ONUS_PER_OLT)
+ assert.Nil(t, err)
+ assert.Equal(t, 1, len(lds.Items))
+}
+
+func TestPerformanceMetrics(t *testing.T) {
+ //1. Test creation and activation of the devices. This will validate the devices as well as the logical device created/
+ createAndEnableDevices(t)
+
+ // 2. Test initial PMs on each device
+ verifyInitialPmConfigs(t, stub)
+
+ // 3. Test frequency update of the pmConfigs
+ updatePmFrequencies(t, stub)
+
+ // 4. Test disable some PM metrics
+ disableSomePmMetrics(t, stub)
+}
+
+func TestMain(m *testing.M) {
+ setup()
+ code := m.Run()
+ shutdown()
+ os.Exit(code)
+}