This is a first attempt at getting INGRESS packets flowing northwards in the system.
Tested with ponsim and EAPOL packets generated by the RG tester; confirmed that the
packets flowed from rg->onu->olt->adapter->rw-core->ofagent.

Change-Id: I534c2a376751de50f8e5af9676cd9d467e7b3835
diff --git a/BUILD.md b/BUILD.md
index e0cba23..d96a573 100644
--- a/BUILD.md
+++ b/BUILD.md
@@ -35,6 +35,7 @@
 
 ```
 go get -u google.golang.org/grpc   # gRPC
+go get -u github.com/golang-collections/go-datastructures/queue
 go get -u github.com/golang/protobuf/protoc-gen-go   # protoc plugin for Go
 go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
 go get -u github.com/gogo/protobuf/proto   # Clone function
diff --git a/k8s/single-node/rw-core.yml b/k8s/single-node/rw-core.yml
index df75a6d..7fd3848 100644
--- a/k8s/single-node/rw-core.yml
+++ b/k8s/single-node/rw-core.yml
@@ -60,7 +60,6 @@
             - "-kv_store_type=etcd"
             - "-kv_store_host=etcd.$(NAMESPACE).svc.cluster.local"
             - "-kv_store_port=2379"
-            - "-grpc_host=rw-core.$(NAMESPACE).svc.cluster.local"
             - "-grpc_port=50057"
             - "-banner=true"
             - "-kafka_adapter_host=kafka.$(NAMESPACE).svc.cluster.local"
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 0908146..298c3ce 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -32,7 +32,7 @@
 	deviceMgr         *DeviceManager
 	logicalDeviceMgr  *LogicalDeviceManager
 	grpcServer        *grpcserver.GrpcServer
-	grpcNBIAPIHanfler *APIHandler
+	grpcNBIAPIHandler *APIHandler
 	config            *config.RWCoreFlags
 	kmp               *kafka.InterContainerProxy
 	clusterDataRoot   model.Root
@@ -106,12 +106,13 @@
 	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
 	log.Info("grpc-server-created")
 
-	core.grpcNBIAPIHanfler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr)
+	core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr)
+	core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
 	//	Create a function to register the core GRPC service with the GRPC server
 	f := func(gs *grpc.Server) {
 		voltha.RegisterVolthaServiceServer(
 			gs,
-			core.grpcNBIAPIHanfler,
+			core.grpcNBIAPIHandler,
 		)
 	}
 
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index d44ccf4..79ef982 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -18,6 +18,7 @@
 import (
 	"context"
 	"errors"
+	"github.com/golang-collections/go-datastructures/queue"
 	"github.com/golang/protobuf/ptypes/empty"
 	da "github.com/opencord/voltha-go/common/core/northbound/grpc"
 	"github.com/opencord/voltha-go/common/log"
@@ -36,6 +37,7 @@
 type APIHandler struct {
 	deviceMgr        *DeviceManager
 	logicalDeviceMgr *LogicalDeviceManager
+	packetInQueue     *queue.Queue
 	da.DefaultAPIHandler
 }
 
@@ -43,6 +45,8 @@
 	handler := &APIHandler{
 		deviceMgr:        deviceMgr,
 		logicalDeviceMgr: lDeviceMgr,
+		// TODO: Figure out what the 'hint' parameter to queue.New does
+		packetInQueue:    queue.New(10),
 	}
 	return handler
 }
@@ -388,7 +392,10 @@
 func (handler *APIHandler) sendPacketIn(deviceId string, packet *openflow_13.OfpPacketIn) {
 	packetIn := openflow_13.PacketIn{Id: deviceId, PacketIn: packet}
 	log.Debugw("sendPacketIn", log.Fields{"packetIn": packetIn})
-	// TODO: put the packet in the queue
+	// Enqueue the packet
+	if err := handler.packetInQueue.Put(packetIn); err != nil {
+		log.Errorw("failed-to-enqueue-packet", log.Fields{"error": err})
+	}
 }
 
 func (handler *APIHandler) ReceivePacketsIn(
@@ -398,12 +405,15 @@
 	log.Debugw("ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
 
 	for {
-		// TODO: need to retrieve packet from queue
-		packet := &openflow_13.PacketIn{}
-		time.Sleep(time.Duration(5) * time.Second)
-		err := packetsIn.Send(packet)
-		if err != nil {
-			log.Errorw("Failed to send packet", log.Fields{"error": err})
+		// Dequeue a packet
+		if packets, err := handler.packetInQueue.Get(1); err == nil {
+			log.Debugw("dequeued-packet", log.Fields{"packet": packets[0]})
+			if packet, ok := packets[0].(openflow_13.PacketIn); ok {
+				log.Debugw("sending-packet-in", log.Fields{"packet": packet})
+				if err := packetsIn.Send(&packet); err != nil {
+					log.Errorw("failed-to-send-packet", log.Fields{"error": err})
+				}
+			}
 		}
 	}
 	log.Debugw("ReceivePacketsIn-request-done", log.Fields{"packetsIn": packetsIn})
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index ea94788..86b79f9 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -1015,5 +1015,6 @@
 func (agent *LogicalDeviceAgent) packetIn(port uint32, packet []byte) {
 	log.Debugw("packet-in", log.Fields{"port": port, "packet": packet})
 	packet_in := fd.MkPacketIn(port, packet)
+	agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, packet_in)
 	log.Debugw("sending-packet-in", log.Fields{"packet-in": packet_in})
 }
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 5f9cea2..f49e52a 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -32,6 +32,7 @@
 type LogicalDeviceManager struct {
 	logicalDeviceAgents        map[string]*LogicalDeviceAgent
 	deviceMgr                  *DeviceManager
+	grpcNbiHdlr                *APIHandler
 	adapterProxy               *AdapterProxy
 	kafkaICProxy               *kafka.InterContainerProxy
 	clusterDataProxy           *model.Proxy
@@ -50,6 +51,10 @@
 	return &logicalDeviceMgr
 }
 
+func (ldMgr *LogicalDeviceManager) setGrpcNbiHandler(grpcNbiHandler *APIHandler) {
+	ldMgr.grpcNbiHdlr = grpcNbiHandler
+}
+
 func (ldMgr *LogicalDeviceManager) start(ctx context.Context) {
 	log.Info("starting-logical-device-manager")
 	log.Info("logical-device-manager-started")