Propagate port change to SDN controller via OF13

This change adds plumbing to propagate new logical port
additions up to the SDN contoller via OpenFlow.

Change-Id: Ic02586b096144c3697a0974ebbc8a7be8760bef8
diff --git a/ofagent/agent.py b/ofagent/agent.py
index cabcc6c..a60b7f7 100644
--- a/ofagent/agent.py
+++ b/ofagent/agent.py
@@ -25,7 +25,7 @@
 from common.utils.asleep import asleep
 from of_connection import OpenFlowConnection
 from of_protocol_handler import OpenFlowProtocolHandler
-
+from ofagent.protos.openflow_13_pb2 import ChangeEvent
 
 log = structlog.get_logger()
 
@@ -121,6 +121,14 @@
     def forward_packet_in(self, ofp_packet_in):
         self.proto_handler.forward_packet_in(ofp_packet_in)
 
+    def forward_change_event(self, event):
+        # assert isinstance(event, ChangeEvent)
+        log.info('got-change-event', change_event=event)
+        if event.HasField("port_status"):
+            self.proto_handler.forward_port_status(event.port_status)
+        else:
+            log.error('unknown-change-event', change_event=event)
+
 
 if __name__ == '__main__':
     """Run this to test the agent for N concurrent sessions:
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 8d25891..1aa7031 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -209,3 +209,9 @@
         if datapath_id:
             agent = self.agent_map[datapath_id]
             agent.forward_packet_in(ofp_packet_in)
+
+    def forward_change_event(self, device_id, event):
+        datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
+        if datapath_id:
+            agent = self.agent_map[datapath_id]
+            agent.forward_change_event(event)
diff --git a/ofagent/converter.py b/ofagent/converter.py
index 9ab9636..7252c43 100644
--- a/ofagent/converter.py
+++ b/ofagent/converter.py
@@ -55,6 +55,12 @@
     kw = pb2dict(pb)
     return of13.common.port_desc(**kw)
 
+def ofp_port_status_to_loxi_port_status(pb):
+    return of13.message.port_status(
+        reason=pb.reason,
+        desc=ofp_port_to_loxi_port_desc(pb.desc)
+    )
+
 def make_loxi_match(match):
     assert match.get('type', pb2.OFPMT_STANDARD) == pb2.OFPMT_OXM
     loxi_match_fields = []
@@ -120,6 +126,7 @@
 
 to_loxi_converters = {
     pb2.ofp_port: ofp_port_to_loxi_port_desc,
+    pb2.ofp_port_status: ofp_port_status_to_loxi_port_status,
     pb2.ofp_flow_stats: ofp_flow_stats_to_loxi_flow_stats,
     pb2.ofp_packet_in: ofp_packet_in_to_loxi_packet_in,
     pb2.ofp_group_entry: ofp_group_entry_to_loxi_group_entry,
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index 5d42566..13b49b9 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -46,12 +46,15 @@
 
         self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
         self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
+        self.change_event_queue = DeferredQueue()  # queue change events
 
     def start(self):
         log.debug('starting')
         self.start_packet_out_stream()
         self.start_packet_in_stream()
+        self.start_change_event_in_stream()
         reactor.callLater(0, self.packet_in_forwarder_loop)
+        reactor.callLater(0, self.change_event_processing_loop)
         log.info('started')
         return self
 
@@ -92,6 +95,31 @@
 
         reactor.callInThread(receive_packet_in_stream)
 
+    def start_change_event_in_stream(self):
+
+        def receive_change_events():
+            streaming_rpc_method = self.local_stub.ReceiveChangeEvents
+            iterator = streaming_rpc_method(empty_pb2.Empty())
+            for event in iterator:
+                reactor.callFromThread(self.change_event_queue.put, event)
+                log.debug('enqued-change-event',
+                          change_event=event,
+                          queue_len=len(self.change_event_queue.pending))
+
+        reactor.callInThread(receive_change_events)
+
+    @inlineCallbacks
+    def change_event_processing_loop(self):
+        while True:
+            try:
+                event = yield self.change_event_queue.get()
+                device_id = event.id
+                self.connection_manager.forward_change_event(device_id, event)
+            except Exception, e:
+                log.exception('failed-in-packet-in-handler', e=e)
+            if self.stopped:
+                break
+
     @inlineCallbacks
     def packet_in_forwarder_loop(self):
         while True:
diff --git a/ofagent/of_protocol_handler.py b/ofagent/of_protocol_handler.py
index b79786f..5ac8712 100644
--- a/ofagent/of_protocol_handler.py
+++ b/ofagent/of_protocol_handler.py
@@ -258,3 +258,6 @@
 
     def forward_packet_in(self, ofp_packet_in):
         self.cxn.send(to_loxi(ofp_packet_in))
+
+    def forward_port_status(self, ofp_port_status):
+        self.cxn.send(to_loxi(ofp_port_status))
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index f693a73..8dc8fa2 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -20,8 +20,11 @@
 from uuid import uuid4
 
 import structlog
+from klein import Klein
+from twisted.internet import endpoints
 from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks
+from twisted.web.server import Site
 from zope.interface import implementer
 
 from common.utils.asleep import asleep
@@ -53,6 +56,8 @@
         )
     ]
 
+    app = Klein()
+
     def __init__(self, adapter_agent, config):
         self.adapter_agent = adapter_agent
         self.config = config
@@ -62,9 +67,15 @@
             version='0.1',
             config=AdapterConfig(log_level=LogLevel.INFO)
         )
+        self.control_endpoint = None
 
     def start(self):
         log.debug('starting')
+
+        # setup a basic web server for test control
+        self.control_endpoint = endpoints.TCP4ServerEndpoint(reactor, 18880)
+        self.control_endpoint.listen(self.get_test_control_site())
+
         # TODO tmp: populate some devices and logical devices
         reactor.callLater(0, self._tmp_populate_stuff)
         log.info('started')
@@ -309,23 +320,23 @@
         device.oper_status = OperStatus.ACTIVE
         self.adapter_agent.update_device(device)
 
-        reactor.callLater(0.1, self._simulate_detection_of_onus, device)
+        # reactor.callLater(0.1, self._simulate_detection_of_onus, device.id)
 
     @inlineCallbacks
-    def _simulate_detection_of_onus(self, device):
+    def _simulate_detection_of_onus(self, device_id):
         for i in xrange(1, 5):
             log.info('activate-olt-for-onu-{}'.format(i))
-            gemport, vlan_id = self._olt_side_onu_activation(i)
+            vlan_id = self._olt_side_onu_activation(i)
             yield asleep(0.05)
             self.adapter_agent.child_device_detected(
-                parent_device_id=device.id,
+                parent_device_id=device_id,
                 parent_port_no=1,
                 child_device_type='simulated_onu',
                 proxy_address=Device.ProxyAddress(
-                    device_id=device.id,
+                    device_id=device_id,
                     channel_id=vlan_id
                 ),
-                vlan=100 + i
+                vlan=vlan_id
             )
 
     def _olt_side_onu_activation(self, seq):
@@ -335,9 +346,8 @@
         be able to provide tunneled (proxy) communication to the given ONU,
         using the returned information.
         """
-        gemport = seq + 1
         vlan_id = seq + 100
-        return gemport, vlan_id
+        return vlan_id
 
     def update_flows_bulk(self, device, flows, groups):
         log.debug('bulk-flow-update', device_id=device.id,
@@ -358,3 +368,15 @@
 
     def receive_proxied_message(self, proxy_address, msg):
         raise NotImplementedError()
+
+    # ~~~~~~~~~~~~~~~~~~~~ Embedded test Klein rest server ~~~~~~~~~~~~~~~~~~~~
+
+    @app.route('/devices/<string:id>/detect_onus')
+    def detect_onu(self, request, **kw):
+        log.info('detect-onus', request=request, **kw)
+        device_id = kw['id']
+        self._simulate_detection_of_onus(device_id)
+        return '{"status": "OK"}'
+
+    def get_test_control_site(self):
+        return Site(self.app.resource())
diff --git a/voltha/core/core.py b/voltha/core/core.py
index c20f311..834542f 100644
--- a/voltha/core/core.py
+++ b/voltha/core/core.py
@@ -59,6 +59,7 @@
         self.device_agents = {}
         self.logical_device_agents = {}
         self.packet_in_queue = Queue()
+        self.change_event_queue = Queue()
 
     @inlineCallbacks
     def start(self):
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 416829b..2da337f 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -19,7 +19,8 @@
 
 from common.utils.grpc_utils import twisted_async
 from voltha.core.config.config_root import ConfigRoot
-from voltha.protos.openflow_13_pb2 import PacketIn, Flows, FlowGroups
+from voltha.protos.openflow_13_pb2 import PacketIn, Flows, FlowGroups, \
+    ofp_port_status
 
 from google.protobuf.empty_pb2 import Empty
 
@@ -27,7 +28,7 @@
     add_VolthaLocalServiceServicer_to_server, VolthaLocalServiceServicer, \
     VolthaInstance, Adapters, LogicalDevices, LogicalDevice, Ports, \
     LogicalPorts, Devices, Device, DeviceType, \
-    DeviceTypes, DeviceGroups, DeviceGroup, AdminState, OperStatus
+    DeviceTypes, DeviceGroups, DeviceGroup, AdminState, OperStatus, ChangeEvent
 from voltha.registry import registry
 
 log = structlog.get_logger()
@@ -422,3 +423,14 @@
         """Must be called on the twisted thread"""
         packet_in = PacketIn(id=device_id, packet_in=ofp_packet_in)
         self.core.packet_in_queue.put(packet_in)
+
+    def ReceiveChangeEvents(self, request, context):
+        while 1:
+            event = self.core.change_event_queue.get()
+            yield event
+
+    def send_port_change_event(self, device_id, port_status):
+        """Must be called on the twisted thread"""
+        assert isinstance(port_status, ofp_port_status)
+        event = ChangeEvent(id=device_id, port_status=port_status)
+        self.core.change_event_queue.put(event)
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index 817ee60..998f85b 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -30,6 +30,7 @@
 from voltha.protos import third_party
 from voltha.protos import openflow_13_pb2 as ofp
 from voltha.protos.device_pb2 import Port
+from voltha.protos.logical_device_pb2 import LogicalPort
 from voltha.protos.openflow_13_pb2 import Flows, FlowGroups
 from voltha.registry import registry
 
@@ -43,6 +44,7 @@
 
     def __init__(self, core, logical_device):
         self.core = core
+        self.local_handler = core.get_local_handler()
         self.grpc_server = registry('grpc_server')
         self.logical_device_id = logical_device.id
 
@@ -59,9 +61,9 @@
         self.groups_proxy.register_callback(
             CallbackType.POST_UPDATE, self._group_table_updated)
         self.self_proxy.register_callback(
-            CallbackType.POST_ADD, self._port_list_updated)
+            CallbackType.POST_ADD, self._port_added)
         self.self_proxy.register_callback(
-            CallbackType.POST_REMOVE, self._port_list_updated)
+            CallbackType.POST_REMOVE, self._port_removed)
 
         self.log = structlog.get_logger(logical_device_id=logical_device.id)
 
@@ -501,6 +503,39 @@
 
     ## <==================== APIs NEEDED BY FLOW DECOMPOSER ===================
 
+    def _port_added(self, port):
+        assert isinstance(port, LogicalPort)
+        self._port_list_updated(port)
+        self.local_handler.send_port_change_event(
+            device_id=self.logical_device_id,
+            port_status=ofp.ofp_port_status(
+                reason=ofp.OFPPR_ADD,
+                desc=port.ofp_port
+            )
+        )
+
+    def _port_removed(self, port):
+        assert isinstance(port, LogicalPort)
+        self._port_list_updated(port)
+        self.local_handler.send_port_change_event(
+            device_id=self.logical_device_id,
+            port_status=ofp.ofp_port_status(
+                reason=ofp.OFPPR_DELETE,
+                desc=port.ofp_port
+            )
+        )
+
+    # TODO not yet hooked up
+    def _port_changed(self, port):
+        assert isinstance(port, LogicalPort)
+        self.local_handler.send_port_change_event(
+            device_id=self.logical_device_id,
+            port_status=ofp.ofp_port_status(
+                reason=ofp.OFPPR_MODIFY,
+                desc=port.ofp_port
+            )
+        )
+
     def _port_list_updated(self, _):
         # invalidate the graph and the route table
         self._invalidate_cached_tables()
diff --git a/voltha/protos/openflow_13.proto b/voltha/protos/openflow_13.proto
index 01b5059..a452de5 100644
--- a/voltha/protos/openflow_13.proto
+++ b/voltha/protos/openflow_13.proto
@@ -2273,3 +2273,10 @@
     string id = 1;  // LogicalDevice.id
     ofp_packet_out packet_out = 2;
 }
+
+message ChangeEvent {
+    string id = 1; // LogicalDevice.id
+    oneof event {
+        ofp_port_status port_status = 2;
+    }
+}
diff --git a/voltha/protos/voltha.proto b/voltha/protos/voltha.proto
index 964d182..90df0ec 100644
--- a/voltha/protos/voltha.proto
+++ b/voltha/protos/voltha.proto
@@ -417,4 +417,9 @@
         // This does not have an HTTP representation
     }
 
+    rpc ReceiveChangeEvents(google.protobuf.Empty)
+        returns(stream openflow_13.ChangeEvent) {
+        // This does not have an HTTP representation
+    }
+
 }