VOL-2920 - Remove NBI passthrough functions.

Modified the NBIHandler to reference device, logical device, and adapter managers as embedded types, allowing the managers to directly implement API functions, without the need for individual passthrough functions.
Also created a new event.Manager type, which is embedded in device.LogicalManager.
Also renamed device.NewDeviceManagers() to device.NewManagers().

Change-Id: I8455da79b991ee67cc16cf898b00b0c98ea97bcd
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 3857a6b..940bf1c 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -389,8 +389,8 @@
 	defer agent.requestQueue.RequestComplete()
 
 	device := agent.getDeviceWithoutLock()
-	dType := agent.adapterMgr.GetDeviceType(device.Type)
-	if dType == nil {
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 
@@ -479,8 +479,8 @@
 	defer agent.requestQueue.RequestComplete()
 
 	device := agent.getDeviceWithoutLock()
-	dType := agent.adapterMgr.GetDeviceType(device.Type)
-	if dType == nil {
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 
@@ -621,8 +621,8 @@
 	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
 	}
-	dType := agent.adapterMgr.GetDeviceType(device.Type)
-	if dType == nil {
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 
@@ -1165,7 +1165,7 @@
 func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
 	logger.Debugw("getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
 	ports := &voltha.Ports{}
-	if device, _ := agent.deviceMgr.GetDevice(ctx, agent.deviceID); device != nil {
+	if device, _ := agent.deviceMgr.getDevice(ctx, agent.deviceID); device != nil {
 		for _, port := range device.Ports {
 			if port.Type == portType {
 				ports.Items = append(ports.Items, port)
@@ -1503,7 +1503,7 @@
 	}
 }
 
-func (agent *Agent) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest) error {
+func (agent *Agent) simulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) error {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
@@ -1513,7 +1513,7 @@
 	cloned := agent.getDeviceWithoutLock()
 
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	ch, err := agent.adapterProxy.SimulateAlarm(subCtx, cloned, simulatereq)
+	ch, err := agent.adapterProxy.SimulateAlarm(subCtx, cloned, simulateReq)
 	if err != nil {
 		cancel()
 		return err
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 60b7273..8b003b4 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -1,18 +1,19 @@
 /*
-* Copyright 2019-present Open Networking Foundation
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
+
 package device
 
 import (
@@ -107,11 +108,6 @@
 	return test
 }
 
-type fakeEventCallbacks struct{}
-
-func (fakeEventCallbacks) SendChangeEvent(_ string, _ *ofp.OfpPortStatus)      {}
-func (fakeEventCallbacks) SendPacketIn(_ string, _ string, _ *ofp.OfpPacketIn) {}
-
 func (dat *DATest) startCore(inCompeteMode bool) {
 	cfg := config.NewRWCoreFlags()
 	cfg.CorePairTopic = "rw_core"
@@ -144,8 +140,7 @@
 	proxy := model.NewProxy(backend, "/")
 	adapterMgr := adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
 
-	dat.deviceMgr, dat.logicalDeviceMgr = NewDeviceManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
-	dat.logicalDeviceMgr.SetEventCallbacks(fakeEventCallbacks{})
+	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
 	if err = dat.kmp.Start(); err != nil {
 		logger.Fatal("Cannot start InterContainerProxy")
 	}
diff --git a/rw_core/core/device/event/common.go b/rw_core/core/device/event/common.go
new file mode 100644
index 0000000..ebb1ad3
--- /dev/null
+++ b/rw_core/core/device/event/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package event Common Logger initialization
+package event
+
+import (
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+	// Register this package's logger (tagged "pkg": "event") so that its log level can be modified at run time; panics if registration fails
+	var err error
+	logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "event"})
+	if err != nil {
+		panic(err)
+	}
+}
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
new file mode 100644
index 0000000..c205564
--- /dev/null
+++ b/rw_core/core/device/event/event.go
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+	"encoding/hex"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-protos/v3/go/openflow_13"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"sync"
+)
+
+type Manager struct {
+	packetInQueue        chan openflow_13.PacketIn
+	packetInQueueDone    chan bool
+	changeEventQueue     chan openflow_13.ChangeEvent
+	changeEventQueueDone chan bool
+}
+
+func NewManager() *Manager {
+	return &Manager{
+		packetInQueue:        make(chan openflow_13.PacketIn, 100),
+		packetInQueueDone:    make(chan bool, 1),
+		changeEventQueue:     make(chan openflow_13.ChangeEvent, 100),
+		changeEventQueueDone: make(chan bool, 1),
+	}
+}
+
+func (q *Manager) SendPacketIn(deviceID string, transationID string, packet *openflow_13.OfpPacketIn) {
+	// TODO: Augment the OF PacketIn to include the transactionId
+	packetIn := openflow_13.PacketIn{Id: deviceID, PacketIn: packet}
+	logger.Debugw("SendPacketIn", log.Fields{"packetIn": packetIn})
+	q.packetInQueue <- packetIn
+}
+
+type callTracker struct {
+	failedPacket interface{}
+}
+type streamTracker struct {
+	calls map[string]*callTracker
+	sync.Mutex
+}
+
+var streamingTracker = &streamTracker{calls: make(map[string]*callTracker)}
+
+func (q *Manager) getStreamingTracker(method string, done chan<- bool) *callTracker {
+	streamingTracker.Lock()
+	defer streamingTracker.Unlock()
+	if _, ok := streamingTracker.calls[method]; ok {
+		// bail out the other packet in thread
+		logger.Debugf("%s streaming call already running. Exiting it", method)
+		done <- true
+		logger.Debugf("Last %s exited. Continuing ...", method)
+	} else {
+		streamingTracker.calls[method] = &callTracker{failedPacket: nil}
+	}
+	return streamingTracker.calls[method]
+}
+
+func (q *Manager) flushFailedPackets(tracker *callTracker) error {
+	if tracker.failedPacket != nil {
+		switch tracker.failedPacket.(type) {
+		case openflow_13.PacketIn:
+			logger.Debug("Enqueueing last failed packetIn")
+			q.packetInQueue <- tracker.failedPacket.(openflow_13.PacketIn)
+		case openflow_13.ChangeEvent:
+			logger.Debug("Enqueueing last failed changeEvent")
+			q.changeEventQueue <- tracker.failedPacket.(openflow_13.ChangeEvent)
+		}
+	}
+	return nil
+}
+
+// ReceivePacketsIn streams queued packet-ins to the caller until a newer ReceivePacketsIn call signals this one to exit
+func (q *Manager) ReceivePacketsIn(_ *empty.Empty, packetsIn voltha.VolthaService_ReceivePacketsInServer) error {
+	var streamingTracker = q.getStreamingTracker("ReceivePacketsIn", q.packetInQueueDone)
+	logger.Debugw("ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
+
+	err := q.flushFailedPackets(streamingTracker)
+	if err != nil {
+		logger.Errorw("unable-to-flush-failed-packets", log.Fields{"error": err})
+	}
+
+loop:
+	for {
+		select {
+		case packet := <-q.packetInQueue:
+			logger.Debugw("sending-packet-in", log.Fields{
+				"packet": hex.EncodeToString(packet.PacketIn.Data),
+			})
+			if err := packetsIn.Send(&packet); err != nil {
+				logger.Errorw("failed-to-send-packet", log.Fields{"error": err})
+				// save the last failed packet in
+				streamingTracker.failedPacket = packet
+			} else {
+				if streamingTracker.failedPacket != nil {
+					// reset last failed packet saved to avoid flush
+					streamingTracker.failedPacket = nil
+				}
+			}
+		case <-q.packetInQueueDone:
+			logger.Debug("Another ReceivePacketsIn running. Bailing out ...")
+			break loop
+		}
+	}
+
+	//TODO: Find an elegant way to get out of the above loop when the Core is stopped
+	return nil
+}
+
+func (q *Manager) SendChangeEvent(deviceID string, portStatus *openflow_13.OfpPortStatus) {
+	// TODO: validate the type of portStatus parameter
+	//if _, ok := portStatus.(*openflow_13.OfpPortStatus); ok {
+	//}
+	event := openflow_13.ChangeEvent{Id: deviceID, Event: &openflow_13.ChangeEvent_PortStatus{PortStatus: portStatus}}
+	logger.Debugw("SendChangeEvent", log.Fields{"event": event})
+	q.changeEventQueue <- event
+}
+
+// ReceiveChangeEvents streams queued change events to the caller until a newer ReceiveChangeEvents call signals this one to exit
+func (q *Manager) ReceiveChangeEvents(_ *empty.Empty, changeEvents voltha.VolthaService_ReceiveChangeEventsServer) error {
+	var streamingTracker = q.getStreamingTracker("ReceiveChangeEvents", q.changeEventQueueDone)
+	logger.Debugw("ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
+
+	err := q.flushFailedPackets(streamingTracker)
+	if err != nil {
+		logger.Errorw("unable-to-flush-failed-packets", log.Fields{"error": err})
+	}
+
+loop:
+	for {
+		select {
+		// Dequeue a change event
+		case event := <-q.changeEventQueue:
+			logger.Debugw("sending-change-event", log.Fields{"event": event})
+			if err := changeEvents.Send(&event); err != nil {
+				logger.Errorw("failed-to-send-change-event", log.Fields{"error": err})
+				// save last failed changeevent
+				streamingTracker.failedPacket = event
+			} else {
+				if streamingTracker.failedPacket != nil {
+					// reset last failed event saved on success to avoid flushing
+					streamingTracker.failedPacket = nil
+				}
+			}
+		case <-q.changeEventQueueDone:
+			logger.Debug("Another ReceiveChangeEvents already running. Bailing out ...")
+			break loop
+		}
+	}
+
+	return nil
+}
+
+func (q *Manager) GetChangeEventsQueueForTest() <-chan openflow_13.ChangeEvent {
+	return q.changeEventQueue
+}
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index c872da9..7bc8e4d 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -373,7 +373,7 @@
 	var err error
 
 	var device *voltha.Device
-	if device, err = agent.deviceMgr.GetDevice(ctx, deviceID); err != nil {
+	if device, err = agent.deviceMgr.getDevice(ctx, deviceID); err != nil {
 		logger.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": deviceID})
 		return err
 	}
@@ -1671,7 +1671,7 @@
 	defer agent.requestQueue.RequestComplete()
 
 	if agent.deviceRoutes == nil {
-		agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
+		agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
 	}
 	// Get all the logical ports on that logical device
 	lDevice := agent.getLogicalDeviceWithoutLock()
@@ -1695,7 +1695,7 @@
 	defer agent.requestQueue.RequestComplete()
 
 	if agent.deviceRoutes == nil {
-		agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
+		agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
 	}
 	if err := agent.deviceRoutes.AddPort(ctx, lp, agent.logicalDevice.Ports); err != nil {
 		return err
@@ -1737,15 +1737,15 @@
 
 	// Send the port change events to the OF controller
 	for _, newP := range newPorts {
-		go agent.ldeviceMgr.eventCallbacks.SendChangeEvent(agent.logicalDeviceID,
+		go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
 			&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: newP.OfpPort})
 	}
 	for _, change := range changedPorts {
-		go agent.ldeviceMgr.eventCallbacks.SendChangeEvent(agent.logicalDeviceID,
+		go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
 			&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_MODIFY, Desc: change.OfpPort})
 	}
 	for _, del := range deletedPorts {
-		go agent.ldeviceMgr.eventCallbacks.SendChangeEvent(agent.logicalDeviceID,
+		go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
 			&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_DELETE, Desc: del.OfpPort})
 	}
 
@@ -1915,7 +1915,7 @@
 		"transactionId": transactionID,
 	})
 	packetIn := fu.MkPacketIn(port, packet)
-	agent.ldeviceMgr.eventCallbacks.SendPacketIn(agent.logicalDeviceID, transactionID, packetIn)
+	agent.ldeviceMgr.SendPacketIn(agent.logicalDeviceID, transactionID, packetIn)
 	logger.Debugw("sending-packet-in", log.Fields{"packet": hex.EncodeToString(packetIn.Data)})
 }
 
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 8a00a9e..64c42b5 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -483,8 +483,7 @@
 	proxy := model.NewProxy(backend, "/")
 	adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, lda.kClient)
 
-	lda.deviceMgr, lda.logicalDeviceMgr = NewDeviceManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CorePairTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
-	lda.logicalDeviceMgr.SetEventCallbacks(fakeEventCallbacks{})
+	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CorePairTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
 	if err = lda.kmp.Start(); err != nil {
 		logger.Fatal("Cannot start InterContainerProxy")
 	}
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 5005e0c..a5c47b9 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -19,7 +19,10 @@
 import (
 	"context"
 	"errors"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/voltha-go/rw_core/core/device/event"
 	"github.com/opencord/voltha-go/rw_core/utils"
+	"io"
 	"strings"
 	"sync"
 	"time"
@@ -36,9 +39,9 @@
 
 // LogicalManager represent logical device manager attributes
 type LogicalManager struct {
+	*event.Manager
 	logicalDeviceAgents            sync.Map
 	deviceMgr                      *Manager
-	eventCallbacks                 EventCallbacks
 	kafkaICProxy                   kafka.InterContainerProxy
 	clusterDataProxy               *model.Proxy
 	exitChannel                    chan int
@@ -47,15 +50,6 @@
 	logicalDeviceLoadingInProgress map[string][]chan int
 }
 
-type EventCallbacks interface {
-	SendChangeEvent(deviceID string, portStatus *openflow_13.OfpPortStatus)
-	SendPacketIn(deviceID string, transactionID string, packet *openflow_13.OfpPacketIn)
-}
-
-func (ldMgr *LogicalManager) SetEventCallbacks(callbacks EventCallbacks) {
-	ldMgr.eventCallbacks = callbacks
-}
-
 func (ldMgr *LogicalManager) Start(ctx context.Context) {
 	logger.Info("starting-logical-device-manager")
 	probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusRunning)
@@ -69,18 +63,6 @@
 	logger.Info("logical-device-manager-stopped")
 }
 
-func sendAPIResponse(ctx context.Context, ch chan interface{}, result interface{}) {
-	if ctx.Err() == nil {
-		// Returned response only of the ctx has not been cancelled/timeout/etc
-		// Channel is automatically closed when a context is Done
-		ch <- result
-		logger.Debugw("sendResponse", log.Fields{"result": result})
-	} else {
-		// Should the transaction be reverted back?
-		logger.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})
-	}
-}
-
 func (ldMgr *LogicalManager) addLogicalDeviceAgentToMap(agent *LogicalAgent) {
 	if _, exist := ldMgr.logicalDeviceAgents.Load(agent.logicalDeviceID); !exist {
 		ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
@@ -118,16 +100,16 @@
 
 // GetLogicalDevice provides a cloned most up to date logical device.  If device is not in memory
 // it will be fetched from the dB
-func (ldMgr *LogicalManager) GetLogicalDevice(ctx context.Context, id string) (*voltha.LogicalDevice, error) {
+func (ldMgr *LogicalManager) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
 	logger.Debugw("getlogicalDevice", log.Fields{"logicaldeviceid": id})
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
+	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
 		return agent.GetLogicalDevice(ctx)
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
 
 //ListLogicalDevices returns the list of all logical devices
-func (ldMgr *LogicalManager) ListLogicalDevices(ctx context.Context) (*voltha.LogicalDevices, error) {
+func (ldMgr *LogicalManager) ListLogicalDevices(ctx context.Context, _ *empty.Empty) (*voltha.LogicalDevices, error) {
 	logger.Debug("ListAllLogicalDevices")
 
 	var logicalDevices []*voltha.LogicalDevice
@@ -301,7 +283,7 @@
 	// Get the device
 	var device *voltha.Device
 	var err error
-	if device, err = ldMgr.deviceMgr.GetDevice(ctx, deviceID); err != nil {
+	if device, err = ldMgr.deviceMgr.getDevice(ctx, deviceID); err != nil {
 		return nil, err
 	}
 	return ldMgr.getLogicalDeviceID(ctx, device)
@@ -315,7 +297,7 @@
 		return nil, err
 	}
 	var lDevice *voltha.LogicalDevice
-	if lDevice, err = ldMgr.GetLogicalDevice(ctx, *lDeviceID); err != nil {
+	if lDevice, err = ldMgr.GetLogicalDevice(ctx, &voltha.ID{Id: *lDeviceID}); err != nil {
 		return nil, err
 	}
 	// Go over list of ports
@@ -328,37 +310,38 @@
 }
 
 // ListLogicalDeviceFlows returns the flows of logical device
-func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id string) (*openflow_13.Flows, error) {
-	logger.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id})
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
+func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
+	logger.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id.Id})
+	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
 		return agent.ListLogicalDeviceFlows(ctx)
 	}
-	return nil, status.Errorf(codes.NotFound, "%s", id)
+	return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 }
 
 // ListLogicalDeviceFlowGroups returns logical device flow groups
-func (ldMgr *LogicalManager) ListLogicalDeviceFlowGroups(ctx context.Context, id string) (*openflow_13.FlowGroups, error) {
-	logger.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"logicaldeviceid": id})
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
+func (ldMgr *LogicalManager) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
+	logger.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"logicaldeviceid": id.Id})
+	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
 		return agent.ListLogicalDeviceFlowGroups(ctx)
 	}
-	return nil, status.Errorf(codes.NotFound, "%s", id)
+	return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 }
 
 // ListLogicalDevicePorts returns logical device ports
-func (ldMgr *LogicalManager) ListLogicalDevicePorts(ctx context.Context, id string) (*voltha.LogicalPorts, error) {
-	logger.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
+func (ldMgr *LogicalManager) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
+	logger.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id.Id})
+	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
 		return agent.ListLogicalDevicePorts(ctx)
 	}
-	return nil, status.Errorf(codes.NotFound, "%s", id)
+	return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 }
 
-func (ldMgr *LogicalManager) GetLogicalPort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+// GetLogicalDevicePort returns logical device port details
+func (ldMgr *LogicalManager) GetLogicalDevicePort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
 	// Get the logical device where this device is attached
 	var err error
 	var lDevice *voltha.LogicalDevice
-	if lDevice, err = ldMgr.GetLogicalDevice(ctx, lPortID.Id); err != nil {
+	if lDevice, err = ldMgr.GetLogicalDevice(ctx, &voltha.ID{Id: lPortID.Id}); err != nil {
 		return nil, err
 	}
 	// Go over list of ports
@@ -393,7 +376,7 @@
 	// Get logical port
 	var logicalPort *voltha.LogicalPort
 	var err error
-	if logicalPort, err = ldMgr.GetLogicalPort(ctx, lPortID); err != nil {
+	if logicalPort, err = ldMgr.GetLogicalDevicePort(ctx, lPortID); err != nil {
 		logger.Debugw("no-logical-device-port-present", log.Fields{"logicalPortId": lPortID.PortId})
 		return err
 	}
@@ -525,72 +508,64 @@
 	return nil
 }
 
-func (ldMgr *LogicalManager) UpdateFlowTable(ctx context.Context, id string, flow *openflow_13.OfpFlowMod, ch chan interface{}) {
-	logger.Debugw("UpdateFlowTable", log.Fields{"logicalDeviceId": id})
-	var res interface{}
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
-		res = agent.updateFlowTable(ctx, flow)
-		logger.Debugw("UpdateFlowTable-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id)
+// UpdateLogicalDeviceFlowTable updates logical device flow table
+func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowTable(ctx context.Context, flow *openflow_13.FlowTableUpdate) (*empty.Empty, error) {
+	logger.Debugw("UpdateLogicalDeviceFlowTable", log.Fields{"logicalDeviceId": flow.Id})
+	agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
 	}
-	sendAPIResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.updateFlowTable(ctx, flow.FlowMod)
 }
 
-func (ldMgr *LogicalManager) UpdateMeterTable(ctx context.Context, id string, meter *openflow_13.OfpMeterMod, ch chan interface{}) {
-	logger.Debugw("UpdateMeterTable", log.Fields{"logicalDeviceId": id})
-	var res interface{}
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
-		res = agent.updateMeterTable(ctx, meter)
-		logger.Debugw("UpdateMeterTable-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id)
+// UpdateLogicalDeviceMeterTable applies the meter mod request to the logical device identified by meter.Id
+func (ldMgr *LogicalManager) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
+	logger.Debugw("UpdateLogicalDeviceMeterTable", log.Fields{"logicalDeviceId": meter.Id})
+	agent := ldMgr.getLogicalDeviceAgent(ctx, meter.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", meter.Id)
 	}
-	sendAPIResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.updateMeterTable(ctx, meter.MeterMod)
 }
 
 // ListLogicalDeviceMeters returns logical device meters
-func (ldMgr *LogicalManager) ListLogicalDeviceMeters(ctx context.Context, id string) (*openflow_13.Meters, error) {
-	logger.Debugw("ListLogicalDeviceMeters", log.Fields{"logicalDeviceId": id})
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
-		return agent.ListLogicalDeviceMeters(ctx)
+func (ldMgr *LogicalManager) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
+	logger.Debugw("ListLogicalDeviceMeters", log.Fields{"logicalDeviceId": id.Id})
+	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 	}
-	return nil, status.Errorf(codes.NotFound, "%s", id)
-}
-func (ldMgr *LogicalManager) UpdateGroupTable(ctx context.Context, id string, groupMod *openflow_13.OfpGroupMod, ch chan interface{}) {
-	logger.Debugw("UpdateGroupTable", log.Fields{"logicalDeviceId": id})
-	var res interface{}
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
-		res = agent.updateGroupTable(ctx, groupMod)
-		logger.Debugw("UpdateGroupTable-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id)
-	}
-	sendAPIResponse(ctx, ch, res)
+	return agent.ListLogicalDeviceMeters(ctx)
 }
 
-func (ldMgr *LogicalManager) EnableLogicalPort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
-	logger.Debugw("EnableLogicalPort", log.Fields{"logicalDeviceId": id})
-	var res interface{}
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
-		res = agent.enableLogicalPort(ctx, id.PortId)
-		logger.Debugw("EnableLogicalPort-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id.Id)
+// UpdateLogicalDeviceFlowGroupTable updates logical device flow group table
+func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowGroupTable(ctx context.Context, flow *openflow_13.FlowGroupTableUpdate) (*empty.Empty, error) {
+	logger.Debugw("UpdateGroupTable", log.Fields{"logicalDeviceId": flow.Id})
+	agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
 	}
-	sendAPIResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.updateGroupTable(ctx, flow.GroupMod)
 }
 
-func (ldMgr *LogicalManager) DisableLogicalPort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
-	logger.Debugw("DisableLogicalPort", log.Fields{"logicalDeviceId": id})
-	var res interface{}
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
-		res = agent.disableLogicalPort(ctx, id.PortId)
-		logger.Debugw("DisableLogicalPort-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id.Id)
+// EnableLogicalDevicePort enables logical device port
+func (ldMgr *LogicalManager) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
+	logger.Debugw("EnableLogicalDevicePort", log.Fields{"logicalDeviceId": id})
+	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 	}
-	sendAPIResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.enableLogicalPort(ctx, id.PortId)
+}
+
+// DisableLogicalDevicePort disables logical device port
+func (ldMgr *LogicalManager) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
+	logger.Debugw("DisableLogicalDevicePort", log.Fields{"logicalDeviceId": id})
+	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
+	}
+	return &empty.Empty{}, agent.disableLogicalPort(ctx, id.PortId)
 }
 
 func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
@@ -603,10 +578,37 @@
 	return nil
 }
 
-func (ldMgr *LogicalManager) PacketOut(ctx context.Context, packet *openflow_13.PacketOut) {
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, packet.Id); agent != nil {
-		agent.packetOut(ctx, packet.PacketOut)
-	} else {
-		logger.Errorf("No logical device agent present", log.Fields{"logicalDeviceID": packet.Id})
+// StreamPacketsOut receives packet-out messages from the stream and hands each one to the logical device agent named by packet.Id; it returns nil once the stream reports io.EOF or its context is done
+func (ldMgr *LogicalManager) StreamPacketsOut(packets voltha.VolthaService_StreamPacketsOutServer) error {
+	logger.Debugw("StreamPacketsOut-request", log.Fields{"packets": packets})
+loop:
+	for {
+		select {
+		case <-packets.Context().Done():
+			logger.Infow("StreamPacketsOut-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
+			break loop
+		default:
+		}
+
+		packet, err := packets.Recv()
+
+		if err == io.EOF {
+			logger.Debugw("Received-EOF", log.Fields{"packets": packets})
+			break loop
+		}
+
+		if err != nil {
+			logger.Errorw("Failed to receive packet out", log.Fields{"error": err})
+			continue
+		}
+
+		if agent := ldMgr.getLogicalDeviceAgent(packets.Context(), packet.Id); agent != nil {
+			agent.packetOut(packets.Context(), packet.PacketOut)
+		} else {
+			logger.Errorw("No logical device agent present", log.Fields{"logicalDeviceID": packet.Id})
+		}
 	}
+
+	logger.Debugw("StreamPacketsOut-request-done", log.Fields{"packets": packets})
+	return nil
 }
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index ad2af57..b0128a5 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -19,11 +19,13 @@
 import (
 	"context"
 	"errors"
+	"github.com/opencord/voltha-go/rw_core/core/device/event"
 	"reflect"
 	"runtime"
 	"sync"
 	"time"
 
+	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
 	"github.com/opencord/voltha-go/rw_core/core/device/remote"
@@ -31,6 +33,7 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-lib-go/v3/pkg/probe"
+	"github.com/opencord/voltha-protos/v3/go/common"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -56,7 +59,7 @@
 	deviceLoadingInProgress map[string][]chan int
 }
 
-func NewDeviceManagers(proxy *model.Proxy, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+func NewManagers(proxy *model.Proxy, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
 	deviceMgr := &Manager{
 		exitChannel:             make(chan int, 1),
 		rootDevices:             make(map[string]bool),
@@ -69,6 +72,7 @@
 		deviceLoadingInProgress: make(map[string][]chan int),
 	}
 	logicalDeviceMgr := &LogicalManager{
+		Manager:                        event.NewManager(),
 		exitChannel:                    make(chan int, 1),
 		deviceMgr:                      deviceMgr,
 		kafkaICProxy:                   kmp,
@@ -97,18 +101,6 @@
 	logger.Info("device-manager-stopped")
 }
 
-func sendResponse(ctx context.Context, ch chan interface{}, result interface{}) {
-	if ctx.Err() == nil {
-		// Returned response only of the ctx has not been cancelled/timeout/etc
-		// Channel is automatically closed when a context is Done
-		ch <- result
-		logger.Debugw("sendResponse", log.Fields{"result": result})
-	} else {
-		// Should the transaction be reverted back?
-		logger.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})
-	}
-}
-
 func (dMgr *Manager) addDeviceAgentToMap(agent *Agent) {
 	if _, exist := dMgr.deviceAgents.Load(agent.deviceID); !exist {
 		dMgr.deviceAgents.Store(agent.deviceID, agent)
@@ -158,17 +150,22 @@
 	return result
 }
 
-func (dMgr *Manager) CreateDevice(ctx context.Context, device *voltha.Device, ch chan interface{}) {
+// CreateDevice creates a new parent device in the data model
+func (dMgr *Manager) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
+	if device.MacAddress == "" && device.GetHostAndPort() == "" {
+		logger.Errorf("No Device Info Present")
+		return &voltha.Device{}, errors.New("no-device-info-present; MAC or HOSTIP&PORT")
+	}
+	logger.Debugw("create-device", log.Fields{"device": *device})
+
 	deviceExist, err := dMgr.isParentDeviceExist(ctx, device)
 	if err != nil {
 		logger.Errorf("Failed to fetch parent device info")
-		sendResponse(ctx, ch, err)
-		return
+		return nil, err
 	}
 	if deviceExist {
 		logger.Errorf("Device is Pre-provisioned already with same IP-Port or MAC Address")
-		sendResponse(ctx, ch, errors.New("Device is already pre-provisioned"))
-		return
+		return nil, errors.New("device is already pre-provisioned")
 	}
 	logger.Debugw("CreateDevice", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
 
@@ -179,61 +176,81 @@
 	device, err = agent.start(ctx, device)
 	if err != nil {
 		logger.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
-		sendResponse(ctx, ch, err)
-		return
+		return nil, err
 	}
 	dMgr.addDeviceAgentToMap(agent)
-
-	sendResponse(ctx, ch, device)
+	return device, nil
 }
 
-func (dMgr *Manager) EnableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
-	logger.Debugw("EnableDevice", log.Fields{"deviceid": id})
-	var res interface{}
-	if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
-		res = agent.enableDevice(ctx)
-		logger.Debugw("EnableDevice-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id.Id)
+// EnableDevice activates a device by invoking the adopt_device API on the appropriate adapter
+func (dMgr *Manager) EnableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+	logger.Debugw("EnableDevice", log.Fields{"device-id": id.Id})
+	agent := dMgr.getDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 	}
-	sendResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.enableDevice(ctx)
 }
 
-func (dMgr *Manager) DisableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
-	logger.Debugw("DisableDevice", log.Fields{"deviceid": id})
-	var res interface{}
-	if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
-		res = agent.disableDevice(ctx)
-		logger.Debugw("DisableDevice-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id.Id)
+// DisableDevice disables a device along with any child device it may have
+func (dMgr *Manager) DisableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+	logger.Debugw("DisableDevice", log.Fields{"device-id": id.Id})
+	agent := dMgr.getDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 	}
-
-	sendResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.disableDevice(ctx)
 }
 
-func (dMgr *Manager) RebootDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
-	logger.Debugw("RebootDevice", log.Fields{"deviceid": id})
-	var res interface{}
-	if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
-		res = agent.rebootDevice(ctx)
-		logger.Debugw("RebootDevice-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id.Id)
+// RebootDevice invokes the reboot API on the corresponding adapter
+func (dMgr *Manager) RebootDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+	logger.Debugw("RebootDevice", log.Fields{"device-id": id.Id})
+	agent := dMgr.getDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 	}
-	sendResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.rebootDevice(ctx)
 }
 
-func (dMgr *Manager) DeleteDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
-	logger.Debugw("DeleteDevice", log.Fields{"deviceid": id})
-	var res interface{}
-	if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
-		res = agent.deleteDevice(ctx)
-		logger.Debugw("DeleteDevice-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id.Id)
+// DeleteDevice removes a device from the data model
+func (dMgr *Manager) DeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+	logger.Debugw("DeleteDevice", log.Fields{"device-id": id.Id})
+	agent := dMgr.getDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 	}
-	sendResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.deleteDevice(ctx)
+}
+
+// ListDevicePorts returns the ports details for a specific device entry
+func (dMgr *Manager) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
+	logger.Debugw("ListDevicePorts", log.Fields{"device-id": id.Id})
+	device, err := dMgr.getDevice(ctx, id.Id)
+	if err != nil {
+		return &voltha.Ports{}, err
+	}
+	return &voltha.Ports{Items: device.Ports}, nil
+}
+
+// ListDeviceFlows returns the flow details for a specific device entry
+func (dMgr *Manager) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*ofp.Flows, error) {
+	logger.Debugw("ListDeviceFlows", log.Fields{"device-id": id.Id})
+	device, err := dMgr.getDevice(ctx, id.Id)
+	if err != nil {
+		return &ofp.Flows{}, err
+	}
+	return device.Flows, nil
+}
+
+// ListDeviceFlowGroups returns the flow group details for a specific device entry
+func (dMgr *Manager) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
+	logger.Debugw("ListDeviceFlowGroups", log.Fields{"device-id": id.Id})
+
+	device, err := dMgr.getDevice(ctx, id.Id)
+	if err != nil {
+		return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
+	}
+	return device.GetFlowGroups(), nil
 }
 
 // stopManagingDevice stops the management of the device as well as any of its reference device and logical device.
@@ -262,9 +279,13 @@
 	return nil
 }
 
-// GetDevice will returns a device, either from memory or from the dB, if present
-func (dMgr *Manager) GetDevice(ctx context.Context, id string) (*voltha.Device, error) {
-	logger.Debugw("GetDevice", log.Fields{"deviceid": id})
+func (dMgr *Manager) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
+	return dMgr.getDevice(ctx, id.Id)
+}
+
+// getDevice returns a device, either from memory or from the DB, if present
+func (dMgr *Manager) getDevice(ctx context.Context, id string) (*voltha.Device, error) {
+	logger.Debugw("getDevice", log.Fields{"deviceid": id})
 	if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
 		return agent.getDevice(ctx)
 	}
@@ -278,7 +299,7 @@
 
 	var parentDevice *voltha.Device
 	var err error
-	if parentDevice, err = dMgr.GetDevice(ctx, parentDeviceID); err != nil {
+	if parentDevice, err = dMgr.getDevice(ctx, parentDeviceID); err != nil {
 		return nil, status.Errorf(codes.Aborted, "%s", err.Error())
 	}
 	var childDeviceIds []string
@@ -293,7 +314,7 @@
 	var foundChildDevice *voltha.Device
 	for _, childDeviceID := range childDeviceIds {
 		var found bool
-		if searchDevice, err := dMgr.GetDevice(ctx, childDeviceID); err == nil {
+		if searchDevice, err := dMgr.getDevice(ctx, childDeviceID); err == nil {
 
 			foundOnuID := false
 			if searchDevice.ProxyAddress.OnuId == uint32(onuID) {
@@ -340,7 +361,7 @@
 
 	var parentDevice *voltha.Device
 	var err error
-	if parentDevice, err = dMgr.GetDevice(ctx, proxyAddress.DeviceId); err != nil {
+	if parentDevice, err = dMgr.getDevice(ctx, proxyAddress.DeviceId); err != nil {
 		return nil, status.Errorf(codes.Aborted, "%s", err.Error())
 	}
 	var childDeviceIds []string
@@ -354,7 +375,7 @@
 
 	var foundChildDevice *voltha.Device
 	for _, childDeviceID := range childDeviceIds {
-		if searchDevice, err := dMgr.GetDevice(ctx, childDeviceID); err == nil {
+		if searchDevice, err := dMgr.getDevice(ctx, childDeviceID); err == nil {
 			if searchDevice.ProxyAddress == proxyAddress {
 				foundChildDevice = searchDevice
 				break
@@ -388,7 +409,7 @@
 }
 
 // ListDevices retrieves the latest devices from the data model
-func (dMgr *Manager) ListDevices(ctx context.Context) (*voltha.Devices, error) {
+func (dMgr *Manager) ListDevices(ctx context.Context, _ *empty.Empty) (*voltha.Devices, error) {
 	logger.Debug("ListDevices")
 	result := &voltha.Devices{}
 
@@ -569,17 +590,16 @@
 }
 
 // ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
-func (dMgr *Manager) ListDeviceIds() (*voltha.IDs, error) {
+func (dMgr *Manager) ListDeviceIds(_ context.Context, _ *empty.Empty) (*voltha.IDs, error) {
 	logger.Debug("ListDeviceIDs")
 	// Report only device IDs that are in the device agent map
 	return dMgr.listDeviceIdsFromMap(), nil
 }
 
-//ReconcileDevices is a request to a voltha core to update its list of managed devices.  This will
-//trigger loading the devices along with their children and parent in memory
-func (dMgr *Manager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
+// ReconcileDevices is a request to a voltha core to update its list of managed devices.  This will
+// trigger loading the devices along with their children and parent in memory
+func (dMgr *Manager) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
 	logger.Debugw("ReconcileDevices", log.Fields{"numDevices": len(ids.Items)})
-	var res interface{}
 	if ids != nil && len(ids.Items) != 0 {
 		toReconcile := len(ids.Items)
 		reconciled := 0
@@ -592,12 +612,12 @@
 			}
 		}
 		if toReconcile != reconciled {
-			res = status.Errorf(codes.DataLoss, "less-device-reconciled-than-requested:%d/%d", reconciled, toReconcile)
+			return nil, status.Errorf(codes.DataLoss, "less-device-reconciled-than-requested:%d/%d", reconciled, toReconcile)
 		}
 	} else {
-		res = status.Errorf(codes.InvalidArgument, "empty-list-of-ids")
+		return nil, status.Errorf(codes.InvalidArgument, "empty-list-of-ids")
 	}
-	sendResponse(ctx, ch, res)
+	return &empty.Empty{}, nil
 }
 
 // isOkToReconcile validates whether a device is in the correct status to be reconciled
@@ -740,7 +760,7 @@
 		// Notify the logical device manager to setup a logical port, if needed.  If the added port is an NNI or UNI
 		// then a logical port will be added to the logical device and the device graph generated.  If the port is a
 		// PON port then only the device graph will be generated.
-		if device, err := dMgr.GetDevice(ctx, deviceID); err == nil {
+		if device, err := dMgr.getDevice(ctx, deviceID); err == nil {
 			go func() {
 				err = dMgr.logicalDeviceMgr.updateLogicalPort(context.Background(), device, port)
 				if err != nil {
@@ -792,18 +812,17 @@
 	return status.Errorf(codes.NotFound, "%s", deviceID)
 }
 
-// UpdatePmConfigs updates the PM configs.  This is executed when the northbound gRPC API is invoked, typically
+// UpdateDevicePmConfigs updates the PM configs.  This is executed when the northbound gRPC API is invoked, typically
 // following a user action
-func (dMgr *Manager) UpdatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs, ch chan interface{}) {
-	var res interface{}
-	if pmConfigs.Id == "" {
-		res = status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
-	} else if agent := dMgr.getDeviceAgent(ctx, pmConfigs.Id); agent != nil {
-		res = agent.updatePmConfigs(ctx, pmConfigs)
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", pmConfigs.Id)
+func (dMgr *Manager) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
+	if configs.Id == "" {
+		return nil, status.Error(codes.FailedPrecondition, "invalid-device-Id")
 	}
-	sendResponse(ctx, ch, res)
+	agent := dMgr.getDeviceAgent(ctx, configs.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", configs.Id)
+	}
+	return &empty.Empty{}, agent.updatePmConfigs(ctx, configs)
 }
 
 // InitPmConfigs initialize the pm configs as defined by the adapter.
@@ -817,11 +836,13 @@
 	return status.Errorf(codes.NotFound, "%s", deviceID)
 }
 
-func (dMgr *Manager) ListPmConfigs(ctx context.Context, deviceID string) (*voltha.PmConfigs, error) {
-	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
-		return agent.listPmConfigs(ctx)
+// ListDevicePmConfigs returns pm configs of device
+func (dMgr *Manager) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
+	agent := dMgr.getDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 	}
-	return nil, status.Errorf(codes.NotFound, "%s", deviceID)
+	return agent.listPmConfigs(ctx)
 }
 
 func (dMgr *Manager) getSwitchCapability(ctx context.Context, deviceID string) (*ic.SwitchCapability, error) {
@@ -860,7 +881,7 @@
 	logger.Debugw("UpdateChildrenStatus", log.Fields{"parentDeviceid": deviceID, "operStatus": operStatus, "connStatus": connStatus})
 	var parentDevice *voltha.Device
 	var err error
-	if parentDevice, err = dMgr.GetDevice(ctx, deviceID); err != nil {
+	if parentDevice, err = dMgr.getDevice(ctx, deviceID); err != nil {
 		return status.Errorf(codes.Aborted, "%s", err.Error())
 	}
 	var childDeviceIds []string
@@ -917,7 +938,7 @@
 		// Notify the logical device manager to remove all logical ports, if needed.
 		// At this stage the device itself may gave been deleted already at a DeleteAllPorts
 		// typically is part of a device deletion phase.
-		if device, err := dMgr.GetDevice(ctx, deviceID); err == nil {
+		if device, err := dMgr.getDevice(ctx, deviceID); err == nil {
 			go func() {
 				err = dMgr.logicalDeviceMgr.deleteAllLogicalPorts(ctx, device)
 				if err != nil {
@@ -953,7 +974,7 @@
 			return status.Error(codes.Unimplemented, "state-change-not-implemented")
 		}
 		// Notify the logical device about the state change
-		device, err := dMgr.GetDevice(ctx, deviceID)
+		device, err := dMgr.getDevice(ctx, deviceID)
 		if err != nil {
 			logger.Warnw("non-existent-device", log.Fields{"deviceId": deviceID, "error": err})
 			return err
@@ -973,8 +994,12 @@
 
 	if deviceType == "" && vendorID != "" {
 		logger.Debug("device-type-is-nil-fetching-device-type")
+		deviceTypes, err := dMgr.adapterMgr.ListDeviceTypes(ctx, nil)
+		if err != nil {
+			return nil, err
+		}
 	OLoop:
-		for _, dType := range dMgr.adapterMgr.ListDeviceTypes() {
+		for _, dType := range deviceTypes.Items {
 			for _, v := range dType.VendorIds {
 				if v == vendorID {
 					deviceType = dType.Adapter
@@ -1085,7 +1110,7 @@
 	// Get the logical device Id based on the deviceId
 	var device *voltha.Device
 	var err error
-	if device, err = dMgr.GetDevice(ctx, deviceID); err != nil {
+	if device, err = dMgr.getDevice(ctx, deviceID); err != nil {
 		logger.Errorw("device-not-found", log.Fields{"deviceId": deviceID})
 		return err
 	}
@@ -1171,7 +1196,7 @@
 		// childDevice is the parent device
 		return childDevice
 	}
-	parentDevice, _ := dMgr.GetDevice(ctx, childDevice.ParentId)
+	parentDevice, _ := dMgr.getDevice(ctx, childDevice.ParentId)
 	return parentDevice
 }
 
@@ -1181,7 +1206,7 @@
 	logger.Debug("ChildDevicesLost")
 	var err error
 	var parentDevice *voltha.Device
-	if parentDevice, err = dMgr.GetDevice(ctx, parentDeviceID); err != nil {
+	if parentDevice, err = dMgr.getDevice(ctx, parentDeviceID); err != nil {
 		logger.Warnw("failed-getting-device", log.Fields{"deviceId": parentDeviceID, "error": err})
 		return err
 	}
@@ -1196,7 +1221,7 @@
 	var parentDevice *voltha.Device
 	var childDeviceIds []string
 
-	if parentDevice, err = dMgr.GetDevice(ctx, parentDeviceID); err != nil {
+	if parentDevice, err = dMgr.getDevice(ctx, parentDeviceID); err != nil {
 		logger.Warnw("failed-getting-device", log.Fields{"deviceId": parentDeviceID, "error": err})
 		return err
 	}
@@ -1330,11 +1355,11 @@
 //GetAllChildDevices is a helper method to get all the child device IDs from the device passed as parameter
 func (dMgr *Manager) GetAllChildDevices(ctx context.Context, parentDeviceID string) (*voltha.Devices, error) {
 	logger.Debugw("GetAllChildDevices", log.Fields{"parentDeviceId": parentDeviceID})
-	if parentDevice, err := dMgr.GetDevice(ctx, parentDeviceID); err == nil {
+	if parentDevice, err := dMgr.getDevice(ctx, parentDeviceID); err == nil {
 		childDevices := make([]*voltha.Device, 0)
 		if childDeviceIds, er := dMgr.getAllChildDeviceIds(parentDevice); er == nil {
 			for _, deviceID := range childDeviceIds {
-				if d, e := dMgr.GetDevice(ctx, deviceID); e == nil && d != nil {
+				if d, e := dMgr.getDevice(ctx, deviceID); e == nil && d != nil {
 					childDevices = append(childDevices, d)
 				}
 			}
@@ -1354,83 +1379,84 @@
 	return nil
 }
 
-func (dMgr *Manager) DownloadImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
-	logger.Debugw("DownloadImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
-	var res interface{}
-	var err error
-	if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
-		if res, err = agent.downloadImage(ctx, img); err != nil {
-			logger.Debugw("DownloadImage-failed", log.Fields{"err": err, "imageName": img.Name})
-			res = err
-		}
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", img.Id)
+// operationFailureResp is the shared OPERATION_FAILURE response returned by the image operations below when they fail
+var operationFailureResp = &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
+
+// DownloadImage executes an image download request
+func (dMgr *Manager) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+	logger.Debugw("DownloadImage", log.Fields{"device-id": img.Id, "imageName": img.Name})
+	agent := dMgr.getDeviceAgent(ctx, img.Id)
+	if agent == nil {
+		return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
 	}
-	sendResponse(ctx, ch, res)
+	resp, err := agent.downloadImage(ctx, img)
+	if err != nil {
+		return operationFailureResp, err
+	}
+	return resp, nil
 }
 
-func (dMgr *Manager) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
-	logger.Debugw("CancelImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
-	var res interface{}
-	var err error
-	if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
-		if res, err = agent.cancelImageDownload(ctx, img); err != nil {
-			logger.Debugw("CancelImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
-			res = err
-		}
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", img.Id)
+// CancelImageDownload cancels image download request
+func (dMgr *Manager) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+	logger.Debugw("CancelImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
+	agent := dMgr.getDeviceAgent(ctx, img.Id)
+	if agent == nil {
+		return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
 	}
-	sendResponse(ctx, ch, res)
+	resp, err := agent.cancelImageDownload(ctx, img)
+	if err != nil {
+		return operationFailureResp, err
+	}
+	return resp, nil
 }
 
-func (dMgr *Manager) ActivateImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
-	logger.Debugw("ActivateImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
-	var res interface{}
-	var err error
-	if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
-		if res, err = agent.activateImage(ctx, img); err != nil {
-			logger.Debugw("ActivateImage-failed", log.Fields{"err": err, "imageName": img.Name})
-			res = err
-		}
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", img.Id)
+// ActivateImageUpdate activates image update request
+func (dMgr *Manager) ActivateImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+	logger.Debugw("ActivateImageUpdate", log.Fields{"device-id": img.Id, "imageName": img.Name})
+	agent := dMgr.getDeviceAgent(ctx, img.Id)
+	if agent == nil {
+		return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
 	}
-	sendResponse(ctx, ch, res)
+	resp, err := agent.activateImage(ctx, img)
+	if err != nil {
+		return operationFailureResp, err
+	}
+	return resp, nil
 }
 
-func (dMgr *Manager) RevertImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
-	logger.Debugw("RevertImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
-	var res interface{}
-	var err error
-	if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
-		if res, err = agent.revertImage(ctx, img); err != nil {
-			logger.Debugw("RevertImage-failed", log.Fields{"err": err, "imageName": img.Name})
-			res = err
-		}
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", img.Id)
+// RevertImageUpdate reverts image update
+func (dMgr *Manager) RevertImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+	logger.Debugw("RevertImageUpdate", log.Fields{"device-id": img.Id, "imageName": img.Name})
+	agent := dMgr.getDeviceAgent(ctx, img.Id)
+	if agent == nil {
+		return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
 	}
-	sendResponse(ctx, ch, res)
+	resp, err := agent.revertImage(ctx, img)
+	if err != nil {
+		return operationFailureResp, err
+	}
+	return resp, nil
 }
 
-func (dMgr *Manager) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
-	logger.Debugw("GetImageDownloadStatus", log.Fields{"deviceid": img.Id, "imageName": img.Name})
-	var res interface{}
-	var err error
-	if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
-		if res, err = agent.getImageDownloadStatus(ctx, img); err != nil {
-			logger.Debugw("GetImageDownloadStatus-failed", log.Fields{"err": err, "imageName": img.Name})
-			res = err
-		}
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", img.Id)
+// imageDownloadFailureResp is the shared DOWNLOAD_UNKNOWN placeholder returned when an image download lookup fails
+var imageDownloadFailureResp = &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
+
+// GetImageDownloadStatus returns status of image download
+func (dMgr *Manager) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+	logger.Debugw("GetImageDownloadStatus", log.Fields{"device-id": img.Id, "imageName": img.Name})
+	agent := dMgr.getDeviceAgent(ctx, img.Id)
+	if agent == nil {
+		return imageDownloadFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
 	}
-	sendResponse(ctx, ch, res)
+	resp, err := agent.getImageDownloadStatus(ctx, img)
+	if err != nil {
+		return imageDownloadFailureResp, err
+	}
+	return resp, nil
 }
 
 func (dMgr *Manager) UpdateImageDownload(ctx context.Context, deviceID string, img *voltha.ImageDownload) error {
-	logger.Debugw("UpdateImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+	logger.Debugw("UpdateImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
 		if err := agent.updateImageDownload(ctx, img); err != nil {
 			logger.Debugw("UpdateImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
@@ -1442,20 +1468,42 @@
 	return nil
 }
 
+// GetImageDownload returns image download
 func (dMgr *Manager) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
-	logger.Debugw("GetImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
-	if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
-		return agent.getImageDownload(ctx, img)
+	logger.Debugw("GetImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
+	agent := dMgr.getDeviceAgent(ctx, img.Id)
+	if agent == nil {
+		return imageDownloadFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
 	}
-	return nil, status.Errorf(codes.NotFound, "%s", img.Id)
+	resp, err := agent.getImageDownload(ctx, img)
+	if err != nil {
+		return imageDownloadFailureResp, err
+	}
+	return resp, nil
 }
 
-func (dMgr *Manager) ListImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
-	logger.Debugw("ListImageDownloads", log.Fields{"deviceID": deviceID})
-	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
-		return agent.listImageDownloads(ctx, deviceID)
+// ListImageDownloads returns image downloads
+func (dMgr *Manager) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
+	logger.Debugw("ListImageDownloads", log.Fields{"device-id": id.Id})
+	agent := dMgr.getDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return &voltha.ImageDownloads{Items: []*voltha.ImageDownload{imageDownloadFailureResp}}, status.Errorf(codes.NotFound, "%s", id.Id)
 	}
-	return nil, status.Errorf(codes.NotFound, "%s", deviceID)
+	resp, err := agent.listImageDownloads(ctx, id.Id)
+	if err != nil {
+		return &voltha.ImageDownloads{Items: []*voltha.ImageDownload{imageDownloadFailureResp}}, err
+	}
+	return resp, nil
+}
+
+// GetImages returns all images for a specific device entry
+func (dMgr *Manager) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
+	logger.Debugw("GetImages", log.Fields{"device-id": id.Id})
+	device, err := dMgr.getDevice(ctx, id.Id)
+	if err != nil {
+		return nil, err
+	}
+	return device.GetImages(), nil
 }
 
 func (dMgr *Manager) NotifyInvalidTransition(_ context.Context, device *voltha.Device) error {
@@ -1484,24 +1532,25 @@
 
 // GetParentDeviceID returns parent device id, either from memory or from the dB, if present
 func (dMgr *Manager) GetParentDeviceID(ctx context.Context, deviceID string) string {
-	if device, _ := dMgr.GetDevice(ctx, deviceID); device != nil {
+	if device, _ := dMgr.getDevice(ctx, deviceID); device != nil {
 		logger.Infow("GetParentDeviceId", log.Fields{"deviceId": device.Id, "parentId": device.ParentId})
 		return device.ParentId
 	}
 	return ""
 }
 
-func (dMgr *Manager) SimulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest, ch chan interface{}) {
-	logger.Debugw("SimulateAlarm", log.Fields{"id": simulatereq.Id, "Indicator": simulatereq.Indicator, "IntfId": simulatereq.IntfId,
-		"PortTypeName": simulatereq.PortTypeName, "OnuDeviceId": simulatereq.OnuDeviceId, "InverseBitErrorRate": simulatereq.InverseBitErrorRate,
-		"Drift": simulatereq.Drift, "NewEqd": simulatereq.NewEqd, "OnuSerialNumber": simulatereq.OnuSerialNumber, "Operation": simulatereq.Operation})
-	var res interface{}
-	if agent := dMgr.getDeviceAgent(ctx, simulatereq.Id); agent != nil {
-		res = agent.simulateAlarm(ctx, simulatereq)
-		logger.Debugw("SimulateAlarm-result", log.Fields{"result": res})
+func (dMgr *Manager) SimulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) (*common.OperationResp, error) {
+	logger.Debugw("SimulateAlarm", log.Fields{"id": simulateReq.Id, "Indicator": simulateReq.Indicator, "IntfId": simulateReq.IntfId,
+		"PortTypeName": simulateReq.PortTypeName, "OnuDeviceId": simulateReq.OnuDeviceId, "InverseBitErrorRate": simulateReq.InverseBitErrorRate,
+		"Drift": simulateReq.Drift, "NewEqd": simulateReq.NewEqd, "OnuSerialNumber": simulateReq.OnuSerialNumber, "Operation": simulateReq.Operation})
+	agent := dMgr.getDeviceAgent(ctx, simulateReq.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", simulateReq.Id)
 	}
-	//TODO CLI always get successful response
-	sendResponse(ctx, ch, res)
+	if err := agent.simulateAlarm(ctx, simulateReq); err != nil {
+		return nil, err
+	}
+	return &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}, nil
 }
 
 func (dMgr *Manager) UpdateDeviceReason(ctx context.Context, deviceID string, reason string) error {
@@ -1512,30 +1561,22 @@
 	return status.Errorf(codes.NotFound, "%s", deviceID)
 }
 
-func (dMgr *Manager) EnablePort(ctx context.Context, port *voltha.Port, ch chan interface{}) {
+func (dMgr *Manager) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
 	logger.Debugw("EnablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
-	var res interface{}
-	if agent := dMgr.getDeviceAgent(ctx, port.DeviceId); agent != nil {
-		res = agent.enablePort(ctx, port)
-		logger.Debugw("EnablePort-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", port.DeviceId)
+	agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
 	}
-
-	sendResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.enablePort(ctx, port)
 }
 
-func (dMgr *Manager) DisablePort(ctx context.Context, port *voltha.Port, ch chan interface{}) {
+func (dMgr *Manager) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
 	logger.Debugw("DisablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
-	var res interface{}
-	if agent := dMgr.getDeviceAgent(ctx, port.DeviceId); agent != nil {
-		res = agent.disablePort(ctx, port)
-		logger.Debugw("DisablePort-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", port.DeviceId)
+	agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
 	}
-
-	sendResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.disablePort(ctx, port)
 }
 
 // ChildDeviceLost  calls parent adapter to delete child device and all its references
@@ -1551,26 +1592,22 @@
 	return nil
 }
 
-func (dMgr *Manager) StartOmciTest(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
-	logger.Debugw("Omci_test_Request", log.Fields{"device-id": omcitestrequest.Id, "uuid": omcitestrequest.Uuid})
-	if agent := dMgr.getDeviceAgent(ctx, omcitestrequest.Id); agent != nil {
-		res, err := agent.startOmciTest(ctx, omcitestrequest)
-		if err != nil {
-			return nil, err
-		}
-		logger.Debugw("Omci_test_Response_result-device-magnager", log.Fields{"result": res})
-		return res, nil
+func (dMgr *Manager) StartOmciTestAction(ctx context.Context, request *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
+	logger.Debugw("StartOmciTestAction", log.Fields{"device-id": request.Id, "uuid": request.Uuid})
+	agent := dMgr.getDeviceAgent(ctx, request.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", request.Id)
 	}
-	return nil, status.Errorf(codes.NotFound, "%s", omcitestrequest.Id)
+	return agent.startOmciTest(ctx, request)
 }
 
 func (dMgr *Manager) GetExtValue(ctx context.Context, value *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
 	log.Debugw("getExtValue", log.Fields{"onu-id": value.Id})
-	cDevice, err := dMgr.GetDevice(ctx, value.Id)
+	cDevice, err := dMgr.getDevice(ctx, value.Id)
 	if err != nil {
 		return nil, status.Errorf(codes.Aborted, "%s", err.Error())
 	}
-	pDevice, err := dMgr.GetDevice(ctx, cDevice.ParentId)
+	pDevice, err := dMgr.getDevice(ctx, cDevice.ParentId)
 	if err != nil {
 		return nil, status.Errorf(codes.Aborted, "%s", err.Error())
 	}