VOL-1097 : Ofagent integration for voltha 2.0
- Created a common location for python based components
- Adjusted the ofagent component to interact with voltha 2.0
- Added streaming RPC methods to the voltha API for receiving and sending packets
- Adjusted voltha.proto
Change-Id: I47fb7b80878ead060b4b42bd16cb4f8aa384fdb6
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 215f60f..1524474 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -27,6 +27,8 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
+ "io"
+ "time"
)
const MAX_RESPONSE_TIME = 500 // milliseconds
@@ -38,8 +40,10 @@
}
func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager) *APIHandler {
- handler := &APIHandler{deviceMgr: deviceMgr,
- logicalDeviceMgr: lDeviceMgr}
+ handler := &APIHandler{
+ deviceMgr: deviceMgr,
+ logicalDeviceMgr: lDeviceMgr,
+ }
return handler
}
@@ -355,3 +359,87 @@
}
return nil, errors.New("UnImplemented")
}
+
+// forwardPacketOut is called by StreamPacketsOut for each packet it receives.
+// For now it only logs the packet at debug level; handing the packet to a
+// logical device agent is not wired up yet.
+func (handler *APIHandler) forwardPacketOut(packet *openflow_13.PacketOut) {
+	fields := log.Fields{"packet": packet}
+	log.Debugw("forwardPacketOut-request", fields)
+	// TODO: look up the logical device agent and pass it the packet, e.g.
+	//   agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id)
+	//   agent.packetOut(packet.PacketOut)
+}
+// StreamPacketsOut consumes the client-side stream of packet-out messages and
+// passes each received packet to forwardPacketOut.
+//
+// It returns nil once the client closes the stream (Recv reports io.EOF).
+// Any other receive error is logged and returned: the stream cannot be used
+// after such an error, so continuing to loop would forward a nil packet and
+// spin on the same failure forever.
+func (handler *APIHandler) StreamPacketsOut(
+	packets voltha.VolthaService_StreamPacketsOutServer,
+) error {
+	log.Debugw("StreamPacketsOut-request", log.Fields{"packets": packets})
+	for {
+		packet, err := packets.Recv()
+		if err == io.EOF {
+			// Client finished sending; normal end of stream.
+			break
+		}
+		if err != nil {
+			log.Errorw("Failed to receive packet", log.Fields{"error": err})
+			return err
+		}
+
+		handler.forwardPacketOut(packet)
+	}
+
+	log.Debugw("StreamPacketsOut-request-done", log.Fields{"packets": packets})
+	return nil
+}
+
+// sendPacketIn wraps packet together with deviceId in an openflow_13.PacketIn
+// and logs the result at debug level. Nothing is queued or transmitted yet.
+func (handler *APIHandler) sendPacketIn(deviceId string, packet *openflow_13.OfpPacketIn) {
+	wrapped := openflow_13.PacketIn{
+		Id:       deviceId,
+		PacketIn: packet,
+	}
+	log.Debugw("sendPacketIn", log.Fields{"packetIn": wrapped})
+	// TODO: put the packet in the queue
+}
+
+// ReceivePacketsIn streams packet-in messages to the caller. Until a real
+// packet queue exists it sends an empty openflow_13.PacketIn every 5 seconds.
+//
+// The call ends in one of two ways:
+//   - the stream's context is done (client cancelled or disconnected): the
+//     completion is logged and nil is returned;
+//   - Send fails: the error is logged and returned, instead of retrying
+//     forever on a stream that can no longer deliver messages.
+func (handler *APIHandler) ReceivePacketsIn(
+	empty *empty.Empty,
+	packetsIn voltha.VolthaService_ReceivePacketsInServer,
+) error {
+	log.Debugw("ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
+
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		// Wait for the next send slot, but stop as soon as the stream is cancelled.
+		select {
+		case <-packetsIn.Context().Done():
+			log.Debugw("ReceivePacketsIn-request-done", log.Fields{"packetsIn": packetsIn})
+			return nil
+		case <-ticker.C:
+		}
+
+		// TODO: need to retrieve packet from queue
+		packet := &openflow_13.PacketIn{}
+		if err := packetsIn.Send(packet); err != nil {
+			log.Errorw("Failed to send packet", log.Fields{"error": err})
+			return err
+		}
+	}
+}
+
+// sendChangeEvent wraps portStatus in an openflow_13.ChangeEvent tagged with
+// deviceId and logs the result at debug level. Nothing is queued or
+// transmitted yet.
+func (handler *APIHandler) sendChangeEvent(deviceId string, portStatus *openflow_13.OfpPortStatus) {
+	// TODO: validate the type of portStatus parameter
+	//   if _, ok := portStatus.(*openflow_13.OfpPortStatus); ok {
+	//   }
+	payload := &openflow_13.ChangeEvent_PortStatus{PortStatus: portStatus}
+	event := openflow_13.ChangeEvent{
+		Id:    deviceId,
+		Event: payload,
+	}
+	log.Debugw("sendChangeEvent", log.Fields{"event": event})
+	// TODO: put the packet in the queue
+}
+
+// ReceiveChangeEvents streams change events to the caller. Until a real event
+// queue exists it sends an empty openflow_13.ChangeEvent every 5 seconds.
+//
+// The call ends in one of two ways:
+//   - the stream's context is done (client cancelled or disconnected): nil is
+//     returned;
+//   - Send fails: the error is logged and returned, instead of retrying
+//     forever on a stream that can no longer deliver messages.
+func (handler *APIHandler) ReceiveChangeEvents(
+	empty *empty.Empty,
+	changeEvents voltha.VolthaService_ReceiveChangeEventsServer,
+) error {
+	log.Debugw("ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
+
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		// Wait for the next send slot, but stop as soon as the stream is cancelled.
+		select {
+		case <-changeEvents.Context().Done():
+			return nil
+		case <-ticker.C:
+		}
+
+		// TODO: need to retrieve packet from queue
+		event := &openflow_13.ChangeEvent{}
+		if err := changeEvents.Send(event); err != nil {
+			log.Errorw("Failed to send change event", log.Fields{"error": err})
+			return err
+		}
+	}
+}
+
+// Subscribe answers an OF agent subscription request by copying the request's
+// OfagentId and VolthaId into a new OfAgentSubscriber. The returned error is
+// always nil.
+func (handler *APIHandler) Subscribe(
+	ctx context.Context,
+	ofAgent *voltha.OfAgentSubscriber,
+) (*voltha.OfAgentSubscriber, error) {
+	log.Debugw("Subscribe-request", log.Fields{"ofAgent": ofAgent})
+	reply := &voltha.OfAgentSubscriber{
+		OfagentId: ofAgent.OfagentId,
+		VolthaId:  ofAgent.VolthaId,
+	}
+	return reply, nil
+}