[VOL-1800] Implement Performance configuration in Voltha Core.

This is a port of the existing Voltha 1.x functionality into
the Voltha 2.0 Core.

Change-Id: I87bf8836fd392c1c7f4a2c45e85323d1cbe0079f
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 23b2073..a606a75 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -472,9 +472,24 @@
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
-func (ap *AdapterProxy) UpdatePmConfig(device voltha.Device, pmConfigs voltha.PmConfigs) error {
-	log.Debug("UpdatePmConfig")
-	return nil
+// UpdatePmConfigs sends an "Update_pm_config" request with device and pmConfigs to the adapter topic of device.Adapter, forwarding ctx to InvokeRPC, and returns the unpacked reply.
+func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
+	log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id})
+	toTopic := ap.getAdapterTopic(device.Adapter)
+	rpc := "Update_pm_config"
+	args := make([]*kafka.KVArg, 2)
+	args[0] = &kafka.KVArg{
+		Key:   "device",
+		Value: device,
+	}
+	args[1] = &kafka.KVArg{
+		Key:   "pm_configs",
+		Value: pmConfigs,
+	}
+	replyToTopic := ap.getCoreTopic()
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	log.Debugw("UpdatePmConfigs-response", log.Fields{"deviceid": device.Id, "success": success})
+	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) ReceivePacketOut(deviceId voltha.ID, egressPortNo int, msg interface{}) error {
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 0d37255..1a00db8 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -984,13 +984,12 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 3 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	pmConfigs := &voltha.PmConfigs{}
-	init := &ic.BoolType{}
 	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
@@ -999,11 +998,6 @@
 				log.Warnw("cannot-unmarshal-pm-config", log.Fields{"error": err})
 				return nil, err
 			}
-		case "init":
-			if err := ptypes.UnmarshalAny(arg.Value, init); err != nil {
-				log.Warnw("cannot-unmarshal-boolean", log.Fields{"error": err})
-				return nil, err
-			}
 		case kafka.TransactionKey:
 			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
 				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
@@ -1012,7 +1006,7 @@
 		}
 	}
 	log.Debugw("DevicePMConfigUpdate", log.Fields{"deviceId": pmConfigs.Id, "configs": pmConfigs,
-		"init": init, "transactionID": transactionID.Val})
+		"transactionID": transactionID.Val})
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
@@ -1029,10 +1023,7 @@
 		return nil, nil
 	}
 
-	go rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs)
-	//if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
-	//	return nil, err
-	//}
+	go rhp.deviceMgr.initPmConfigs(pmConfigs.Id, pmConfigs)
 
 	return new(empty.Empty), nil
 }
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 0198254..a202c82 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -644,6 +644,66 @@
 	return nil
 }
 
+// updatePmConfigs stores pmConfigs on a clone of the latest device data, persists the clone, then pushes the configs to the adapter.
+func (agent *DeviceAgent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("updatePmConfigs", log.Fields{"id": pmConfigs.Id})
+	// Always start from the latest device data.
+	current, err := agent.getDeviceWithoutLock()
+	if err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	}
+	updated := proto.Clone(current).(*voltha.Device)
+	updated.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+	// Persist the modified clone; a nil result from Update is reported as an internal error.
+	updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+	devicePath := "/devices/" + agent.deviceId
+	if agent.clusterDataProxy.Update(updateCtx, devicePath, updated, false, "") == nil {
+		return status.Errorf(codes.Internal, "%s", agent.deviceId)
+	}
+	// Forward the new configs to the adapter.
+	if err = agent.adapterProxy.UpdatePmConfigs(ctx, updated, pmConfigs); err != nil {
+		log.Errorw("update-pm-configs-error", log.Fields{"id": agent.lastData.Id, "error": err})
+		return err
+	}
+	return nil
+}
+
+// initPmConfigs stores pmConfigs on a clone of the latest device data and persists the clone; the adapter is not contacted.
+func (agent *DeviceAgent) initPmConfigs(pmConfigs *voltha.PmConfigs) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("initPmConfigs", log.Fields{"id": pmConfigs.Id})
+	// Always start from the latest device data.
+	current, err := agent.getDeviceWithoutLock()
+	if err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	}
+	updated := proto.Clone(current).(*voltha.Device)
+	updated.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+	// Persist the modified clone; a nil result from Update is reported as an internal error.
+	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+	devicePath := "/devices/" + agent.deviceId
+	if agent.clusterDataProxy.Update(updateCtx, devicePath, updated, false, "") == nil {
+		return status.Errorf(codes.Internal, "%s", agent.deviceId)
+	}
+	return nil
+}
+
+// listPmConfigs returns the PmConfigs field taken from a clone of the latest device data, or a NotFound error when that data cannot be read.
+func (agent *DeviceAgent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
+	agent.lockDevice.RLock()
+	defer agent.lockDevice.RUnlock()
+	log.Debugw("listPmConfigs", log.Fields{"id": agent.deviceId})
+	// Read the most up-to-date device info.
+	device, err := agent.getDeviceWithoutLock()
+	if err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	}
+	return proto.Clone(device).(*voltha.Device).PmConfigs, nil
+}
+
 func (agent *DeviceAgent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
@@ -1131,27 +1191,6 @@
 	}
 }
 
-func (agent *DeviceAgent) updatePmConfigs(pmConfigs *voltha.PmConfigs) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debug("updatePmConfigs")
-	// Work only on latest data
-	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
-		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
-	} else {
-		// clone the device
-		cloned := proto.Clone(storeDevice).(*voltha.Device)
-		cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
-		// Store the device
-		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-		afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
-		}
-		return nil
-	}
-}
-
 func (agent *DeviceAgent) addPort(port *voltha.Port) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 257b707..f92645b 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -596,13 +596,38 @@
 	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
-func (dMgr *DeviceManager) updatePmConfigs(deviceId string, pmConfigs *voltha.PmConfigs) error {
+// updatePmConfigs applies pmConfigs via the device agent for pmConfigs.Id and sends the outcome on ch; it serves the northbound gRPC API, typically after a user action.
+func (dMgr *DeviceManager) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs, ch chan interface{}) {
+	if pmConfigs.Id == "" {
+		sendResponse(ctx, ch, status.Errorf(codes.FailedPrecondition, "invalid-device-Id"))
+		return
+	}
+	agent := dMgr.getDeviceAgent(pmConfigs.Id)
+	if agent == nil {
+		sendResponse(ctx, ch, status.Errorf(codes.NotFound, "%s", pmConfigs.Id))
+		return
+	}
+	sendResponse(ctx, ch, agent.updatePmConfigs(ctx, pmConfigs))
+}
+
+// initPmConfigs initializes the PM configs of the device identified by deviceId with the values defined by the adapter.
+func (dMgr *DeviceManager) initPmConfigs(deviceId string, pmConfigs *voltha.PmConfigs) error {
+	if pmConfigs.Id == "" {
+		return status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
+	}
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
-		return agent.updatePmConfigs(pmConfigs)
+		return agent.initPmConfigs(pmConfigs)
 	}
 	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
+func (dMgr *DeviceManager) listPmConfigs(ctx context.Context, deviceId string) (*voltha.PmConfigs, error) {
+	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+		return agent.listPmConfigs(ctx) // delegate to the agent found for deviceId
+	}
+	return nil, status.Errorf(codes.NotFound, "%s", deviceId) // getDeviceAgent returned nil for deviceId
+}
+
 func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*ic.SwitchCapability, error) {
 	log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceId})
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
@@ -617,7 +642,6 @@
 		return agent.getPorts(ctx, portType), nil
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", deviceId)
-
 }
 
 func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*ic.PortCapability, error) {
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index d5e0305..8180ecd 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -725,7 +725,30 @@
 		out := new(empty.Empty)
 		return out, nil
 	}
-	return nil, errors.New("UnImplemented")
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: configs.Id}); err != nil {
+			return new(empty.Empty), err
+		} else {
+			defer txn.Close()
+		}
+	}
+
+	ch := make(chan interface{})
+	defer close(ch)
+	go handler.deviceMgr.updatePmConfigs(ctx, configs, ch)
+	return waitForNilResponseOnSuccess(ctx, ch)
+}
+
+func (handler *APIHandler) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
+	log.Debugw("ListDevicePmConfigs-request", log.Fields{"deviceId": *id})
+	if handler.competeForTransaction() {
+		txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}) // id is a device ID, not a logical device ID
+		if err != nil {
+			return &voltha.PmConfigs{}, err
+		}
+		defer txn.Close() // release the transaction when the request completes
+	}
+	return handler.deviceMgr.listPmConfigs(ctx, id.Id)
 }
 
 func (handler *APIHandler) CreateAlarmFilter(ctx context.Context, filter *voltha.AlarmFilter) (*voltha.AlarmFilter, error) {