Maple OLT and Broadcom ONU adapter flow rule parsing.

Updates to Maple OLT and Broadcom ONU adapters to support parsing
of flow based rules and apply upstream controller bound rules
to OLT.

Change-Id: I9d19d3f511ce7437cc5d89574201ff278509aa9c
diff --git a/voltha/adapters/broadcom_onu/broadcom_onu.py b/voltha/adapters/broadcom_onu/broadcom_onu.py
index 512718f..212c0cf 100644
--- a/voltha/adapters/broadcom_onu/broadcom_onu.py
+++ b/voltha/adapters/broadcom_onu/broadcom_onu.py
@@ -26,6 +26,7 @@
 
 from voltha.adapters.interface import IAdapterInterface
 from voltha.core.logical_device_agent import mac_str_to_tuple
+import voltha.core.flow_decomposer as fd
 from voltha.protos import third_party
 from voltha.protos.adapter_pb2 import Adapter
 from voltha.protos.adapter_pb2 import AdapterConfig
@@ -35,7 +36,7 @@
 from voltha.protos.health_pb2 import HealthStatus
 from voltha.protos.logical_device_pb2 import LogicalPort
 from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, OFPPF_1GB_FD
-from voltha.protos.openflow_13_pb2 import ofp_port
+from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, ofp_port
 from common.frameio.frameio import hexify
 from voltha.extensions.omci.omci import *
 
@@ -104,7 +105,7 @@
                   flows=flows, groups=groups)
         assert len(groups.items) == 0
         handler = self.devices_handlers[device.proxy_address.channel_id]
-        return handler.update_flow_table(flows.items)
+        return handler.update_flow_table(device, flows.items)
 
     def update_flows_incrementally(self, device, flow_changes, group_changes):
         raise NotImplementedError()
@@ -214,21 +215,131 @@
         self.adapter_agent.update_device(device)
 
     @inlineCallbacks
-    def update_flow_table(self, flows):
+    def update_flow_table(self, device, flows):
+        #
+        # We need to proxy through the OLT to get to the ONU
+        # Configuration from here should be using OMCI
+        #
+        self.log.info('bulk-flow-update', device_id=device.id, flows=flows)
 
-        # we need to proxy through the OLT to get to the ONU
+        def is_downstream(port):
+            return port == 2  # Need a better way
 
-        # reset response queue
-        while self.incoming_messages.pending:
-            yield self.incoming_messages.get()
+        def is_upstream(port):
+            return not is_downstream(port)
 
-        msg = FlowTable(
-            port=self.proxy_address.channel_id,
-            flows=flows
-        )
-        self.adapter_agent.send_proxied_message(self.proxy_address, msg)
+        for flow in flows:
+            try:
+                _in_port = fd.get_in_port(flow)
+                assert _in_port is not None
 
-        yield self.incoming_messages.get()
+                if is_downstream(_in_port):
+                    self.log.info('downstream-flow')
+                elif is_upstream(_in_port):
+                    self.log.info('upstream-flow')
+                else:
+                    raise Exception('port should be 1 or 2 by our convention')
+
+                _out_port = fd.get_out_port(flow)  # may be None
+                self.log.info('out-port', out_port=_out_port)
+
+                for field in fd.get_ofb_fields(flow):
+                    if field.type == fd.ETH_TYPE:
+                        _type = field.eth_type
+                        self.log.info('field-type-eth-type',
+                                      eth_type=_type)
+
+                    elif field.type == fd.IP_PROTO:
+                        _proto = field.ip_proto
+                        self.log.info('field-type-ip-proto',
+                                      ip_proto=_proto)
+
+                    elif field.type == fd.IN_PORT:
+                        _port = field.port
+                        self.log.info('field-type-in-port',
+                                      in_port=_port)
+
+                    elif field.type == fd.VLAN_VID:
+                        _vlan_vid = field.vlan_vid & 0xfff
+                        self.log.info('field-type-vlan-vid',
+                                      vlan=_vlan_vid)
+
+                    elif field.type == fd.VLAN_PCP:
+                        _vlan_pcp = field.vlan_pcp
+                        self.log.info('field-type-vlan-pcp',
+                                      pcp=_vlan_pcp)
+
+                    elif field.type == fd.UDP_DST:
+                        _udp_dst = field.udp_dst
+                        self.log.info('field-type-udp-dst',
+                                      udp_dst=_udp_dst)
+
+                    elif field.type == fd.UDP_SRC:
+                        _udp_src = field.udp_src
+                        self.log.info('field-type-udp-src',
+                                      udp_src=_udp_src)
+
+                    elif field.type == fd.IPV4_DST:
+                        _ipv4_dst = field.ipv4_dst
+                        self.log.info('field-type-ipv4-dst',
+                                      ipv4_dst=_ipv4_dst)
+
+                    elif field.type == fd.IPV4_SRC:
+                        _ipv4_src = field.ipv4_src
+                        self.log.info('field-type-ipv4-src',
+                                      ipv4_dst=_ipv4_src)
+
+                    elif field.type == fd.METADATA:
+                        _metadata = field.metadata
+                        self.log.info('field-type-metadata',
+                                      metadata=_metadata)
+
+                    else:
+                        raise NotImplementedError('field.type={}'.format(
+                            field.type))
+
+                for action in fd.get_actions(flow):
+
+                    if action.type == fd.OUTPUT:
+                        _output = action.output.port
+                        self.log.info('action-type-output',
+                                      output=_output, in_port=_in_port)
+
+                    elif action.type == fd.POP_VLAN:
+                        self.log.info('action-type-pop-vlan',
+                                      in_port=_in_port)
+
+                    elif action.type == fd.PUSH_VLAN:
+                        _push_tpid = action.push.ethertype
+                        log.info('action-type-push-vlan',
+                                 push_tpid=_push_tpid, in_port=_in_port)
+                        if action.push.ethertype != 0x8100:
+                            self.log.error('unhandled-tpid',
+                                           ethertype=action.push.ethertype)
+
+                    elif action.type == fd.SET_FIELD:
+                        _field = action.set_field.field.ofb_field
+                        assert (action.set_field.field.oxm_class ==
+                                OFPXMC_OPENFLOW_BASIC)
+                        self.log.info('action-type-set-field',
+                                      field=_field, in_port=_in_port)
+                        if _field.type == fd.VLAN_VID:
+                            self.log.info('set-field-type-valn-vid',
+                                          vlan_vid=_field.vlan_vid & 0xfff)
+                        else:
+                            self.log.error('unsupported-action-set-field-type',
+                                           field_type=_field.type)
+                    else:
+                        log.error('unsupported-action-type',
+                                  action_type=action.type, in_port=_in_port)
+
+                #
+                # All flows created from ONU adapter should be OMCI based
+                #
+
+            except Exception as e:
+                log.exception('failed-to-install-flow', e=e, flow=flow)
+
 
     def get_tx_id(self):
         self.tx_id += 1
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index 83906f5..804c312 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -31,6 +31,7 @@
 
 from voltha.adapters.interface import IAdapterInterface
 from voltha.core.logical_device_agent import mac_str_to_tuple
+import voltha.core.flow_decomposer as fd
 from voltha.protos import third_party
 from voltha.protos.adapter_pb2 import Adapter
 from voltha.protos.adapter_pb2 import AdapterConfig
@@ -43,7 +44,7 @@
 from voltha.protos.logical_device_pb2 import LogicalPort, LogicalDevice
 from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, OFPPF_1GB_FD, \
     OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
-    ofp_switch_features, ofp_desc, ofp_port
+    OFPP_CONTROLLER, OFPXMC_OPENFLOW_BASIC, ofp_switch_features, ofp_desc, ofp_port
 from voltha.registry import registry
 from voltha.extensions.omci.omci import *
 
@@ -147,9 +148,9 @@
     def update_flows_bulk(self, device, flows, groups):
         log.info('bulk-flow-update', device_id=device.id,
                  flows=flows, groups=groups)
-        assert len(groups.items) == 0
+        assert len(groups.items) == 0, "Cannot yet deal with groups"
         handler = self.devices_handlers[device.id]
-        return handler.update_flow_table(flows.items)
+        return handler.update_flow_table(flows.items, device)
 
     def update_flows_incrementally(self, device, flow_changes, group_changes):
         raise NotImplementedError()
@@ -220,12 +221,11 @@
             self.log.info('set-remote-exception', exc=str(e))
 
     @inlineCallbacks
-    def send_config_classifier(self, olt_no, etype, ip_proto=None, src_port=None, dst_port=None):
+    def send_config_classifier(self, olt_no, etype, ip_proto=None, dst_port=None):
         self.log.info('configuring-classifier',
                       olt=olt_no,
                       etype=etype,
                       ip_proto=ip_proto,
-                      src_port=src_port,
                       dst_port=dst_port)
         try:
             remote = self.get_channel()
@@ -233,20 +233,18 @@
                                            olt_no,
                                            etype,
                                            ip_proto,
-                                           src_port,
                                            dst_port)
             self.log.info('configured-classifier', data=data)
         except Exception as e:
             self.log.info('config-classifier-exception', exc=str(e))
 
     @inlineCallbacks
-    def send_config_acflow(self, olt_no, onu_no, etype, ip_proto=None, src_port=None, dst_port=None):
+    def send_config_acflow(self, olt_no, onu_no, etype, ip_proto=None, dst_port=None):
         self.log.info('configuring-acflow',
                       olt=olt_no,
                       onu=onu_no,
                       etype=etype,
                       ip_proto=ip_proto,
-                      src_port=src_port,
                       dst_port=dst_port)
         try:
             remote = self.get_channel()
@@ -255,7 +253,6 @@
                                            onu_no,
                                            etype,
                                            ip_proto,
-                                           src_port,
                                            dst_port)
 
             self.log.info('configured-acflow', data=data)
@@ -403,9 +400,6 @@
             self.log.info('get-channel-exception', exc=str(e))
 
         yield self.send_set_remote()
-        yield self.send_config_classifier(0, 0x888e)
-        yield self.send_config_classifier(0, 0x800, 2)
-        yield self.send_config_classifier(0, 0x800, 17, 68, 67)
         yield self.send_connect_olt(0)
         yield self.send_activate_olt(0)
 
@@ -435,7 +429,7 @@
         ld = LogicalDevice(
             # not setting id and datapth_id will let the adapter agent pick id
             desc=ofp_desc(
-                mfr_desc='cord porject',
+                mfr_desc='cord project',
                 hw_desc='n/a',
                 sw_desc='logical device for Maple-based PON',
                 serial_num=uuid4().hex,
@@ -483,9 +477,6 @@
         # register ONUS per uni port until done asynchronously
         for onu_no in [1]:
             vlan_id = self.get_vlan_from_onu(onu_no)
-            yield self.send_config_acflow(0, onu_no, 0x888e)
-            yield self.send_config_acflow(0, onu_no, 0x0800, 2)
-            yield self.send_config_acflow(0, onu_no, 0x800, 17, 68, 67)
             yield self.send_create_onu(0, onu_no, '4252434d', '12345678')
             yield self.send_configure_alloc_id(0, onu_no, vlan_id)
             yield self.send_configure_unicast_gem(0,onu_no, vlan_id)
@@ -509,7 +500,7 @@
             self.interface, self.rcv_io, is_inband_frame)
 
     def rcv_io(self, port, frame):
-        self.log.info('reveived', iface_name=port.iface_name,
+        self.log.info('received', iface_name=port.iface_name,
                       frame_len=len(frame))
         pkt = Ether(frame)
         if pkt.haslayer(Dot1Q):
@@ -530,8 +521,142 @@
                 self.adapter_agent.send_packet_in(
                     packet=str(popped_frame), **kw)
 
-    def update_flow_table(self, flows):
-        self.log.info('pushing-olt-flow-table')
+    @inlineCallbacks
+    def update_flow_table(self, flows, device):
+        self.log.info('bulk-flow-update', device_id=device.id, flows=flows)
+
+        def is_downstream(port):
+            return port == 2  # Need a better way
+
+        def is_upstream(port):
+            return not is_downstream(port)
+
+        for flow in flows:
+            _type = None
+            _ip_proto = None
+            _port = None
+            _vlan_vid = None
+            _udp_dst = None
+            _udp_src = None
+            _ipv4_dst = None
+            _ipv4_src = None
+            _metadata = None
+            _output = None
+            _push_tpid = None
+            _field = None
+
+            try:
+                _in_port = fd.get_in_port(flow)
+                assert _in_port is not None
+
+                if is_downstream(_in_port):
+                    self.log.info('downstream-flow')
+                elif is_upstream(_in_port):
+                    self.log.info('upstream-flow')
+                else:
+                    raise Exception('port should be 1 or 2 by our convention')
+
+                _out_port = fd.get_out_port(flow)  # may be None
+                self.log.info('out-port', out_port=_out_port)
+
+                for field in fd.get_ofb_fields(flow):
+
+                    if field.type == fd.ETH_TYPE:
+                        _type = field.eth_type
+                        self.log.info('field-type-eth-type',
+                                      eth_type=_type)
+
+                    elif field.type == fd.IP_PROTO:
+                        _ip_proto = field.ip_proto
+                        self.log.info('field-type-ip-proto',
+                                      ip_proto=_ip_proto)
+
+                    elif field.type == fd.IN_PORT:
+                        _port = field.port
+                        self.log.info('field-type-in-port',
+                                      in_port=_port)
+
+                    elif field.type == fd.VLAN_VID:
+                        _vlan_vid = field.vlan_vid & 0xfff
+                        self.log.info('field-type-vlan-vid',
+                                      vlan=_vlan_vid)
+
+                    elif field.type == fd.VLAN_PCP:
+                        _vlan_pcp = field.vlan_pcp
+                        self.log.info('field-type-vlan-pcp',
+                                      pcp=_vlan_pcp)
+
+                    elif field.type == fd.UDP_DST:
+                        _udp_dst = field.udp_dst
+                        self.log.info('field-type-udp-dst',
+                                      udp_dst=_udp_dst)
+
+                    elif field.type == fd.UDP_SRC:
+                        _udp_src = field.udp_src
+                        self.log.info('field-type-udp-src',
+                                      udp_src=_udp_src)
+
+                    elif field.type == fd.IPV4_DST:
+                        _ipv4_dst = field.ipv4_dst
+                        self.log.info('field-type-ipv4-dst',
+                                      ipv4_dst=_ipv4_dst)
+
+                    elif field.type == fd.IPV4_SRC:
+                        _ipv4_src = field.ipv4_src
+                        self.log.info('field-type-ipv4-src',
+                                      ipv4_dst=_ipv4_src)
+
+                    elif field.type == fd.METADATA:
+                        _metadata = field.metadata
+                        self.log.info('field-type-metadata',
+                                      metadata=_metadata)
+
+                    else:
+                        raise NotImplementedError('field.type={}'.format(
+                            field.type))
+
+                for action in fd.get_actions(flow):
+
+                    if action.type == fd.OUTPUT:
+                        _output = action.output.port
+                        self.log.info('action-type-output',
+                                      output=_output, in_port=_in_port)
+
+                    elif action.type == fd.POP_VLAN:
+                        self.log.info('action-type-pop-vlan',
+                                      in_port=_in_port)
+
+                    elif action.type == fd.PUSH_VLAN:
+                        _push_tpid = action.push.ethertype
+                        log.info('action-type-push-vlan',
+                                 push_tpid=_push_tpid, in_port=_in_port)
+                        if action.push.ethertype != 0x8100:
+                            self.log.error('unhandled-tpid',
+                                           ethertype=action.push.ethertype)
+
+                    elif action.type == fd.SET_FIELD:
+                        _field = action.set_field.field.ofb_field
+                        assert (action.set_field.field.oxm_class ==
+                                OFPXMC_OPENFLOW_BASIC)
+                        self.log.info('action-type-set-field',
+                                      field=_field, in_port=_in_port)
+                        if _field.type == fd.VLAN_VID:
+                            self.log.info('et-field-type-valn-vid',
+                                          vlan_vid=_field.vlan_vid & 0xfff)
+                        else:
+                            self.log.error('unsupported-action-set-field-type',
+                                           field_type=_field.type)
+                    else:
+                        log.error('unsupported-action-type',
+                                  action_type=action.type, in_port=_in_port)
+
+                if is_upstream(_in_port):
+                    yield self.send_config_classifier(0, _type, _ip_proto, _udp_dst)
+                    yield self.send_config_acflow(0, _in_port, _type, _ip_proto, _udp_dst)
+
+            except Exception as e:
+                log.exception('failed-to-install-flow', e=e, flow=flow)
+
 
     @inlineCallbacks
     def send_proxied_message(self, proxy_address, msg):