Packet-in channel plumbed thru whole stack
Change-Id: I1df0265139259cc1757f29af19132c5384943c15
diff --git a/ofagent/converter.py b/ofagent/converter.py
index 7252c43..4d40692 100644
--- a/ofagent/converter.py
+++ b/ofagent/converter.py
@@ -71,6 +71,9 @@
if field_type == pb2.OFPXMT_OFB_ETH_TYPE:
loxi_match_fields.append(
of13.oxm.eth_type(value=ofb_field['eth_type']))
+ elif field_type == pb2.OFPXMT_OFB_IN_PORT:
+ loxi_match_fields.append(
+ of13.oxm.in_port(value=ofb_field['port']))
else:
raise NotImplementedError(
'OXM match field for type %s' % field_type)
@@ -102,11 +105,18 @@
del kw['id']
return of13.flow_stats_entry(**kw)
+
def ofp_packet_in_to_loxi_packet_in(pb):
- kw = pb2dict(pb)
- if 'match' in kw:
- kw['match'] = make_loxi_match(kw['match'])
- return of13.message.packet_in(**kw)
+ packet_in = of13.message.packet_in(
+ buffer_id=pb.buffer_id,
+ reason=pb.reason,
+ table_id=pb.table_id,
+ cookie=pb.cookie,
+ match=make_loxi_match(pb2dict(pb.match)),
+ data=pb.data
+ )
+ return packet_in
+
def ofp_group_entry_to_loxi_group_entry(pb):
return of13.group_stats_entry(
diff --git a/ofagent/of_protocol_handler.py b/ofagent/of_protocol_handler.py
index 5ac8712..86f8cb2 100644
--- a/ofagent/of_protocol_handler.py
+++ b/ofagent/of_protocol_handler.py
@@ -257,6 +257,7 @@
}
def forward_packet_in(self, ofp_packet_in):
+ log.info('sending-packet-in', ofp_packet_in=ofp_packet_in)
self.cxn.send(to_loxi(ofp_packet_in))
def forward_port_status(self, ofp_port_status):
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index bb40eb0..e59d5dd 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -228,4 +228,13 @@
:return:
"""
+ def send_packet_in(logical_device_id, logical_port_no, packet):
+ """
+ Forward given packet to the northbound toward an SDN controller.
+ :param device_id: logical device identifier
+ :param logical_port_no: logical port_no (as numbered in openflow)
+ :param packet: the actual packet; can be a serialized string or a scapy
+ Packet.
+ :return: None returned on success
+ """
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index 8dc8fa2..d407c4b 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -21,6 +21,7 @@
import structlog
from klein import Klein
+from scapy.layers.l2 import Ether, EAPOL, Padding
from twisted.internet import endpoints
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
@@ -234,6 +235,9 @@
)
self.adapter_agent.add_logical_port(ld.id, port)
+ olt.parent_id = ld.id
+ self.adapter_agent.update_device(olt)
+
@inlineCallbacks
def _simulate_device_activation(self, device):
@@ -371,12 +375,26 @@
# ~~~~~~~~~~~~~~~~~~~~ Embedded test Klein rest server ~~~~~~~~~~~~~~~~~~~~
- @app.route('/devices/<string:id>/detect_onus')
- def detect_onu(self, request, **kw):
- log.info('detect-onus', request=request, **kw)
- device_id = kw['id']
+ def get_test_control_site(self):
+ return Site(self.app.resource())
+
+ @app.route('/devices/<string:device_id>/detect_onus')
+ def detect_onus(self, request, device_id):
+ log.info('detect-onus', request=request, device_id=device_id)
self._simulate_detection_of_onus(device_id)
return '{"status": "OK"}'
- def get_test_control_site(self):
- return Site(self.app.resource())
+ @app.route('/devices/<string:device_id>/test_eapol_in')
+ def test_eapol_in(self, request, device_id):
+ """Simulate a packet in message posted upstream"""
+ log.info('test_eapol_in', request=request, device_id=device_id)
+ eapol_start = str(
+ (Ether(src='00:11:22:33:44:55') / EAPOL(type=1))
+ / Padding(load=42*'\x00')
+ )
+
+ device = self.adapter_agent.get_device(device_id)
+ self.adapter_agent.send_packet_in(logical_device_id=device.parent_id,
+ logical_port_no=1,
+ packet=eapol_start)
+ return '{"status": "sent"}'
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 83b4e89..2a6c61b 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -20,6 +20,7 @@
from uuid import uuid4
import structlog
+from scapy.packet import Packet
from twisted.internet.defer import inlineCallbacks, returnValue
from zope.interface import implementer
@@ -251,3 +252,15 @@
def receive_proxied_message(self, proxy_address, msg):
topic = self._gen_rx_proxy_address_topic(proxy_address)
self.event_bus.publish(topic, msg)
+
+ # ~~~~~~~~~~~~~~~~~~ Handling packet-in and packet-out ~~~~~~~~~~~~~~~~~~~~
+
+ def send_packet_in(self, logical_device_id, logical_port_no, packet):
+ self.log.debug('send-packet-in', logical_device_id=logical_device_id,
+ logical_port_no=logical_port_no, packet=packet)
+
+ if isinstance(packet, Packet):
+ packet = str(packet)
+
+ topic = 'packet-in:' + logical_device_id
+ self.event_bus.publish(topic, (logical_port_no, packet))
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index ec06379..afc4ef3 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -22,6 +22,7 @@
import structlog
+from common.event_bus import EventBusClient
from voltha.core.config.config_proxy import CallbackType
from voltha.core.device_graph import DeviceGraph
from voltha.core.flow_decomposer import FlowDecomposer, \
@@ -45,7 +46,6 @@
def __init__(self, core, logical_device):
self.core = core
self.local_handler = core.get_local_handler()
- self.grpc_server = registry('grpc_server')
self.logical_device_id = logical_device.id
self.root_proxy = core.get_proxy('/')
@@ -65,14 +65,15 @@
self.self_proxy.register_callback(
CallbackType.POST_REMOVE, self._port_removed)
+ self.event_bus = EventBusClient()
+ self.packet_in_subscription = self.event_bus.subscribe(
+ topic='packet-in:{}'.format(logical_device.id),
+ callback=self.handle_packet_in_event)
+
self.log = structlog.get_logger(logical_device_id=logical_device.id)
self._routes = None
- self.log = structlog.get_logger(logical_device_id=logical_device.id)
-
- self.log = structlog.get_logger(logical_device_id=logical_device.id)
-
def start(self):
self.log.debug('starting')
self.log.info('started')
@@ -127,10 +128,8 @@
self.flow_modify_strict(flow_mod)
else:
- self.log.warn('unhandled-flow-mod', command=command, flow_mod=flow_mod)
-
- # def list_flows(self):
- # return self.flows
+ self.log.warn('unhandled-flow-mod',
+ command=command, flow_mod=flow_mod)
def update_group_table(self, group_mod):
@@ -146,13 +145,10 @@
self.group_modify(group_mod)
else:
- self.log.warn('unhandled-group-mod', command=command,
- group_mod=group_mod)
+ self.log.warn('unhandled-group-mod',
+ command=command, group_mod=group_mod)
- def list_groups(self):
- return self.groups.values()
-
- ## <=============== LOW LEVEL FLOW HANDLERS ==============================>
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL FLOW HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~
def flow_add(self, mod):
assert isinstance(mod, ofp.ofp_flow_mod)
@@ -379,7 +375,7 @@
return bool(to_delete), flows
- ## <=============== LOW LEVEL GROUP HANDLERS =============================>
+ # ~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL GROUP HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~~~~
def group_add(self, group_mod):
assert isinstance(group_mod, ofp.ofp_group_mod)
@@ -450,7 +446,7 @@
if changed:
self.groups_proxy.update('/', FlowGroups(items=groups.values()))
- ## <=============== PACKET_OUT ===========================================>
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_OUT ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def packet_out(self, ofp_packet_out):
self.log.debug('packet-out', packet=ofp_packet_out)
@@ -464,14 +460,36 @@
data=ofp_packet_out.data
))
- ## <=============== PACKET_IN ============================================>
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_IN ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def handle_packet_in_event(self, _, msg):
+ self.log.debug('handle-packet-in', msg=msg)
+ logical_port_no, packet = msg
+ packet_in = ofp.ofp_packet_in(
+ # buffer_id=0,
+ reason=ofp.OFPR_ACTION,
+ # table_id=0,
+ # cookie=0,
+ match=ofp.ofp_match(
+ type=ofp.OFPMT_OXM,
+ oxm_fields=[
+ ofp.ofp_oxm_field(
+ oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+ ofb_field=in_port(logical_port_no)
+ )
+ ]
+ ),
+ data=packet
+ )
+ self.packet_in(packet_in)
def packet_in(self, ofp_packet_in):
# TODO
print 'PACKET_IN:', ofp_packet_in
- self.grpc_server.send_packet_in(self.logical_device_id, ofp_packet_in)
+ self.local_handler.send_packet_in(
+ self.logical_device_id, ofp_packet_in)
- ## <======================== FLOW TABLE UPDATE HANDLING ===================
+ # ~~~~~~~~~~~~~~~~~~~~~ FLOW TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
def _flow_table_updated(self, flows):
self.log.debug('flow-table-updated',
@@ -490,7 +508,7 @@
self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
FlowGroups(items=groups.values()))
- ## <======================= GROUP TABLE UPDATE HANDLING ===================
+ # ~~~~~~~~~~~~~~~~~~~~ GROUP TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
def _group_table_updated(self, flow_groups):
self.log.debug('group-table-updated',
@@ -505,7 +523,7 @@
self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
FlowGroups(items=groups.values()))
- ## <==================== APIs NEEDED BY FLOW DECOMPOSER ===================
+ # ~~~~~~~~~~~~~~~~~~~ APIs NEEDED BY FLOW DECOMPOSER ~~~~~~~~~~~~~~~~~~~~~~
def _port_added(self, port):
assert isinstance(port, LogicalPort)