VOL-1097 : Ofagent integration for voltha 2.0

- Created a common location for python based components
- Adjusted the ofagent component to interact with voltha 2.0
- Added streaming rpc methods for rcv/send of packets to voltha api
- Adjusted voltha.proto

Change-Id: I47fb7b80878ead060b4b42bd16cb4f8aa384fdb6
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 215f60f..1524474 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -27,6 +27,8 @@
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/status"
+	"io"
+	"time"
 )
 
 const MAX_RESPONSE_TIME = 500   // milliseconds
@@ -38,8 +40,10 @@
 }
 
 func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager) *APIHandler {
-	handler := &APIHandler{deviceMgr: deviceMgr,
-		logicalDeviceMgr: lDeviceMgr}
+	handler := &APIHandler{
+		deviceMgr:        deviceMgr,
+		logicalDeviceMgr: lDeviceMgr,
+	}
 	return handler
 }
 
@@ -355,3 +359,87 @@
 	}
 	return nil, errors.New("UnImplemented")
 }
+
+func (handler *APIHandler) forwardPacketOut(packet *openflow_13.PacketOut) {
+	log.Debugw("forwardPacketOut-request", log.Fields{"packet": packet})
+	//agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id)
+	//agent.packetOut(packet.PacketOut)
+}
+func (handler *APIHandler) StreamPacketsOut(
+	packets voltha.VolthaService_StreamPacketsOutServer,
+) error {
+	log.Debugw("StreamPacketsOut-request", log.Fields{"packets": packets})
+	for {
+		packet, err := packets.Recv()
+
+		if err == io.EOF {
+			break
+		} else if err != nil {
+			log.Errorw("Failed to receive packet", log.Fields{"error": err})
+		}
+
+		handler.forwardPacketOut(packet)
+	}
+
+	log.Debugw("StreamPacketsOut-request-done", log.Fields{"packets": packets})
+	return nil
+}
+
+func (handler *APIHandler) sendPacketIn(deviceId string, packet *openflow_13.OfpPacketIn) {
+	packetIn := openflow_13.PacketIn{Id: deviceId, PacketIn: packet}
+	log.Debugw("sendPacketIn", log.Fields{"packetIn": packetIn})
+	// TODO: put the packet in the queue
+}
+
+func (handler *APIHandler) ReceivePacketsIn(
+	empty *empty.Empty,
+	packetsIn voltha.VolthaService_ReceivePacketsInServer,
+) error {
+	log.Debugw("ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
+
+	for {
+		// TODO: need to retrieve packet from queue
+		packet := &openflow_13.PacketIn{}
+		time.Sleep(time.Duration(5) * time.Second)
+		err := packetsIn.Send(packet)
+		if err != nil {
+			log.Errorw("Failed to send packet", log.Fields{"error": err})
+		}
+	}
+	log.Debugw("ReceivePacketsIn-request-done", log.Fields{"packetsIn": packetsIn})
+	return nil
+}
+
+func (handler *APIHandler) sendChangeEvent(deviceId string, portStatus *openflow_13.OfpPortStatus) {
+	// TODO: validate the type of portStatus parameter
+	//if _, ok := portStatus.(*openflow_13.OfpPortStatus); ok {
+	//}
+	event := openflow_13.ChangeEvent{Id: deviceId, Event: &openflow_13.ChangeEvent_PortStatus{PortStatus: portStatus}}
+	log.Debugw("sendChangeEvent", log.Fields{"event": event})
+	// TODO: put the packet in the queue
+}
+
+func (handler *APIHandler) ReceiveChangeEvents(
+	empty *empty.Empty,
+	changeEvents voltha.VolthaService_ReceiveChangeEventsServer,
+) error {
+	log.Debugw("ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
+	for {
+		// TODO: need to retrieve packet from queue
+		event := &openflow_13.ChangeEvent{}
+		time.Sleep(time.Duration(5) * time.Second)
+		err := changeEvents.Send(event)
+		if err != nil {
+			log.Errorw("Failed to send change event", log.Fields{"error": err})
+		}
+	}
+	return nil
+}
+
+func (handler *APIHandler) Subscribe(
+	ctx context.Context,
+	ofAgent *voltha.OfAgentSubscriber,
+) (*voltha.OfAgentSubscriber, error) {
+	log.Debugw("Subscribe-request", log.Fields{"ofAgent": ofAgent})
+	return &voltha.OfAgentSubscriber{OfagentId: ofAgent.OfagentId, VolthaId: ofAgent.VolthaId}, nil
+}