VOL-2920 - Remove NBI passthrough functions.

Modified the NBIHandler to reference device, logical device, and adapter managers as embedded types, allowing the managers to directly implement API functions, without the need for individual passthrough functions.
Also created a new event.Manager type, which is embedded in device.LogicalManager.
Also renamed device.NewDeviceManagers() to device.NewManagers().

Change-Id: I8455da79b991ee67cc16cf898b00b0c98ea97bcd
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 5005e0c..a5c47b9 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -19,7 +19,10 @@
 import (
 	"context"
 	"errors"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/voltha-go/rw_core/core/device/event"
 	"github.com/opencord/voltha-go/rw_core/utils"
+	"io"
 	"strings"
 	"sync"
 	"time"
@@ -36,9 +39,9 @@
 
 // LogicalManager represent logical device manager attributes
 type LogicalManager struct {
+	*event.Manager
 	logicalDeviceAgents            sync.Map
 	deviceMgr                      *Manager
-	eventCallbacks                 EventCallbacks
 	kafkaICProxy                   kafka.InterContainerProxy
 	clusterDataProxy               *model.Proxy
 	exitChannel                    chan int
@@ -47,15 +50,6 @@
 	logicalDeviceLoadingInProgress map[string][]chan int
 }
 
-type EventCallbacks interface {
-	SendChangeEvent(deviceID string, portStatus *openflow_13.OfpPortStatus)
-	SendPacketIn(deviceID string, transactionID string, packet *openflow_13.OfpPacketIn)
-}
-
-func (ldMgr *LogicalManager) SetEventCallbacks(callbacks EventCallbacks) {
-	ldMgr.eventCallbacks = callbacks
-}
-
 func (ldMgr *LogicalManager) Start(ctx context.Context) {
 	logger.Info("starting-logical-device-manager")
 	probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusRunning)
@@ -69,18 +63,6 @@
 	logger.Info("logical-device-manager-stopped")
 }
 
-func sendAPIResponse(ctx context.Context, ch chan interface{}, result interface{}) {
-	if ctx.Err() == nil {
-		// Returned response only of the ctx has not been cancelled/timeout/etc
-		// Channel is automatically closed when a context is Done
-		ch <- result
-		logger.Debugw("sendResponse", log.Fields{"result": result})
-	} else {
-		// Should the transaction be reverted back?
-		logger.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})
-	}
-}
-
 func (ldMgr *LogicalManager) addLogicalDeviceAgentToMap(agent *LogicalAgent) {
 	if _, exist := ldMgr.logicalDeviceAgents.Load(agent.logicalDeviceID); !exist {
 		ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
@@ -118,16 +100,16 @@
 
 // GetLogicalDevice provides a cloned most up to date logical device.  If device is not in memory
 // it will be fetched from the dB
-func (ldMgr *LogicalManager) GetLogicalDevice(ctx context.Context, id string) (*voltha.LogicalDevice, error) {
+func (ldMgr *LogicalManager) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
 	logger.Debugw("getlogicalDevice", log.Fields{"logicaldeviceid": id})
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
+	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
 		return agent.GetLogicalDevice(ctx)
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
 
 //ListLogicalDevices returns the list of all logical devices
-func (ldMgr *LogicalManager) ListLogicalDevices(ctx context.Context) (*voltha.LogicalDevices, error) {
+func (ldMgr *LogicalManager) ListLogicalDevices(ctx context.Context, _ *empty.Empty) (*voltha.LogicalDevices, error) {
 	logger.Debug("ListAllLogicalDevices")
 
 	var logicalDevices []*voltha.LogicalDevice
@@ -301,7 +283,7 @@
 	// Get the device
 	var device *voltha.Device
 	var err error
-	if device, err = ldMgr.deviceMgr.GetDevice(ctx, deviceID); err != nil {
+	if device, err = ldMgr.deviceMgr.getDevice(ctx, deviceID); err != nil {
 		return nil, err
 	}
 	return ldMgr.getLogicalDeviceID(ctx, device)
@@ -315,7 +297,7 @@
 		return nil, err
 	}
 	var lDevice *voltha.LogicalDevice
-	if lDevice, err = ldMgr.GetLogicalDevice(ctx, *lDeviceID); err != nil {
+	if lDevice, err = ldMgr.GetLogicalDevice(ctx, &voltha.ID{Id: *lDeviceID}); err != nil {
 		return nil, err
 	}
 	// Go over list of ports
@@ -328,37 +310,38 @@
 }
 
 // ListLogicalDeviceFlows returns the flows of logical device
-func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id string) (*openflow_13.Flows, error) {
-	logger.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id})
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
+func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
+	logger.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id.Id})
+	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
 		return agent.ListLogicalDeviceFlows(ctx)
 	}
-	return nil, status.Errorf(codes.NotFound, "%s", id)
+	return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 }
 
 // ListLogicalDeviceFlowGroups returns logical device flow groups
-func (ldMgr *LogicalManager) ListLogicalDeviceFlowGroups(ctx context.Context, id string) (*openflow_13.FlowGroups, error) {
-	logger.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"logicaldeviceid": id})
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
+func (ldMgr *LogicalManager) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
+	logger.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"logicaldeviceid": id.Id})
+	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
 		return agent.ListLogicalDeviceFlowGroups(ctx)
 	}
-	return nil, status.Errorf(codes.NotFound, "%s", id)
+	return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 }
 
 // ListLogicalDevicePorts returns logical device ports
-func (ldMgr *LogicalManager) ListLogicalDevicePorts(ctx context.Context, id string) (*voltha.LogicalPorts, error) {
-	logger.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
+func (ldMgr *LogicalManager) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
+	logger.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id.Id})
+	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
 		return agent.ListLogicalDevicePorts(ctx)
 	}
-	return nil, status.Errorf(codes.NotFound, "%s", id)
+	return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 }
 
-func (ldMgr *LogicalManager) GetLogicalPort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+// GetLogicalDevicePort returns logical device port details
+func (ldMgr *LogicalManager) GetLogicalDevicePort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
 	// Get the logical device where this device is attached
 	var err error
 	var lDevice *voltha.LogicalDevice
-	if lDevice, err = ldMgr.GetLogicalDevice(ctx, lPortID.Id); err != nil {
+	if lDevice, err = ldMgr.GetLogicalDevice(ctx, &voltha.ID{Id: lPortID.Id}); err != nil {
 		return nil, err
 	}
 	// Go over list of ports
@@ -393,7 +376,7 @@
 	// Get logical port
 	var logicalPort *voltha.LogicalPort
 	var err error
-	if logicalPort, err = ldMgr.GetLogicalPort(ctx, lPortID); err != nil {
+	if logicalPort, err = ldMgr.GetLogicalDevicePort(ctx, lPortID); err != nil {
 		logger.Debugw("no-logical-device-port-present", log.Fields{"logicalPortId": lPortID.PortId})
 		return err
 	}
@@ -525,72 +508,64 @@
 	return nil
 }
 
-func (ldMgr *LogicalManager) UpdateFlowTable(ctx context.Context, id string, flow *openflow_13.OfpFlowMod, ch chan interface{}) {
-	logger.Debugw("UpdateFlowTable", log.Fields{"logicalDeviceId": id})
-	var res interface{}
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
-		res = agent.updateFlowTable(ctx, flow)
-		logger.Debugw("UpdateFlowTable-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id)
+// UpdateLogicalDeviceFlowTable updates logical device flow table
+func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowTable(ctx context.Context, flow *openflow_13.FlowTableUpdate) (*empty.Empty, error) {
+	logger.Debugw("UpdateLogicalDeviceFlowTable", log.Fields{"logicalDeviceId": flow.Id})
+	agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
 	}
-	sendAPIResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.updateFlowTable(ctx, flow.FlowMod)
 }
 
-func (ldMgr *LogicalManager) UpdateMeterTable(ctx context.Context, id string, meter *openflow_13.OfpMeterMod, ch chan interface{}) {
-	logger.Debugw("UpdateMeterTable", log.Fields{"logicalDeviceId": id})
-	var res interface{}
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
-		res = agent.updateMeterTable(ctx, meter)
-		logger.Debugw("UpdateMeterTable-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id)
+// UpdateLogicalDeviceMeterTable - This function sends meter mod request to logical device manager and waits for response
+func (ldMgr *LogicalManager) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
+	logger.Debugw("UpdateLogicalDeviceMeterTable", log.Fields{"logicalDeviceId": meter.Id})
+	agent := ldMgr.getLogicalDeviceAgent(ctx, meter.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", meter.Id)
 	}
-	sendAPIResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.updateMeterTable(ctx, meter.MeterMod)
 }
 
 // ListLogicalDeviceMeters returns logical device meters
-func (ldMgr *LogicalManager) ListLogicalDeviceMeters(ctx context.Context, id string) (*openflow_13.Meters, error) {
-	logger.Debugw("ListLogicalDeviceMeters", log.Fields{"logicalDeviceId": id})
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
-		return agent.ListLogicalDeviceMeters(ctx)
+func (ldMgr *LogicalManager) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
+	logger.Debugw("ListLogicalDeviceMeters", log.Fields{"logicalDeviceId": id.Id})
+	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 	}
-	return nil, status.Errorf(codes.NotFound, "%s", id)
-}
-func (ldMgr *LogicalManager) UpdateGroupTable(ctx context.Context, id string, groupMod *openflow_13.OfpGroupMod, ch chan interface{}) {
-	logger.Debugw("UpdateGroupTable", log.Fields{"logicalDeviceId": id})
-	var res interface{}
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
-		res = agent.updateGroupTable(ctx, groupMod)
-		logger.Debugw("UpdateGroupTable-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id)
-	}
-	sendAPIResponse(ctx, ch, res)
+	return agent.ListLogicalDeviceMeters(ctx)
 }
 
-func (ldMgr *LogicalManager) EnableLogicalPort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
-	logger.Debugw("EnableLogicalPort", log.Fields{"logicalDeviceId": id})
-	var res interface{}
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
-		res = agent.enableLogicalPort(ctx, id.PortId)
-		logger.Debugw("EnableLogicalPort-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id.Id)
+// UpdateLogicalDeviceFlowGroupTable updates logical device flow group table
+func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowGroupTable(ctx context.Context, flow *openflow_13.FlowGroupTableUpdate) (*empty.Empty, error) {
+	logger.Debugw("UpdateGroupTable", log.Fields{"logicalDeviceId": flow.Id})
+	agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
 	}
-	sendAPIResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.updateGroupTable(ctx, flow.GroupMod)
 }
 
-func (ldMgr *LogicalManager) DisableLogicalPort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
-	logger.Debugw("DisableLogicalPort", log.Fields{"logicalDeviceId": id})
-	var res interface{}
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
-		res = agent.disableLogicalPort(ctx, id.PortId)
-		logger.Debugw("DisableLogicalPort-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id.Id)
+// EnableLogicalDevicePort enables logical device port
+func (ldMgr *LogicalManager) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
+	logger.Debugw("EnableLogicalDevicePort", log.Fields{"logicalDeviceId": id})
+	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 	}
-	sendAPIResponse(ctx, ch, res)
+	return &empty.Empty{}, agent.enableLogicalPort(ctx, id.PortId)
+}
+
+// DisableLogicalDevicePort disables logical device port
+func (ldMgr *LogicalManager) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
+	logger.Debugw("DisableLogicalDevicePort", log.Fields{"logicalDeviceId": id})
+	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
+	}
+	return &empty.Empty{}, agent.disableLogicalPort(ctx, id.PortId)
 }
 
 func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
@@ -603,10 +578,37 @@
 	return nil
 }
 
-func (ldMgr *LogicalManager) PacketOut(ctx context.Context, packet *openflow_13.PacketOut) {
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, packet.Id); agent != nil {
-		agent.packetOut(ctx, packet.PacketOut)
-	} else {
-		logger.Errorf("No logical device agent present", log.Fields{"logicalDeviceID": packet.Id})
+// StreamPacketsOut sends packets to adapter
+func (ldMgr *LogicalManager) StreamPacketsOut(packets voltha.VolthaService_StreamPacketsOutServer) error {
+	logger.Debugw("StreamPacketsOut-request", log.Fields{"packets": packets})
+loop:
+	for {
+		select {
+		case <-packets.Context().Done():
+			logger.Infow("StreamPacketsOut-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
+			break loop
+		default:
+		}
+
+		packet, err := packets.Recv()
+
+		if err == io.EOF {
+			logger.Debugw("Received-EOF", log.Fields{"packets": packets})
+			break loop
+		}
+
+		if err != nil {
+			logger.Errorw("Failed to receive packet out", log.Fields{"error": err})
+			continue
+		}
+
+		if agent := ldMgr.getLogicalDeviceAgent(packets.Context(), packet.Id); agent != nil {
+			agent.packetOut(packets.Context(), packet.PacketOut)
+		} else {
+			logger.Errorf("No logical device agent present", log.Fields{"logicalDeviceID": packet.Id})
+		}
 	}
+
+	logger.Debugw("StreamPacketsOut-request-done", log.Fields{"packets": packets})
+	return nil
 }