Async/streaming gRPC client/server proto

This experiment was to fine-tune how we can implement
async gRPC client and server code inside a Twisted
python app.

Change-Id: I945014e27f4b9d6ed624666e0284cc298548adb3

Major cleanup of openflow_13.proto

Change-Id: I4e54eaf87b682124ec518a0ade1a6050a6ec6da8

Relocated openflow_13.proto to voltha

Change-Id: I66ae45a9142d180c2c6651e75c7a1ee08aef7ef8

Removed forced utest from make build

Change-Id: If0da58e9d135ebde6ca68c3316688a03a7b10f2f

twisted openflow agent first pass

Change-Id: Ibe5b4727ccfe92e6fd464ccd3baf6275569ef5d3

store openflow derived files

Change-Id: Ib3e1384bb2ca2a9c0872767f7b793f96b0a154e2

Minor cleanup

Change-Id: I1280ed3acb606121b616a0efd573f5f59d010dca

Factored out common utils

Change-Id: Icd86fcd50f60d0900924674cbcd65e13e47782a1

Refactored twisted agent

Change-Id: I71f26ce5357a4f98477df60b8c5ddc068cf75d43

Relocated openflow agent to ofagent

... and preserved obsolete working (non-twisted) agent under
~/obsolete, so we can still run the olt-oftest and pass tests,
until the new twisted based agent reaches that maturity point.

Change-Id: I727f8d7144b1291a40276dad2966b7643bd7bc4b

olt-oftest in fake mode works with new agent

Change-Id: I43b4f5812e8dfaa9f45e4a77fdcf6c30ac520f8d

Initial ofagent/voltha operation

Change-Id: Ia8104f1285a6b1c51635d36d7d78fc113f800e79

Additional callouts to Voltha

Change-Id: If8f483d5140d3c9d45f22b480b8d33249a29cd4e

More gRPC calls

Change-Id: I7d24fadf9425217fb26ffe18f25359d072ef38fa

Flow add/list now works

Change-Id: Ie3e3e73108645b47891cef798fc61372a022fd93

Missed some files

Change-Id: I29e81238ff1a26c095c0c73e521579edf7092e21
diff --git a/ofagent/of_protocol_handler.py b/ofagent/of_protocol_handler.py
new file mode 100644
index 0000000..5d1191a
--- /dev/null
+++ b/ofagent/of_protocol_handler.py
@@ -0,0 +1,247 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from twisted.internet.defer import inlineCallbacks
+
+import loxi.of13 as ofp
+from converter import to_loxi, pb2dict, to_grpc
+
+log = structlog.get_logger()
+
+
+class OpenFlowProtocolError(Exception): pass
+
+
+class OpenFlowProtocolHandler(object):
+
+    def __init__(self, datapath_id, agent, cxn, rpc):
+        """
+        The upper half of the OpenFlow protocol, focusing on message
+        exchanges.
+        :param agent: Reference to the Agent() instance, can be used to
+          indicate critical errors to break the connection.
+        :param cxn: The lower level message serdes part of the OF protocol.
+        :param rpc: The application level stub on which RPC calls
+          are made as result of processing incoming OpenFlow request messages.
+        """
+        self.datapath_id = datapath_id
+        self.agent = agent
+        self.cxn = cxn
+        self.rpc = rpc
+
+    @inlineCallbacks
+    def run(self):
+        """A new call is made after a fresh reconnect"""
+
+        try:
+            # send initial hello message
+            self.cxn.send(ofp.message.hello())
+
+            # expect to receive a hello message
+            msg = yield self.cxn.recv_class(ofp.message.hello)
+            # TODO verify version compatibility (must list version 1.3)
+
+            while True:
+                req = yield self.cxn.recv_any()
+                handler = self.main_handlers.get(req.type, None)
+                if handler:
+                    handler(self, req)
+                else:
+                    log.error('cannot-handle',
+                              request=req, xid=req.xid, type=req.type)
+
+        except Exception, e:
+            log.exception('exception', e=e)
+
+    def handle_echo_request(self, req):
+        self.cxn.send(ofp.message.echo_reply(xid=req.xid))
+
+    @inlineCallbacks
+    def handle_feature_request(self, req):
+        device_info = yield self.rpc.get_device_info(self.datapath_id)
+        kw = pb2dict(device_info.switch_features)
+        self.cxn.send(ofp.message.features_reply(
+            xid=req.xid,
+            datapath_id=self.datapath_id,
+            **kw))
+
+    def handle_stats_request(self, req):
+        handler = self.stats_handlers.get(req.stats_type, None)
+        if handler:
+            handler(self, req)
+        else:
+            raise OpenFlowProtocolError(
+                'Cannot handle stats request type "{}"'.format(req.stats_type))
+
+    def handle_barrier_request(self, req):
+        # TODO not really doing barrier yet, but we respond
+        self.cxn.send(ofp.message.barrier_reply(xid=req.xid))
+
+    def handle_experimenter_request(self, req):
+        raise NotImplementedError()
+
+    @inlineCallbacks
+    def handle_flow_mod_request(self, req):
+        yield self.rpc.update_flow_table(self.datapath_id, to_grpc(req))
+
+    def handle_get_async_request(self, req):
+        raise NotImplementedError()
+
+    def handle_get_config_request(self, req):
+        self.cxn.send(ofp.message.get_config_reply(
+            xid=req.xid,
+            miss_send_len=ofp.OFPCML_NO_BUFFER
+        ))
+
+    def handle_group_mod_request(self, req):
+        # TODO do not do anything yet
+        pass
+
+    def handle_meter_mod_request(self, req):
+        raise NotImplementedError()
+
+    def handle_role_request(self, req):
+        # TODO this is where we need to manage which connection is active
+        if req.role != ofp.OFPCR_ROLE_MASTER:
+            raise NotImplementedError()
+        self.cxn.send(ofp.message.role_reply(
+            xid=req.xid, role=req.role, generation_id=req.generation_id))
+
+    def handle_packet_out_request(self, req):
+        # TODO send packet out
+        pass
+
+    def handle_set_config_request(self, req):
+        # TODO ignore for now
+        pass
+
+    def handle_port_mod_request(self, req):
+        raise NotImplementedError()
+
+    def handle_table_mod_request(self, req):
+        raise NotImplementedError()
+
+    def handle_queue_get_config_request(self, req):
+        raise NotImplementedError()
+
+    def handle_set_async_request(self, req):
+        raise NotImplementedError()
+
+    def handle_aggregate_request(self, req):
+        raise NotImplementedError
+
+    @inlineCallbacks
+    def handle_device_description_request(self, req):
+        device_info = yield self.rpc.get_device_info(self.datapath_id)
+        kw = pb2dict(device_info.desc)
+        self.cxn.send(ofp.message.desc_stats_reply(xid=req.xid, **kw))
+
+    def handle_experimenter_stats_request(self, req):
+        raise NotImplementedError()
+
+    @inlineCallbacks
+    def handle_flow_stats_request(self, req):
+        flow_stats = yield self.rpc.list_flows(self.datapath_id)
+        self.cxn.send(ofp.message.flow_stats_reply(
+            xid=req.xid, entries=[to_loxi(f) for f in flow_stats]))
+
+    def handle_group_stats_request(self, req):
+        group_stats = []  # TODO
+        self.cxn.send(ofp.message.group_stats_reply(
+            xid=req.xid, entries=group_stats))
+
+    def handle_group_descriptor_request(self, req):
+        group_list = []  # TODO
+        self.cxn.send(ofp.message.group_desc_stats_reply(
+            xid=req.xid, entries=group_list))
+
+    def handle_group_features_request(self, req):
+        raise NotImplementedError()
+
+    def handle_meter_stats_request(self, req):
+        meter_stats = []  # TODO
+        self.cxn.send(ofp.message.meter_stats_reply(
+            xid=req.xid, entries=meter_stats))
+
+    def handle_meter_config_request(self, req):
+        raise NotImplementedError()
+
+    def handle_meter_features_request(self, req):
+        raise NotImplementedError()
+
+    def handle_port_stats_request(self, req):
+        port_stats = []  # TODO
+        self.cxn.send(ofp.message.port_stats_reply(
+            xid=req.xid,entries=port_stats))
+
+    @inlineCallbacks
+    def handle_port_desc_request(self, req):
+        port_list = yield self.rpc.get_port_list(self.datapath_id)
+        self.cxn.send(ofp.message.port_desc_stats_reply(
+            xid=req.xid,
+            #flags=None,
+            entries=[to_loxi(port) for port in port_list]
+        ))
+
+    def handle_queue_stats_request(self, req):
+        raise NotImplementedError()
+
+    def handle_table_stats_request(self, req):
+        table_stats = []  # TODO
+        self.cxn.send(ofp.message.table_stats_reply(
+            xid=req.xid, entries=table_stats))
+
+    def handle_table_features_request(self, req):
+        raise NotImplementedError()
+
+    stats_handlers = {
+        ofp.OFPST_AGGREGATE: handle_aggregate_request,
+        ofp.OFPST_DESC: handle_device_description_request,
+        ofp.OFPST_EXPERIMENTER: handle_experimenter_stats_request,
+        ofp.OFPST_FLOW: handle_flow_stats_request,
+        ofp.OFPST_GROUP: handle_group_stats_request,
+        ofp.OFPST_GROUP_DESC: handle_group_descriptor_request,
+        ofp.OFPST_GROUP_FEATURES: handle_group_features_request,
+        ofp.OFPST_METER: handle_meter_stats_request,
+        ofp.OFPST_METER_CONFIG: handle_meter_config_request,
+        ofp.OFPST_METER_FEATURES: handle_meter_features_request,
+        ofp.OFPST_PORT: handle_port_stats_request,
+        ofp.OFPST_PORT_DESC: handle_port_desc_request,
+        ofp.OFPST_QUEUE: handle_queue_stats_request,
+        ofp.OFPST_TABLE: handle_table_stats_request,
+        ofp.OFPST_TABLE_FEATURES: handle_table_features_request
+    }
+
+    main_handlers = {
+        ofp.OFPT_BARRIER_REQUEST: handle_barrier_request,
+        ofp.OFPT_ECHO_REQUEST: handle_echo_request,
+        ofp.OFPT_FEATURES_REQUEST: handle_feature_request,
+        ofp.OFPT_EXPERIMENTER: handle_experimenter_request,
+        ofp.OFPT_FLOW_MOD: handle_flow_mod_request,
+        ofp.OFPT_GET_ASYNC_REQUEST: handle_get_async_request,
+        ofp.OFPT_GET_CONFIG_REQUEST: handle_get_config_request,
+        ofp.OFPT_GROUP_MOD: handle_group_mod_request,
+        ofp.OFPT_METER_MOD: handle_meter_mod_request,
+        ofp.OFPT_PACKET_OUT: handle_packet_out_request,
+        ofp.OFPT_PORT_MOD: handle_port_mod_request,
+        ofp.OFPT_QUEUE_GET_CONFIG_REQUEST: handle_queue_get_config_request,
+        ofp.OFPT_ROLE_REQUEST: handle_role_request,
+        ofp.OFPT_SET_ASYNC: handle_set_async_request,
+        ofp.OFPT_SET_CONFIG: handle_set_config_request,
+        ofp.OFPT_STATS_REQUEST: handle_stats_request,
+        ofp.OFPT_TABLE_MOD: handle_table_mod_request,
+    }
+