This is a first attempt at getting INGRESS packets flowing northwards in the system.
Tested with ponsim and EAPOL packets generated by the RG tester; confirmed that the
packets flowed from rg->onu->olt->adapter->rw-core->ofagent.
Change-Id: I534c2a376751de50f8e5af9676cd9d467e7b3835
diff --git a/BUILD.md b/BUILD.md
index e0cba23..d96a573 100644
--- a/BUILD.md
+++ b/BUILD.md
@@ -35,6 +35,7 @@
```
go get -u google.golang.org/grpc # gRPC
+go get -u github.com/golang-collections/go-datastructures/queue
go get -u github.com/golang/protobuf/protoc-gen-go # protoc plugin for Go
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
go get -u github.com/gogo/protobuf/proto # Clone function
diff --git a/k8s/single-node/rw-core.yml b/k8s/single-node/rw-core.yml
index df75a6d..7fd3848 100644
--- a/k8s/single-node/rw-core.yml
+++ b/k8s/single-node/rw-core.yml
@@ -60,7 +60,6 @@
- "-kv_store_type=etcd"
- "-kv_store_host=etcd.$(NAMESPACE).svc.cluster.local"
- "-kv_store_port=2379"
- - "-grpc_host=rw-core.$(NAMESPACE).svc.cluster.local"
- "-grpc_port=50057"
- "-banner=true"
- "-kafka_adapter_host=kafka.$(NAMESPACE).svc.cluster.local"
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 0908146..298c3ce 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -32,7 +32,7 @@
deviceMgr *DeviceManager
logicalDeviceMgr *LogicalDeviceManager
grpcServer *grpcserver.GrpcServer
- grpcNBIAPIHanfler *APIHandler
+ grpcNBIAPIHandler *APIHandler
config *config.RWCoreFlags
kmp *kafka.InterContainerProxy
clusterDataRoot model.Root
@@ -106,12 +106,13 @@
core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
log.Info("grpc-server-created")
- core.grpcNBIAPIHanfler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr)
+ core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr)
+ core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
// Create a function to register the core GRPC service with the GRPC server
f := func(gs *grpc.Server) {
voltha.RegisterVolthaServiceServer(
gs,
- core.grpcNBIAPIHanfler,
+ core.grpcNBIAPIHandler,
)
}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index d44ccf4..79ef982 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -18,6 +18,7 @@
import (
"context"
"errors"
+ "github.com/golang-collections/go-datastructures/queue"
"github.com/golang/protobuf/ptypes/empty"
da "github.com/opencord/voltha-go/common/core/northbound/grpc"
"github.com/opencord/voltha-go/common/log"
@@ -36,6 +37,7 @@
type APIHandler struct {
deviceMgr *DeviceManager
logicalDeviceMgr *LogicalDeviceManager
+ packetInQueue *queue.Queue
da.DefaultAPIHandler
}
@@ -43,6 +45,8 @@
handler := &APIHandler{
deviceMgr: deviceMgr,
logicalDeviceMgr: lDeviceMgr,
+ // TODO: Figure out what the 'hint' parameter to queue.New does
+ packetInQueue: queue.New(10),
}
return handler
}
@@ -388,7 +392,10 @@
func (handler *APIHandler) sendPacketIn(deviceId string, packet *openflow_13.OfpPacketIn) {
packetIn := openflow_13.PacketIn{Id: deviceId, PacketIn: packet}
log.Debugw("sendPacketIn", log.Fields{"packetIn": packetIn})
- // TODO: put the packet in the queue
+ // Enqueue the packet
+ if err := handler.packetInQueue.Put(packetIn); err != nil {
+ log.Errorw("failed-to-enqueue-packet", log.Fields{"error": err})
+ }
}
func (handler *APIHandler) ReceivePacketsIn(
@@ -398,12 +405,15 @@
log.Debugw("ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
for {
- // TODO: need to retrieve packet from queue
- packet := &openflow_13.PacketIn{}
- time.Sleep(time.Duration(5) * time.Second)
- err := packetsIn.Send(packet)
- if err != nil {
- log.Errorw("Failed to send packet", log.Fields{"error": err})
+ // Dequeue a packet
+ if packets, err := handler.packetInQueue.Get(1); err == nil {
+ log.Debugw("dequeued-packet", log.Fields{"packet": packets[0]})
+ if packet, ok := packets[0].(openflow_13.PacketIn); ok {
+ log.Debugw("sending-packet-in", log.Fields{"packet": packet})
+ if err := packetsIn.Send(&packet); err != nil {
+ log.Errorw("failed-to-send-packet", log.Fields{"error": err})
+ }
+ }
}
}
log.Debugw("ReceivePacketsIn-request-done", log.Fields{"packetsIn": packetsIn})
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index ea94788..86b79f9 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -1015,5 +1015,6 @@
func (agent *LogicalDeviceAgent) packetIn(port uint32, packet []byte) {
log.Debugw("packet-in", log.Fields{"port": port, "packet": packet})
packet_in := fd.MkPacketIn(port, packet)
+ agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, packet_in)
log.Debugw("sending-packet-in", log.Fields{"packet-in": packet_in})
}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 5f9cea2..f49e52a 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -32,6 +32,7 @@
type LogicalDeviceManager struct {
logicalDeviceAgents map[string]*LogicalDeviceAgent
deviceMgr *DeviceManager
+ grpcNbiHdlr *APIHandler
adapterProxy *AdapterProxy
kafkaICProxy *kafka.InterContainerProxy
clusterDataProxy *model.Proxy
@@ -50,6 +51,10 @@
return &logicalDeviceMgr
}
+func (ldMgr *LogicalDeviceManager) setGrpcNbiHandler(grpcNbiHandler *APIHandler) {
+ ldMgr.grpcNbiHdlr = grpcNbiHandler
+}
+
func (ldMgr *LogicalDeviceManager) start(ctx context.Context) {
log.Info("starting-logical-device-manager")
log.Info("logical-device-manager-started")