[VOL-1800] Implement Performance configuration in Voltha Core.
This is a port of the existing voltha 1.x functionality into
the Voltha 2.0 Core.
Change-Id: I87bf8836fd392c1c7f4a2c45e85323d1cbe0079f
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index 8371e09..bb03c22 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -499,3 +499,21 @@
log.Debugw("SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
+// DevicePMConfigUpdate sends pmConfigs to the core through the "DevicePMConfigUpdate" RPC, keyed by pmConfigs.Id, and returns the unpacked response. pmConfigs must not be nil.
+func (ap *CoreProxy) DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+ log.Debugw("DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
+ rpc := "DevicePMConfigUpdate"
+ // The request goes to the core topic looked up for this device ID; the adapter topic is passed
+ // as the reply-to topic. ctx is forwarded to InvokeRPC instead of being dropped.
+ toTopic := ap.getCoreTopic(pmConfigs.Id)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device_pm_config",
+ Value: pmConfigs,
+ }
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
+ log.Debugw("DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
+ return unPackResponse(rpc, pmConfigs.Id, success, result)
+}
diff --git a/adapters/common/performance_metrics.go b/adapters/common/performance_metrics.go
new file mode 100644
index 0000000..8f74439
--- /dev/null
+++ b/adapters/common/performance_metrics.go
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "github.com/opencord/voltha-protos/go/voltha"
+)
+// PmMetrics holds the performance-metrics (PM) settings of one device; ToPmConfigs converts it to a voltha.PmConfigs.
+type PmMetrics struct {
+ deviceId string // copied into PmConfigs.Id
+ frequency uint32 // copied into PmConfigs.DefaultFreq; the unit is not visible in this file
+ grouped bool // copied into PmConfigs.Grouped
+ frequencyOverride bool // copied into PmConfigs.FreqOverride
+ metrics map[string]*voltha.PmConfig // per-metric settings, keyed by metric name
+}
+// PmMetricsOption is a function that modifies a PmMetrics; NewPmMetrics applies each one it is given.
+type PmMetricsOption func(*PmMetrics)
+// Frequency returns a PmMetricsOption that sets the frequency field of the
+// PmMetrics it is applied to; ToPmConfigs later reports that value as
+// DefaultFreq.
+func Frequency(frequency uint32) PmMetricsOption {
+ return func(pm *PmMetrics) { pm.frequency = frequency }
+}
+// Grouped returns a PmMetricsOption that sets the grouped flag of the
+// PmMetrics it is applied to; ToPmConfigs later reports that value as
+// Grouped.
+func Grouped(grouped bool) PmMetricsOption {
+ return func(pm *PmMetrics) { pm.grouped = grouped }
+}
+// FrequencyOverride returns a PmMetricsOption that sets the frequencyOverride
+// flag of the PmMetrics it is applied to; ToPmConfigs later reports that value
+// as FreqOverride.
+func FrequencyOverride(frequencyOverride bool) PmMetricsOption {
+ return func(pm *PmMetrics) { pm.frequencyOverride = frequencyOverride }
+}
+// Metrics returns a PmMetricsOption that replaces the metrics map of the
+// PmMetrics it is applied to with one enabled COUNTER entry per name in
+// pmNames. Repeated names collapse into a single entry.
+func Metrics(pmNames []string) PmMetricsOption {
+ return func(pm *PmMetrics) {
+ byName := make(map[string]*voltha.PmConfig)
+ for _, pmName := range pmNames {
+ byName[pmName] = &voltha.PmConfig{Name: pmName, Type: voltha.PmConfig_COUNTER, Enabled: true}
+ }
+ // Any metrics configured earlier on pm are discarded.
+ pm.metrics = byName
+ }
+}
+// NewPmMetrics creates a PmMetrics for deviceId and applies opts to it in the order given.
+func NewPmMetrics(deviceId string, opts ...PmMetricsOption) *PmMetrics {
+ created := &PmMetrics{deviceId: deviceId}
+ for i := range opts {
+ opts[i](created)
+ }
+ return created
+}
+// ToPmConfigs builds a new voltha.PmConfigs from pm. Each stored metric is
+// copied into a fresh PmConfig; because the metrics live in a map, the order
+// of the resulting Metrics slice is not fixed.
+func (pm *PmMetrics) ToPmConfigs() *voltha.PmConfigs {
+ out := &voltha.PmConfigs{Id: pm.deviceId}
+ out.DefaultFreq = pm.frequency
+ out.Grouped = pm.grouped
+ out.FreqOverride = pm.frequencyOverride
+ for _, src := range pm.metrics {
+ out.Metrics = append(out.Metrics, &voltha.PmConfig{Name: src.Name, Type: src.Type, Enabled: src.Enabled})
+ }
+ return out
+}
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index 7ce4414..45d8b72 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -256,7 +256,7 @@
}
//Update the core reference for that device
rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
- //Invoke the Disable_device API on the adapter
+ //Invoke the delete_device API on the adapter
if err := rhp.adapter.Delete_device(device); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
@@ -303,7 +303,7 @@
}
}
log.Debugw("Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
- //Invoke the adopt device on the adapter
+ //Invoke the bulk flow update API of the adapter
if err := rhp.adapter.Update_flows_bulk(device, flows, groups); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
@@ -346,7 +346,7 @@
}
}
log.Debugw("Update_flows_incrementally", log.Fields{"flows": flows, "groups": groups})
- //Invoke the adopt device on the adapter
+ //Invoke the incremental flow update API of the adapter
if err := rhp.adapter.Update_flows_incrementally(device, flows, groups); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
@@ -354,6 +354,39 @@
}
func (rhp *RequestHandlerProxy) Update_pm_config(args []*ic.Argument) (*empty.Empty, error) {
+ // Decode the "device" and "pm_configs" arguments and pass them to the adapter's Update_pm_config.
+ log.Debug("Update_pm_config")
+ if len(args) < 2 {
+ log.Warnw("Update_pm_config-invalid-number-of-args", log.Fields{"args": args})
+ return nil, errors.New("invalid-number-of-args")
+ }
+ device := &voltha.Device{}
+ transactionID := &ic.StrType{} // unmarshalled below but not otherwise used in this handler
+ pmConfigs := &voltha.PmConfigs{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case "pm_configs":
+ if err := ptypes.UnmarshalAny(arg.Value, pmConfigs); err != nil {
+ log.Warnw("cannot-unmarshal-pm-configs", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ log.Debugw("Update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+ // Invoke the pm config update API of the adapter; any adapter error is returned with code NotFound.
+ if err := rhp.adapter.Update_pm_config(device, pmConfigs); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
return new(empty.Empty), nil
}
diff --git a/adapters/simulated_olt/adaptercore/device_handler.go b/adapters/simulated_olt/adaptercore/device_handler.go
index 2c1abbf..dd9b2b6 100644
--- a/adapters/simulated_olt/adaptercore/device_handler.go
+++ b/adapters/simulated_olt/adaptercore/device_handler.go
@@ -29,6 +29,24 @@
"sync"
)
+// pmNames lists the PM names used to create the initial PM config. It is used only for testing in this simulated adapter.
+var pmNames = []string{
+ "tx_64_pkts",
+ "tx_65_127_pkts",
+ "tx_128_255_pkts",
+ "tx_256_511_pkts",
+ "tx_512_1023_pkts",
+ "tx_1024_1518_pkts",
+ "tx_1519_9k_pkts",
+ "rx_64_pkts",
+ "rx_65_127_pkts",
+ "rx_128_255_pkts",
+ "rx_256_511_pkts",
+ "rx_512_1023_pkts",
+ "rx_1024_1518_pkts",
+ "rx_1519_9k_pkts",
+}
+
//DeviceHandler follows the same patterns as ponsim_olt. The only difference is that it does not
// interact with an OLT device.
type DeviceHandler struct {
@@ -41,6 +59,7 @@
ponPort *voltha.Port
exitChannel chan int
lockDevice sync.RWMutex
+ metrics *com.PmMetrics
}
//NewDeviceHandler creates a new device handler
@@ -54,6 +73,14 @@
dh.simulatedOLT = adapter
dh.exitChannel = make(chan int, 1)
dh.lockDevice = sync.RWMutex{}
+ // Set up PON metrics
+ dh.metrics = com.NewPmMetrics(
+ cloned.Id,
+ com.Frequency(150),
+ com.Grouped(false),
+ com.FrequencyOverride(false),
+ com.Metrics(pmNames),
+ )
return &dh
}
@@ -105,6 +132,11 @@
log.Errorw("error-updating-device", log.Fields{"deviceId": device.Id, "error": err})
}
+ // Now, set the initial PM configuration for that device
+ if err := dh.coreProxy.DevicePMConfigUpdate(nil, dh.metrics.ToPmConfigs()); err != nil {
+ log.Errorw("error-updating-PMs", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
// Now create the NNI Port
dh.nniPort = &voltha.Port{
PortNo: 2,
@@ -278,3 +310,9 @@
// For now we do nothing with it
return
}
+// UpdatePmConfigs receives a new PM configuration for device; this simulated handler only logs it.
+func (dh *DeviceHandler) UpdatePmConfigs(device *voltha.Device, pmConfigs *voltha.PmConfigs) {
+ // Nothing is applied or stored for now: the request is traced at debug
+ // level and then dropped.
+ log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+}
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index 4349a1b..8b6d47e 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -241,8 +241,17 @@
return nil
}
-func (so *SimulatedOLT) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
- return errors.New("UnImplemented")
+func (so *SimulatedOLT) Update_pm_config(device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
+ if device == nil {
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Debugw("update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+ var handler *DeviceHandler
+ if handler = so.getDeviceHandler(device.Id); handler != nil {
+ go handler.UpdatePmConfigs(device, pmConfigs)
+ }
+ return nil
}
func (so *SimulatedOLT) Receive_packet_out(deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error {
diff --git a/adapters/simulated_onu/adaptercore/device_handler.go b/adapters/simulated_onu/adaptercore/device_handler.go
index 61b1c0b..908fdd5 100644
--- a/adapters/simulated_onu/adaptercore/device_handler.go
+++ b/adapters/simulated_onu/adaptercore/device_handler.go
@@ -30,6 +30,21 @@
"time"
)
+// pmNames lists the PM names used to create the initial PM config. It is used only for testing in
+// this simulated adapter.
+var pmNames = []string{
+ "tx_64_pkts",
+ "tx_65_127_pkts",
+ "tx_128_255_pkts",
+ "tx_1024_1518_pkts",
+ "tx_1519_9k_pkts",
+ "rx_64_pkts",
+ "rx_65_127_pkts",
+ "rx_128_255_pkts",
+ "rx_1024_1518_pkts",
+ "rx_1519_9k_pkts",
+}
+
//DeviceHandler follows the same patterns as ponsim_olt. The only difference is that it does not
// interact with an OLT device.
type DeviceHandler struct {
@@ -42,6 +57,7 @@
ponPort *voltha.Port
exitChannel chan int
lockDevice sync.RWMutex
+ metrics *com.PmMetrics
}
//NewDeviceHandler creates a new device handler
@@ -55,6 +71,13 @@
dh.simulatedOLT = adapter
dh.exitChannel = make(chan int, 1)
dh.lockDevice = sync.RWMutex{}
+ dh.metrics = com.NewPmMetrics(
+ cloned.Id,
+ com.Frequency(150),
+ com.Grouped(false),
+ com.FrequencyOverride(false),
+ com.Metrics(pmNames),
+ )
return &dh
}
@@ -113,6 +136,11 @@
log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
}
+ // Now, set the initial PM configuration for that device
+ if err := dh.coreProxy.DevicePMConfigUpdate(nil, dh.metrics.ToPmConfigs()); err != nil {
+ log.Errorw("error-updating-PMs", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
// Sleep to mimic the omci management channel creation with the OLT
time.Sleep(10 * time.Millisecond)
@@ -268,3 +296,9 @@
// For now we do nothing with it
return
}
+// UpdatePmConfigs receives a new PM configuration for device; this simulated handler only logs it.
+func (dh *DeviceHandler) UpdatePmConfigs(device *voltha.Device, pmConfigs *voltha.PmConfigs) {
+ // Nothing is applied or stored for now: the request is traced at debug
+ // level and then dropped.
+ log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+}
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
index d8d4127..02448f5 100644
--- a/adapters/simulated_onu/adaptercore/simulated_onu.go
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -233,8 +233,17 @@
return nil
}
-func (so *SimulatedONU) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
- return errors.New("UnImplemented")
+func (so *SimulatedONU) Update_pm_config(device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
+ if device == nil {
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Debugw("update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+ var handler *DeviceHandler
+ if handler = so.getDeviceHandler(device.Id); handler != nil {
+ go handler.UpdatePmConfigs(device, pmConfigs)
+ }
+ return nil
}
func (so *SimulatedONU) Receive_packet_out(deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error {