[VOL-1800] Implement Performance configuration in Voltha Core.

This is a port of the exisiting voltha 1.x funtionality into
the Voltha 2.0 Core.

Change-Id: I87bf8836fd392c1c7f4a2c45e85323d1cbe0079f
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index 8371e09..bb03c22 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -499,3 +499,21 @@
 	log.Debugw("SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
 }
+
+func (ap *CoreProxy) DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+	log.Debugw("DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
+	rpc := "DevicePMConfigUpdate"
+	// Use a device specific topic to send the request.  The adapter handling the device creates a device
+	// specific topic
+	toTopic := ap.getCoreTopic(pmConfigs.Id)
+	replyToTopic := ap.getAdapterTopic()
+
+	args := make([]*kafka.KVArg, 1)
+	args[0] = &kafka.KVArg{
+		Key:   "device_pm_config",
+		Value: pmConfigs,
+	}
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
+	log.Debugw("DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
+	return unPackResponse(rpc, pmConfigs.Id, success, result)
+}
diff --git a/adapters/common/performance_metrics.go b/adapters/common/performance_metrics.go
new file mode 100644
index 0000000..8f74439
--- /dev/null
+++ b/adapters/common/performance_metrics.go
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+	"github.com/opencord/voltha-protos/go/voltha"
+)
+
+type PmMetrics struct {
+	deviceId          string
+	frequency         uint32
+	grouped           bool
+	frequencyOverride bool
+	metrics           map[string]*voltha.PmConfig
+}
+
+type PmMetricsOption func(*PmMetrics)
+
+func Frequency(frequency uint32) PmMetricsOption {
+	return func(args *PmMetrics) {
+		args.frequency = frequency
+	}
+}
+
+func Grouped(grouped bool) PmMetricsOption {
+	return func(args *PmMetrics) {
+		args.grouped = grouped
+	}
+}
+
+func FrequencyOverride(frequencyOverride bool) PmMetricsOption {
+	return func(args *PmMetrics) {
+		args.frequencyOverride = frequencyOverride
+	}
+}
+
+func Metrics(pmNames []string) PmMetricsOption {
+	return func(args *PmMetrics) {
+		args.metrics = make(map[string]*voltha.PmConfig)
+		for _, name := range pmNames {
+			args.metrics[name] = &voltha.PmConfig{
+				Name:    name,
+				Type:    voltha.PmConfig_COUNTER,
+				Enabled: true,
+			}
+		}
+	}
+}
+
+func NewPmMetrics(deviceId string, opts ...PmMetricsOption) *PmMetrics {
+	pm := &PmMetrics{deviceId: deviceId}
+	for _, option := range opts {
+		option(pm)
+	}
+	return pm
+}
+
+func (pm *PmMetrics) ToPmConfigs() *voltha.PmConfigs {
+	pmConfigs := &voltha.PmConfigs{
+		Id:           pm.deviceId,
+		DefaultFreq:  pm.frequency,
+		Grouped:      pm.grouped,
+		FreqOverride: pm.frequencyOverride,
+	}
+	for _, v := range pm.metrics {
+		pmConfigs.Metrics = append(pmConfigs.Metrics, &voltha.PmConfig{Name: v.Name, Type: v.Type, Enabled: v.Enabled})
+	}
+	return pmConfigs
+}
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index 7ce4414..45d8b72 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -256,7 +256,7 @@
 	}
 	//Update the core reference for that device
 	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
-	//Invoke the Disable_device API on the adapter
+	//Invoke the delete_device API on the adapter
 	if err := rhp.adapter.Delete_device(device); err != nil {
 		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
 	}
@@ -303,7 +303,7 @@
 		}
 	}
 	log.Debugw("Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
-	//Invoke the adopt device on the adapter
+	//Invoke the bulk flow update API of the adapter
 	if err := rhp.adapter.Update_flows_bulk(device, flows, groups); err != nil {
 		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
 	}
@@ -346,7 +346,7 @@
 		}
 	}
 	log.Debugw("Update_flows_incrementally", log.Fields{"flows": flows, "groups": groups})
-	//Invoke the adopt device on the adapter
+	//Invoke the incremental flow update API of the adapter
 	if err := rhp.adapter.Update_flows_incrementally(device, flows, groups); err != nil {
 		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
 	}
@@ -354,6 +354,39 @@
 }
 
 func (rhp *RequestHandlerProxy) Update_pm_config(args []*ic.Argument) (*empty.Empty, error) {
+	log.Debug("Update_pm_config")
+	if len(args) < 2 {
+		log.Warn("Update_pm_config-invalid-number-of-args", log.Fields{"args": args})
+		err := errors.New("invalid-number-of-args")
+		return nil, err
+	}
+	device := &voltha.Device{}
+	transactionID := &ic.StrType{}
+	pmConfigs := &voltha.PmConfigs{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case "pm_configs":
+			if err := ptypes.UnmarshalAny(arg.Value, pmConfigs); err != nil {
+				log.Warnw("cannot-unmarshal-pm-configs", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	log.Debugw("Update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+	//Invoke the pm config update API of the adapter
+	if err := rhp.adapter.Update_pm_config(device, pmConfigs); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
 	return new(empty.Empty), nil
 }
 
diff --git a/adapters/simulated_olt/adaptercore/device_handler.go b/adapters/simulated_olt/adaptercore/device_handler.go
index 2c1abbf..dd9b2b6 100644
--- a/adapters/simulated_olt/adaptercore/device_handler.go
+++ b/adapters/simulated_olt/adaptercore/device_handler.go
@@ -29,6 +29,24 @@
 	"sync"
 )
 
+// A set of pm names to create the initial pm config.  This is used only for testing in this simulated adapter
+var pmNames = []string{
+	"tx_64_pkts",
+	"tx_65_127_pkts",
+	"tx_128_255_pkts",
+	"tx_256_511_pkts",
+	"tx_512_1023_pkts",
+	"tx_1024_1518_pkts",
+	"tx_1519_9k_pkts",
+	"rx_64_pkts",
+	"rx_65_127_pkts",
+	"rx_128_255_pkts",
+	"rx_256_511_pkts",
+	"rx_512_1023_pkts",
+	"rx_1024_1518_pkts",
+	"rx_1519_9k_pkts",
+}
+
 //DeviceHandler follows the same patterns as ponsim_olt.  The only difference is that it does not
 // interact with an OLT device.
 type DeviceHandler struct {
@@ -41,6 +59,7 @@
 	ponPort      *voltha.Port
 	exitChannel  chan int
 	lockDevice   sync.RWMutex
+	metrics      *com.PmMetrics
 }
 
 //NewDeviceHandler creates a new device handler
@@ -54,6 +73,14 @@
 	dh.simulatedOLT = adapter
 	dh.exitChannel = make(chan int, 1)
 	dh.lockDevice = sync.RWMutex{}
+	// Set up PON metrics
+	dh.metrics = com.NewPmMetrics(
+		cloned.Id,
+		com.Frequency(150),
+		com.Grouped(false),
+		com.FrequencyOverride(false),
+		com.Metrics(pmNames),
+	)
 	return &dh
 }
 
@@ -105,6 +132,11 @@
 		log.Errorw("error-updating-device", log.Fields{"deviceId": device.Id, "error": err})
 	}
 
+	// Now, set the initial PM configuration for that device
+	if err := dh.coreProxy.DevicePMConfigUpdate(nil, dh.metrics.ToPmConfigs()); err != nil {
+		log.Errorw("error-updating-PMs", log.Fields{"deviceId": device.Id, "error": err})
+	}
+
 	//	Now create the NNI Port
 	dh.nniPort = &voltha.Port{
 		PortNo:     2,
@@ -278,3 +310,9 @@
 	// For now we do nothing with it
 	return
 }
+
+func (dh *DeviceHandler) UpdatePmConfigs(device *voltha.Device, pmConfigs *voltha.PmConfigs) {
+	log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+	// For now we do nothing with it
+	return
+}
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index 4349a1b..8b6d47e 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -241,8 +241,17 @@
 	return nil
 }
 
-func (so *SimulatedOLT) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
-	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Update_pm_config(device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
+	if device == nil {
+		log.Warn("device-is-nil")
+		return errors.New("nil-device")
+	}
+	log.Debugw("update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+	var handler *DeviceHandler
+	if handler = so.getDeviceHandler(device.Id); handler != nil {
+		go handler.UpdatePmConfigs(device, pmConfigs)
+	}
+	return nil
 }
 
 func (so *SimulatedOLT) Receive_packet_out(deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error {
diff --git a/adapters/simulated_onu/adaptercore/device_handler.go b/adapters/simulated_onu/adaptercore/device_handler.go
index 61b1c0b..908fdd5 100644
--- a/adapters/simulated_onu/adaptercore/device_handler.go
+++ b/adapters/simulated_onu/adaptercore/device_handler.go
@@ -30,6 +30,21 @@
 	"time"
 )
 
+// A set of pm names to create the initial pm config.  This is used only for testing in this simulated adapter
+var pmNames = []string{
+	"tx_64_pkts",
+	"tx_65_127_pkts",
+	"tx_128_255_pkts",
+	"tx_1024_1518_pkts",
+	"tx_1519_9k_pkts",
+	"rx_64_pkts",
+	"rx_64_pkts",
+	"rx_65_127_pkts",
+	"rx_128_255_pkts",
+	"rx_1024_1518_pkts",
+	"rx_1519_9k_pkts",
+}
+
 //DeviceHandler follows the same patterns as ponsim_olt.  The only difference is that it does not
 // interact with an OLT device.
 type DeviceHandler struct {
@@ -42,6 +57,7 @@
 	ponPort      *voltha.Port
 	exitChannel  chan int
 	lockDevice   sync.RWMutex
+	metrics      *com.PmMetrics
 }
 
 //NewDeviceHandler creates a new device handler
@@ -55,6 +71,13 @@
 	dh.simulatedOLT = adapter
 	dh.exitChannel = make(chan int, 1)
 	dh.lockDevice = sync.RWMutex{}
+	dh.metrics = com.NewPmMetrics(
+		cloned.Id,
+		com.Frequency(150),
+		com.Grouped(false),
+		com.FrequencyOverride(false),
+		com.Metrics(pmNames),
+	)
 	return &dh
 }
 
@@ -113,6 +136,11 @@
 		log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
 	}
 
+	// Now, set the initial PM configuration for that device
+	if err := dh.coreProxy.DevicePMConfigUpdate(nil, dh.metrics.ToPmConfigs()); err != nil {
+		log.Errorw("error-updating-PMs", log.Fields{"deviceId": device.Id, "error": err})
+	}
+
 	// Sleep to mimic the omci management channel creation with the OLT
 	time.Sleep(10 * time.Millisecond)
 
@@ -268,3 +296,9 @@
 	// For now we do nothing with it
 	return
 }
+
+func (dh *DeviceHandler) UpdatePmConfigs(device *voltha.Device, pmConfigs *voltha.PmConfigs) {
+	log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+	// For now we do nothing with it
+	return
+}
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
index d8d4127..02448f5 100644
--- a/adapters/simulated_onu/adaptercore/simulated_onu.go
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -233,8 +233,17 @@
 	return nil
 }
 
-func (so *SimulatedONU) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
-	return errors.New("UnImplemented")
+func (so *SimulatedONU) Update_pm_config(device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
+	if device == nil {
+		log.Warn("device-is-nil")
+		return errors.New("nil-device")
+	}
+	log.Debugw("update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+	var handler *DeviceHandler
+	if handler = so.getDeviceHandler(device.Id); handler != nil {
+		go handler.UpdatePmConfigs(device, pmConfigs)
+	}
+	return nil
 }
 
 func (so *SimulatedONU) Receive_packet_out(deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error {
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 23b2073..a606a75 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -472,9 +472,24 @@
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
-func (ap *AdapterProxy) UpdatePmConfig(device voltha.Device, pmConfigs voltha.PmConfigs) error {
-	log.Debug("UpdatePmConfig")
-	return nil
+func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
+	log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id})
+	toTopic := ap.getAdapterTopic(device.Adapter)
+	rpc := "Update_pm_config"
+	args := make([]*kafka.KVArg, 2)
+	args[0] = &kafka.KVArg{
+		Key:   "device",
+		Value: device,
+	}
+	args[1] = &kafka.KVArg{
+		Key:   "pm_configs",
+		Value: pmConfigs,
+	}
+
+	replyToTopic := ap.getCoreTopic()
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	log.Debugw("UpdatePmConfigs-response", log.Fields{"deviceid": device.Id, "success": success})
+	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) ReceivePacketOut(deviceId voltha.ID, egressPortNo int, msg interface{}) error {
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 0d37255..1a00db8 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -984,13 +984,12 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 3 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	pmConfigs := &voltha.PmConfigs{}
-	init := &ic.BoolType{}
 	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
@@ -999,11 +998,6 @@
 				log.Warnw("cannot-unmarshal-pm-config", log.Fields{"error": err})
 				return nil, err
 			}
-		case "init":
-			if err := ptypes.UnmarshalAny(arg.Value, init); err != nil {
-				log.Warnw("cannot-unmarshal-boolean", log.Fields{"error": err})
-				return nil, err
-			}
 		case kafka.TransactionKey:
 			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
 				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
@@ -1012,7 +1006,7 @@
 		}
 	}
 	log.Debugw("DevicePMConfigUpdate", log.Fields{"deviceId": pmConfigs.Id, "configs": pmConfigs,
-		"init": init, "transactionID": transactionID.Val})
+		"transactionID": transactionID.Val})
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
@@ -1029,10 +1023,7 @@
 		return nil, nil
 	}
 
-	go rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs)
-	//if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
-	//	return nil, err
-	//}
+	go rhp.deviceMgr.initPmConfigs(pmConfigs.Id, pmConfigs)
 
 	return new(empty.Empty), nil
 }
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 0198254..a202c82 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -644,6 +644,66 @@
 	return nil
 }
 
+func (agent *DeviceAgent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("updatePmConfigs", log.Fields{"id": pmConfigs.Id})
+	// Work only on latest data
+	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	} else {
+		// clone the device
+		cloned := proto.Clone(storeDevice).(*voltha.Device)
+		cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+		// Store the device
+		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+		afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
+		if afterUpdate == nil {
+			return status.Errorf(codes.Internal, "%s", agent.deviceId)
+		}
+		// Send the request to the adapter
+		if err := agent.adapterProxy.UpdatePmConfigs(ctx, cloned, pmConfigs); err != nil {
+			log.Errorw("update-pm-configs-error", log.Fields{"id": agent.lastData.Id, "error": err})
+			return err
+		}
+		return nil
+	}
+}
+
+func (agent *DeviceAgent) initPmConfigs(pmConfigs *voltha.PmConfigs) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("initPmConfigs", log.Fields{"id": pmConfigs.Id})
+	// Work only on latest data
+	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	} else {
+		// clone the device
+		cloned := proto.Clone(storeDevice).(*voltha.Device)
+		cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+		// Store the device
+		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+		afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
+		if afterUpdate == nil {
+			return status.Errorf(codes.Internal, "%s", agent.deviceId)
+		}
+		return nil
+	}
+}
+
+func (agent *DeviceAgent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
+	agent.lockDevice.RLock()
+	defer agent.lockDevice.RUnlock()
+	log.Debugw("listPmConfigs", log.Fields{"id": agent.deviceId})
+	// Get the most up to date the device info
+	if device, err := agent.getDeviceWithoutLock(); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	} else {
+		cloned := proto.Clone(device).(*voltha.Device)
+		return cloned.PmConfigs, nil
+	}
+}
+
 func (agent *DeviceAgent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
@@ -1131,27 +1191,6 @@
 	}
 }
 
-func (agent *DeviceAgent) updatePmConfigs(pmConfigs *voltha.PmConfigs) error {
-	agent.lockDevice.Lock()
-	defer agent.lockDevice.Unlock()
-	log.Debug("updatePmConfigs")
-	// Work only on latest data
-	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
-		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
-	} else {
-		// clone the device
-		cloned := proto.Clone(storeDevice).(*voltha.Device)
-		cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
-		// Store the device
-		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-		afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
-		}
-		return nil
-	}
-}
-
 func (agent *DeviceAgent) addPort(port *voltha.Port) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 257b707..f92645b 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -596,13 +596,38 @@
 	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
-func (dMgr *DeviceManager) updatePmConfigs(deviceId string, pmConfigs *voltha.PmConfigs) error {
+// updatePmConfigs updates the PM configs.  This is executed when the northbound gRPC API is invoked, typically
+// following a user action
+func (dMgr *DeviceManager) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs, ch chan interface{}) {
+	var res interface{}
+	if pmConfigs.Id == "" {
+		res = status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
+	} else if agent := dMgr.getDeviceAgent(pmConfigs.Id); agent != nil {
+		res = agent.updatePmConfigs(ctx, pmConfigs)
+	} else {
+		res = status.Errorf(codes.NotFound, "%s", pmConfigs.Id)
+	}
+	sendResponse(ctx, ch, res)
+}
+
+// initPmConfigs initialize the pm configs as defined by the adapter.
+func (dMgr *DeviceManager) initPmConfigs(deviceId string, pmConfigs *voltha.PmConfigs) error {
+	if pmConfigs.Id == "" {
+		return status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
+	}
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
-		return agent.updatePmConfigs(pmConfigs)
+		return agent.initPmConfigs(pmConfigs)
 	}
 	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
+func (dMgr *DeviceManager) listPmConfigs(ctx context.Context, deviceId string) (*voltha.PmConfigs, error) {
+	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+		return agent.listPmConfigs(ctx)
+	}
+	return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+}
+
 func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*ic.SwitchCapability, error) {
 	log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceId})
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
@@ -617,7 +642,6 @@
 		return agent.getPorts(ctx, portType), nil
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", deviceId)
-
 }
 
 func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*ic.PortCapability, error) {
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index d5e0305..8180ecd 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -725,7 +725,30 @@
 		out := new(empty.Empty)
 		return out, nil
 	}
-	return nil, errors.New("UnImplemented")
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: configs.Id}); err != nil {
+			return new(empty.Empty), err
+		} else {
+			defer txn.Close()
+		}
+	}
+
+	ch := make(chan interface{})
+	defer close(ch)
+	go handler.deviceMgr.updatePmConfigs(ctx, configs, ch)
+	return waitForNilResponseOnSuccess(ctx, ch)
+}
+
+func (handler *APIHandler) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
+	log.Debugw("ListDevicePmConfigs-request", log.Fields{"deviceId": *id})
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+			return &voltha.PmConfigs{}, err
+		} else {
+			defer txn.Close()
+		}
+	}
+	return handler.deviceMgr.listPmConfigs(ctx, id.Id)
 }
 
 func (handler *APIHandler) CreateAlarmFilter(ctx context.Context, filter *voltha.AlarmFilter) (*voltha.AlarmFilter, error) {
diff --git a/tests/core/performance_metrics_test.go b/tests/core/performance_metrics_test.go
new file mode 100644
index 0000000..9e074f5
--- /dev/null
+++ b/tests/core/performance_metrics_test.go
@@ -0,0 +1,313 @@
+// +build integration
+
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+	"context"
+	"fmt"
+	"github.com/google/uuid"
+	"github.com/opencord/voltha-go/common/log"
+	tu "github.com/opencord/voltha-go/tests/utils"
+	"github.com/opencord/voltha-protos/go/common"
+	"github.com/opencord/voltha-protos/go/voltha"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc/metadata"
+	"math"
+	"os"
+	"testing"
+	"time"
+)
+
+var stub voltha.VolthaServiceClient
+var volthaSerialNumberKey string
+
+/*
+ This local "integration" test uses one RW-Core, one simulated_olt and one simulated_onu adapter to test performance
+metrics, in a development environment. It uses docker-compose to set up the local environment. However, it can
+easily be extended to run in k8s environment.
+
+The compose files used are located under %GOPATH/src/github.com/opencord/voltha-go/compose. If the GOPATH is not set
+then you can specify the location of the compose files by using COMPOSE_PATH to set the compose files location.
+
+To run this test: DOCKER_HOST_IP=<local IP> go test -v
+
+*/
+
+var allDevices map[string]*voltha.Device
+var allLogicalDevices map[string]*voltha.LogicalDevice
+
+var composePath string
+
+const (
+	GRPC_PORT        = 50057
+	NUM_OLTS         = 1
+	NUM_ONUS_PER_OLT = 4 // This should coincide with the number of onus per olt in adapters-simulated.yml file
+)
+
+var parentPmNames = []string{
+	"tx_64_pkts",
+	"tx_65_127_pkts",
+	"tx_128_255_pkts",
+	"tx_256_511_pkts",
+	"tx_512_1023_pkts",
+	"tx_1024_1518_pkts",
+	"tx_1519_9k_pkts",
+	"rx_64_pkts",
+	"rx_65_127_pkts",
+	"rx_128_255_pkts",
+	"rx_256_511_pkts",
+	"rx_512_1023_pkts",
+	"rx_1024_1518_pkts",
+	"rx_1519_9k_pkts",
+}
+
+var childPmNames = []string{
+	"tx_64_pkts",
+	"tx_65_127_pkts",
+	"tx_128_255_pkts",
+	"tx_1024_1518_pkts",
+	"tx_1519_9k_pkts",
+	"rx_64_pkts",
+	"rx_64_pkts",
+	"rx_65_127_pkts",
+	"rx_128_255_pkts",
+	"rx_1024_1518_pkts",
+	"rx_1519_9k_pkts",
+}
+
+func setup() {
+	var err error
+
+	if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+	log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
+	log.SetAllLogLevel(log.ErrorLevel)
+
+	volthaSerialNumberKey = "voltha_serial_number"
+	allDevices = make(map[string]*voltha.Device)
+	allLogicalDevices = make(map[string]*voltha.LogicalDevice)
+
+	grpcHostIP := os.Getenv("DOCKER_HOST_IP")
+	goPath := os.Getenv("GOPATH")
+	if goPath != "" {
+		composePath = fmt.Sprintf("%s/src/github.com/opencord/voltha-go/compose", goPath)
+	} else {
+		composePath = os.Getenv("COMPOSE_PATH")
+	}
+
+	fmt.Println("Using compose path:", composePath)
+
+	//Start the simulated environment
+	if err = tu.StartSimulatedEnv(composePath); err != nil {
+		fmt.Println("Failure starting simulated environment:", err)
+		os.Exit(10)
+	}
+
+	stub, err = tu.SetupGrpcConnectionToCore(grpcHostIP, GRPC_PORT)
+	if err != nil {
+		fmt.Println("Failure connecting to Voltha Core:", err)
+		os.Exit(11)
+	}
+
+	// Wait for the simulated devices to be registered in the Voltha Core
+	adapters := []string{"simulated_olt", "simulated_onu"}
+	if _, err = tu.WaitForAdapterRegistration(stub, adapters, 40); err != nil {
+		fmt.Println("Failure retrieving adapters:", err)
+		os.Exit(12)
+	}
+}
+
+func shutdown() {
+	err := tu.StopSimulatedEnv(composePath)
+	if err != nil {
+		fmt.Println("Failure stop simulated environment:", err)
+	}
+}
+
+func refreshLocalDeviceCache(stub voltha.VolthaServiceClient) error {
+	retrievedDevices, err := tu.ListDevices(stub)
+	if err != nil {
+		return err
+	}
+	for _, d := range retrievedDevices.Items {
+		allDevices[d.Id] = d
+	}
+
+	retrievedLogicalDevices, err := tu.ListLogicalDevices(stub)
+	if err != nil {
+		return err
+	}
+
+	for _, ld := range retrievedLogicalDevices.Items {
+		allLogicalDevices[ld.Id] = ld
+	}
+	return nil
+}
+
+func isPresent(pmName string, pmNames []string) bool {
+	for _, name := range pmNames {
+		if name == pmName {
+			return true
+		}
+	}
+	return false
+}
+
+func verifyDevicePMs(t *testing.T, stub voltha.VolthaServiceClient, device *voltha.Device, allPmNames []string, disabledPmNames []string, frequency uint32) {
+	ui := uuid.New()
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+	pmConfigs, err := stub.ListDevicePmConfigs(ctx, &common.ID{Id: device.Id})
+	assert.Nil(t, err)
+	assert.Equal(t, device.Id, pmConfigs.Id)
+	assert.Equal(t, uint32(frequency), pmConfigs.DefaultFreq)
+	assert.Equal(t, false, pmConfigs.FreqOverride)
+	assert.Equal(t, false, pmConfigs.Grouped)
+	assert.Nil(t, pmConfigs.Groups)
+	assert.True(t, len(pmConfigs.Metrics) > 0)
+	metrics := pmConfigs.Metrics
+	for _, m := range metrics {
+		if m.Enabled {
+			assert.True(t, isPresent(m.Name, allPmNames))
+		} else {
+			assert.True(t, isPresent(m.Name, disabledPmNames))
+		}
+		assert.Equal(t, voltha.PmConfig_COUNTER, m.Type)
+	}
+}
+
+func verifyInitialPmConfigs(t *testing.T, stub voltha.VolthaServiceClient) {
+	fmt.Println("Verifying initial PM configs")
+	err := refreshLocalDeviceCache(stub)
+	assert.Nil(t, err)
+	for _, d := range allDevices {
+		if d.Root {
+			verifyDevicePMs(t, stub, d, parentPmNames, []string{}, 150)
+		} else {
+			verifyDevicePMs(t, stub, d, childPmNames, []string{}, 150)
+		}
+	}
+}
+
+func verifyPmFrequencyUpdate(t *testing.T, stub voltha.VolthaServiceClient, device *voltha.Device) {
+	ui := uuid.New()
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+	pmConfigs, err := stub.ListDevicePmConfigs(ctx, &common.ID{Id: device.Id})
+	assert.Nil(t, err)
+	pmConfigs.DefaultFreq = 10
+	_, err = stub.UpdateDevicePmConfigs(ctx, pmConfigs)
+	assert.Nil(t, err)
+	if device.Root {
+		verifyDevicePMs(t, stub, device, parentPmNames, []string{}, 10)
+	} else {
+		verifyDevicePMs(t, stub, device, childPmNames, []string{}, 10)
+	}
+}
+
+func updatePmFrequencies(t *testing.T, stub voltha.VolthaServiceClient) {
+	fmt.Println("Verifying update to PMs frequencies")
+	err := refreshLocalDeviceCache(stub)
+	assert.Nil(t, err)
+	for _, d := range allDevices {
+		verifyPmFrequencyUpdate(t, stub, d)
+	}
+}
+
+func verifyDisablingSomePmMetrics(t *testing.T, stub voltha.VolthaServiceClient, device *voltha.Device) {
+	ui := uuid.New()
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+	pmConfigs, err := stub.ListDevicePmConfigs(ctx, &common.ID{Id: device.Id})
+	assert.Nil(t, err)
+	metricsToDisable := []string{"tx_64_pkts", "rx_64_pkts", "tx_65_127_pkts", "rx_65_127_pkts"}
+	for _, m := range pmConfigs.Metrics {
+		if isPresent(m.Name, metricsToDisable) {
+			m.Enabled = false
+		}
+	}
+	_, err = stub.UpdateDevicePmConfigs(ctx, pmConfigs)
+	assert.Nil(t, err)
+	if device.Root {
+		verifyDevicePMs(t, stub, device, parentPmNames, metricsToDisable, 10)
+	} else {
+		verifyDevicePMs(t, stub, device, childPmNames, metricsToDisable, 10)
+	}
+}
+
+func disableSomePmMetrics(t *testing.T, stub voltha.VolthaServiceClient) {
+	fmt.Println("Verifying disabling of some PMs")
+	err := refreshLocalDeviceCache(stub)
+	assert.Nil(t, err)
+	for _, d := range allDevices {
+		verifyDisablingSomePmMetrics(t, stub, d)
+	}
+}
+
+func createAndEnableDevices(t *testing.T) {
+	err := tu.SetAllLogLevel(stub, voltha.Logging{Level: common.LogLevel_WARNING})
+	assert.Nil(t, err)
+
+	err = tu.SetLogLevel(stub, voltha.Logging{Level: common.LogLevel_DEBUG, PackageName: "github.com/opencord/voltha-go/rw_core/core"})
+	assert.Nil(t, err)
+
+	startTime := time.Now()
+
+	//Pre-provision the parent device
+	oltDevice, err := tu.PreProvisionDevice(stub)
+	assert.Nil(t, err)
+
+	fmt.Println("Creation of ", NUM_OLTS, " OLT devices took:", time.Since(startTime))
+
+	startTime = time.Now()
+
+	//Enable all parent device - this will enable the child devices as well as validate the child devices
+	err = tu.EnableDevice(stub, oltDevice, NUM_ONUS_PER_OLT)
+	assert.Nil(t, err)
+
+	fmt.Println("Enabling of  OLT device took:", time.Since(startTime))
+
+	// Wait until the core and adapters sync up after an enabled
+	time.Sleep(time.Duration(math.Max(10, float64(NUM_OLTS*NUM_ONUS_PER_OLT)/2)) * time.Second)
+
+	err = tu.VerifyDevices(stub, NUM_ONUS_PER_OLT)
+	assert.Nil(t, err)
+
+	lds, err := tu.VerifyLogicalDevices(stub, oltDevice, NUM_ONUS_PER_OLT)
+	assert.Nil(t, err)
+	assert.Equal(t, 1, len(lds.Items))
+}
+
+func TestPerformanceMetrics(t *testing.T) {
+	//1. Test creation and activation of the devices.  This will validate the devices as well as the logical device created/
+	createAndEnableDevices(t)
+
+	//	2. Test initial PMs on each device
+	verifyInitialPmConfigs(t, stub)
+
+	//	3. Test frequency update of the pmConfigs
+	updatePmFrequencies(t, stub)
+
+	//	4. Test disable some PM metrics
+	disableSomePmMetrics(t, stub)
+}
+
+func TestMain(m *testing.M) {
+	setup()
+	code := m.Run()
+	shutdown()
+	os.Exit(code)
+}