[VOL-1800] Implement Performance configuration in Voltha Core.
This is a port of the existing voltha 1.x functionality into
the Voltha 2.0 Core.
Change-Id: I87bf8836fd392c1c7f4a2c45e85323d1cbe0079f
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 23b2073..a606a75 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -472,9 +472,24 @@
return unPackResponse(rpc, device.Id, success, result)
}
-func (ap *AdapterProxy) UpdatePmConfig(device voltha.Device, pmConfigs voltha.PmConfigs) error {
- log.Debug("UpdatePmConfig")
- return nil
+// UpdatePmConfigs issues the "Update_pm_config" RPC to the adapter topic of the given device,
+// passing the device and the PM configuration as arguments, and returns whatever
+// unPackResponse derives from the RPC outcome.
+//
+// ctx is forwarded to the RPC invocation so the caller's deadline and cancellation apply.
+func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
+	log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id})
+	toTopic := ap.getAdapterTopic(device.Adapter)
+	rpc := "Update_pm_config"
+	args := make([]*kafka.KVArg, 2)
+	args[0] = &kafka.KVArg{
+		Key:   "device",
+		Value: device,
+	}
+	args[1] = &kafka.KVArg{
+		Key:   "pm_configs",
+		Value: pmConfigs,
+	}
+
+	replyToTopic := ap.getCoreTopic()
+	// Pass the caller's context instead of nil: a nil Context must never be handed to an API
+	// that accepts one, and dropping ctx would ignore the request's deadline/cancellation.
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	log.Debugw("UpdatePmConfigs-response", log.Fields{"deviceid": device.Id, "success": success})
+	return unPackResponse(rpc, device.Id, success, result)
}
func (ap *AdapterProxy) ReceivePacketOut(deviceId voltha.ID, egressPortNo int, msg interface{}) error {
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 0d37255..1a00db8 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -984,13 +984,12 @@
}
func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) < 3 {
+ if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
pmConfigs := &voltha.PmConfigs{}
- init := &ic.BoolType{}
transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
@@ -999,11 +998,6 @@
log.Warnw("cannot-unmarshal-pm-config", log.Fields{"error": err})
return nil, err
}
- case "init":
- if err := ptypes.UnmarshalAny(arg.Value, init); err != nil {
- log.Warnw("cannot-unmarshal-boolean", log.Fields{"error": err})
- return nil, err
- }
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
@@ -1012,7 +1006,7 @@
}
}
log.Debugw("DevicePMConfigUpdate", log.Fields{"deviceId": pmConfigs.Id, "configs": pmConfigs,
- "init": init, "transactionID": transactionID.Val})
+ "transactionID": transactionID.Val})
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
@@ -1029,10 +1023,7 @@
return nil, nil
}
- go rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs)
- //if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
- // return nil, err
- //}
+ go rhp.deviceMgr.initPmConfigs(pmConfigs.Id, pmConfigs)
return new(empty.Empty), nil
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 0198254..a202c82 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -644,6 +644,66 @@
return nil
}
+// updatePmConfigs stores pmConfigs on this agent's device and then forwards the same
+// configuration to the device's adapter. The device lock is held for the whole call.
+// It returns NotFound when the device cannot be retrieved, Internal when the store update
+// yields nil, or the adapter's error when the adapter request fails.
+func (agent *DeviceAgent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("updatePmConfigs", log.Fields{"id": pmConfigs.Id})
+
+	// Always start from the most recent copy of the device.
+	current, err := agent.getDeviceWithoutLock()
+	if err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	}
+
+	// Apply the new configuration to a copy rather than to the retrieved device.
+	updated := proto.Clone(current).(*voltha.Device)
+	updated.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+
+	// Persist the modified copy.
+	storeCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+	if agent.clusterDataProxy.Update(storeCtx, "/devices/"+agent.deviceId, updated, false, "") == nil {
+		return status.Errorf(codes.Internal, "%s", agent.deviceId)
+	}
+
+	// Hand the new configuration to the adapter.
+	if err := agent.adapterProxy.UpdatePmConfigs(ctx, updated, pmConfigs); err != nil {
+		log.Errorw("update-pm-configs-error", log.Fields{"id": agent.lastData.Id, "error": err})
+		return err
+	}
+	return nil
+}
+
+// initPmConfigs stores pmConfigs on this agent's device without contacting the adapter.
+// The device lock is held for the whole call. It returns NotFound when the device cannot be
+// retrieved and Internal when the store update yields nil.
+func (agent *DeviceAgent) initPmConfigs(pmConfigs *voltha.PmConfigs) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("initPmConfigs", log.Fields{"id": pmConfigs.Id})
+
+	// Always start from the most recent copy of the device.
+	current, err := agent.getDeviceWithoutLock()
+	if err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	}
+
+	// Apply the configuration to a copy rather than to the retrieved device.
+	updated := proto.Clone(current).(*voltha.Device)
+	updated.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+
+	// Persist the modified copy using a fresh background context stamped with the current time.
+	storeCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+	if agent.clusterDataProxy.Update(storeCtx, "/devices/"+agent.deviceId, updated, false, "") == nil {
+		return status.Errorf(codes.Internal, "%s", agent.deviceId)
+	}
+	return nil
+}
+
+// listPmConfigs returns the PmConfigs field taken from a clone of this agent's device.
+// The device read-lock is held for the duration of the call. It returns NotFound when the
+// device cannot be retrieved.
+func (agent *DeviceAgent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
+	agent.lockDevice.RLock()
+	defer agent.lockDevice.RUnlock()
+	log.Debugw("listPmConfigs", log.Fields{"id": agent.deviceId})
+
+	// Fetch the most up-to-date device information.
+	latest, err := agent.getDeviceWithoutLock()
+	if err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	}
+	snapshot := proto.Clone(latest).(*voltha.Device)
+	return snapshot.PmConfigs, nil
+}
+
func (agent *DeviceAgent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -1131,27 +1191,6 @@
}
}
-func (agent *DeviceAgent) updatePmConfigs(pmConfigs *voltha.PmConfigs) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debug("updatePmConfigs")
- // Work only on latest data
- if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
- return status.Errorf(codes.NotFound, "%s", agent.deviceId)
- } else {
- // clone the device
- cloned := proto.Clone(storeDevice).(*voltha.Device)
- cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
- // Store the device
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
- }
- return nil
- }
-}
-
func (agent *DeviceAgent) addPort(port *voltha.Port) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 257b707..f92645b 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -596,13 +596,38 @@
return status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) updatePmConfigs(deviceId string, pmConfigs *voltha.PmConfigs) error {
+// updatePmConfigs applies a PM configuration change requested through the northbound gRPC API
+// (typically the result of a user action). Nothing is returned directly: the outcome, either
+// the agent's result or a FailedPrecondition/NotFound error, is passed to sendResponse with ch.
+func (dMgr *DeviceManager) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs, ch chan interface{}) {
+	var outcome interface{}
+	if pmConfigs.Id != "" {
+		agent := dMgr.getDeviceAgent(pmConfigs.Id)
+		if agent == nil {
+			// No agent is registered for the requested device.
+			outcome = status.Errorf(codes.NotFound, "%s", pmConfigs.Id)
+		} else {
+			outcome = agent.updatePmConfigs(ctx, pmConfigs)
+		}
+	} else {
+		// A configuration without a device ID cannot be routed to an agent.
+		outcome = status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
+	}
+	sendResponse(ctx, ch, outcome)
+}
+
+// initPmConfigs initializes the PM configs as defined by the adapter.
+func (dMgr *DeviceManager) initPmConfigs(deviceId string, pmConfigs *voltha.PmConfigs) error {
+ if pmConfigs.Id == "" {
+ return status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
+ }
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.updatePmConfigs(pmConfigs)
+ return agent.initPmConfigs(pmConfigs)
}
return status.Errorf(codes.NotFound, "%s", deviceId)
}
+// listPmConfigs delegates to the agent of the given device and returns its PM configuration.
+// It returns NotFound when no agent exists for deviceId.
+func (dMgr *DeviceManager) listPmConfigs(ctx context.Context, deviceId string) (*voltha.PmConfigs, error) {
+	agent := dMgr.getDeviceAgent(deviceId)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+	}
+	return agent.listPmConfigs(ctx)
+}
+
func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*ic.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
@@ -617,7 +642,6 @@
return agent.getPorts(ctx, portType), nil
}
return nil, status.Errorf(codes.NotFound, "%s", deviceId)
-
}
func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*ic.PortCapability, error) {
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index d5e0305..8180ecd 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -725,7 +725,30 @@
out := new(empty.Empty)
return out, nil
}
- return nil, errors.New("UnImplemented")
+ if handler.competeForTransaction() {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: configs.Id}); err != nil {
+ return new(empty.Empty), err
+ } else {
+ defer txn.Close()
+ }
+ }
+
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.updatePmConfigs(ctx, configs, ch)
+ return waitForNilResponseOnSuccess(ctx, ch)
+}
+
+// ListDevicePmConfigs returns the PM configuration that the device manager reports for the
+// device identified by id. When this core competes for transactions, ownership of the request
+// is taken first and the transaction is closed when the call returns; if ownership cannot be
+// taken, an empty PmConfigs is returned together with the error.
+func (handler *APIHandler) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
+	log.Debugw("ListDevicePmConfigs-request", log.Fields{"deviceId": *id})
+	if handler.competeForTransaction() {
+		// id names a device (it is passed on to deviceMgr.listPmConfigs as the device ID), so
+		// request ownership is keyed on a DeviceID rather than a LogicalDeviceID.
+		txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id})
+		if err != nil {
+			return &voltha.PmConfigs{}, err
+		}
+		defer txn.Close()
+	}
+	return handler.deviceMgr.listPmConfigs(ctx, id.Id)
}
func (handler *APIHandler) CreateAlarmFilter(ctx context.Context, filter *voltha.AlarmFilter) (*voltha.AlarmFilter, error) {