[ 3289 ] remove debugging statements
This commit consists of:
1) Change the flow match method in ponsim to handle multiple matches
2) Add logical PORT change callback
3) Clear all flows on a device when the device is disabled
Change-Id: I82ddc9c4555dced917dee8f13d7d4f08ca4e1d03
diff --git a/ponsim/ponsim.py b/ponsim/ponsim.py
index b888ba2..932ea89 100644
--- a/ponsim/ponsim.py
+++ b/ponsim/ponsim.py
@@ -24,13 +24,12 @@
from scapy.layers.l2 import Ether, Dot1Q
from scapy.packet import Packet
-from common.frameio.frameio import hexify
from voltha.protos import third_party
from voltha.protos.ponsim_pb2 import PonSimMetrics, PonSimPortMetrics, \
-PonSimPacketCounter
-#from voltha.protos.ponsim_pb2 import
+ PonSimPacketCounter
from voltha.core.flow_decomposer import *
from twisted.internet.task import LoopingCall
+
_ = third_party
@@ -43,87 +42,104 @@
)
+class _FlowMatchMask(object):
+ """
+ Enum of mask values based on flow match priority. For instance, a port
+ match has higher priority when match that a UDP match.
+ """
+ UDP_DST = 1
+ UDP_SRC = 2
+ IPV4_DST = 4
+ VLAN_PCP = 8
+ VLAN_VID = 16
+ IP_PROTO = 34
+ ETH_TYPE = 64
+ IN_PORT = 128
+
+
class FrameIOCounter(object):
class SingleFrameCounter(object):
- def __init__(self,name,min,max):
+ def __init__(self, name, min, max):
# Currently there are 2 values, one for the PON interface (port 1)
# and one for the Network Interface (port 2). This can be extended if
# the virtual devices extend the number of ports.
- self.value = [0,0] #{PON,NI}
+ self.value = [0, 0] # {PON,NI}
self.name = name
- self.min = min
+ self.min = min
self.max = max
def __init__(self, device):
self.device = device
- self.tx_counters = dict (
- tx_64=self.SingleFrameCounter("tx_64", 1, 64),
- tx_65_127=self.SingleFrameCounter("tx_65_127", 65, 127),
- tx_128_255=self.SingleFrameCounter("tx_128_255", 128, 255),
- tx_256_511=self.SingleFrameCounter("tx_256_511", 256, 511),
- tx_512_1023=self.SingleFrameCounter("tx_512_1023", 512, 1024),
- tx_1024_1518=self.SingleFrameCounter("tx_1024_1518", 1024, 1518),
- tx_1519_9k=self.SingleFrameCounter("tx_1519_9k", 1519, 9216),
+ self.tx_counters = dict(
+ tx_64=self.SingleFrameCounter("tx_64", 1, 64),
+ tx_65_127=self.SingleFrameCounter("tx_65_127", 65, 127),
+ tx_128_255=self.SingleFrameCounter("tx_128_255", 128, 255),
+ tx_256_511=self.SingleFrameCounter("tx_256_511", 256, 511),
+ tx_512_1023=self.SingleFrameCounter("tx_512_1023", 512, 1024),
+ tx_1024_1518=self.SingleFrameCounter("tx_1024_1518", 1024, 1518),
+ tx_1519_9k=self.SingleFrameCounter("tx_1519_9k", 1519, 9216),
)
self.rx_counters = dict(
- rx_64=self.SingleFrameCounter("rx_64", 1, 64),
- rx_65_127=self.SingleFrameCounter("rx_65_127", 65, 127),
- rx_128_255=self.SingleFrameCounter("rx_128_255", 128, 255),
- rx_256_511=self.SingleFrameCounter("rx_256_511", 256, 511),
- rx_512_1023=self.SingleFrameCounter("rx_512_1023", 512, 1024),
- rx_1024_1518=self.SingleFrameCounter("rx_1024_1518", 1024, 1518),
- rx_1519_9k=self.SingleFrameCounter("rx_1519_9k", 1519, 9216)
+ rx_64=self.SingleFrameCounter("rx_64", 1, 64),
+ rx_65_127=self.SingleFrameCounter("rx_65_127", 65, 127),
+ rx_128_255=self.SingleFrameCounter("rx_128_255", 128, 255),
+ rx_256_511=self.SingleFrameCounter("rx_256_511", 256, 511),
+ rx_512_1023=self.SingleFrameCounter("rx_512_1023", 512, 1024),
+ rx_1024_1518=self.SingleFrameCounter("rx_1024_1518", 1024, 1518),
+ rx_1519_9k=self.SingleFrameCounter("rx_1519_9k", 1519, 9216)
)
def count_rx_frame(self, port, size):
log.info("counting-rx-frame", size=size, port=port)
- for k,v in self.rx_counters.iteritems():
+ for k, v in self.rx_counters.iteritems():
if size >= v.min and size <= v.max:
- self.rx_counters[k].value[port-1] += 1
+ self.rx_counters[k].value[port - 1] += 1
return
log.warn("unsupported-packet-size", size=size)
def count_tx_frame(self, port, size):
for k, v in self.tx_counters.iteritems():
if size >= v.min and size <= v.max:
- self.tx_counters[k].value[port-1] += 1
+ self.tx_counters[k].value[port - 1] += 1
return
log.warn("unsupported-packet-size", size=size)
def log_counts(self):
- rx_ct_list = [(v.name, v.value[0], v.value[1]) for v in self.rx_counters.values()]
- tx_ct_list = [(v.name, v.value[0], v.value[1]) for v in self.tx_counters.values()]
- log.info("rx-counts",rx_ct_list=rx_ct_list)
- log.info("tx-counts",tx_ct_list=tx_ct_list)
+ rx_ct_list = [(v.name, v.value[0], v.value[1]) for v in
+ self.rx_counters.values()]
+ tx_ct_list = [(v.name, v.value[0], v.value[1]) for v in
+ self.tx_counters.values()]
+ log.info("rx-counts", rx_ct_list=rx_ct_list)
+ log.info("tx-counts", tx_ct_list=tx_ct_list)
def make_proto(self):
sim_metrics = PonSimMetrics(
- device = self.device
+ device=self.device
)
- pon_port_metrics = PonSimPortMetrics (
- port_name = "pon"
+ pon_port_metrics = PonSimPortMetrics(
+ port_name="pon"
)
- ni_port_metrics = PonSimPortMetrics (
- port_name = "nni"
+ ni_port_metrics = PonSimPortMetrics(
+ port_name="nni"
)
for c in sorted(self.rx_counters):
ctr = self.rx_counters[c]
pon_port_metrics.packets.extend([
- PonSimPacketCounter(name=ctr.name,value=ctr.value[0])
+ PonSimPacketCounter(name=ctr.name, value=ctr.value[0])
])
# Since they're identical keys, save some time and cheat
ni_port_metrics.packets.extend([
- PonSimPacketCounter(name=ctr.name,value=ctr.value[1])
+ PonSimPacketCounter(name=ctr.name, value=ctr.value[1])
])
for c in sorted(self.tx_counters):
ctr = self.tx_counters[c]
pon_port_metrics.packets.extend([
- PonSimPacketCounter(name=ctr.name,value=ctr.value[0])
+ PonSimPacketCounter(name=ctr.name, value=ctr.value[0])
])
# Since they're identical keys, save some time and cheat
ni_port_metrics.packets.extend([
- PonSimPacketCounter(name=ctr.name,value=ctr.value[1])
+ PonSimPacketCounter(name=ctr.name, value=ctr.value[1])
])
sim_metrics.metrics.extend([pon_port_metrics])
sim_metrics.metrics.extend([ni_port_metrics])
@@ -132,7 +148,6 @@
class SimDevice(object):
-
def __init__(self, name, logical_port_no):
self.name = name
self.logical_port_no = logical_port_no
@@ -154,7 +169,8 @@
forwarded = 0
links = self.links.get(egress_port)
if links is not None:
- self.counter.count_tx_frame(egress_port, len(egress_frame["Ether"].payload))
+ self.counter.count_tx_frame(egress_port,
+ len(egress_frame["Ether"].payload))
for fun in links:
forwarded += 1
self.log.debug('forwarding', egress_port=egress_port)
@@ -169,15 +185,23 @@
self.flows = sorted(flows, key=lambda fm: fm.priority, reverse=True)
def process_frame(self, ingress_port, ingress_frame):
+ matched_mask = 0
+ matched_flow = None
for flow in self.flows:
- if self.is_match(flow, ingress_port, ingress_frame):
- egress_port, egress_frame = self.process_actions(
- flow, ingress_frame)
- return egress_port, egress_frame
+ current_mask = self.is_match(flow, ingress_port, ingress_frame)
+ if current_mask > matched_mask:
+ matched_mask = current_mask
+ matched_flow = flow
+
+ if matched_flow:
+ egress_port, egress_frame = self.process_actions(
+ matched_flow, ingress_frame)
+ return egress_port, egress_frame
return None
@staticmethod
def is_match(flow, ingress_port, frame):
+ matched_mask = 0
def get_non_shim_ether_type(f):
if f.haslayer(Dot1Q):
@@ -208,41 +232,49 @@
if field.type == IN_PORT:
if field.port != ingress_port:
- return False
+ return 0
+ matched_mask |= _FlowMatchMask.IN_PORT
elif field.type == ETH_TYPE:
if field.eth_type != get_non_shim_ether_type(frame):
- return False
+ return 0
+ matched_mask |= _FlowMatchMask.ETH_TYPE
elif field.type == IP_PROTO:
if field.ip_proto != get_ip_proto(frame):
- return False
+ return 0
+ matched_mask |= _FlowMatchMask.IP_PROTO
elif field.type == VLAN_VID:
expected_vlan = field.vlan_vid
tagged = frame.haslayer(Dot1Q)
if bool(expected_vlan & 4096) != bool(tagged):
- return False
+ return 0
if tagged:
actual_vid = frame.getlayer(Dot1Q).vlan
if actual_vid != expected_vlan & 4095:
- return False
+ return 0
+ matched_mask |= _FlowMatchMask.VLAN_VID
elif field.type == VLAN_PCP:
if field.vlan_pcp != get_vlan_pcp(frame):
- return False
+ return 0
+ matched_mask |= _FlowMatchMask.VLAN_PCP
elif field.type == IPV4_DST:
if ipv4int2str(field.ipv4_dst) != get_ipv4_dst(frame):
- return False
+ return 0
+ matched_mask |= _FlowMatchMask.IPV4_DST
elif field.type == UDP_SRC:
if field.udp_src != get_udp_src(frame):
- return False
+ return 0
+ matched_mask |= _FlowMatchMask.UDP_SRC
elif field.type == UDP_DST:
if field.udp_dst != get_udp_dst(frame):
- return False
+ return 0
+ matched_mask |= _FlowMatchMask.UDP_DST
elif field.type == METADATA:
pass # safe to ignore
@@ -250,7 +282,7 @@
else:
raise NotImplementedError('field.type=%d' % field.type)
- return True
+ return matched_mask
@staticmethod
def process_actions(flow, frame):
@@ -300,7 +332,6 @@
class PonSim(object):
-
def __init__(self, onus, egress_fun):
self.egress_fun = egress_fun
@@ -312,7 +343,7 @@
self.devices[0] = self.olt
# TODO: This can be removed, it's for debugging purposes
self.lc = LoopingCall(self.olt.counter.log_counts)
- self.lc.start(90) # To correlate with Kafka
+ self.lc.start(90) # To correlate with Kafka
# Create ONUs of the requested number and hook them up with OLT
# and with egress fun
@@ -325,9 +356,11 @@
for i in range(onus):
port_no = 128 + i
onu = SimDevice('onu%d' % i, port_no)
- onu.link(1, lambda _, frame: self.olt.ingress(1, frame)) # Send to the OLT
- onu.link(2, mk_egress_fun(port_no)) # Send from the ONU to the world
- self.olt.link(1, mk_onu_ingress(onu)) # Internal send to the ONU
+ onu.link(1, lambda _, frame: self.olt.ingress(1,
+ frame)) # Send to the OLT
+ onu.link(2,
+ mk_egress_fun(port_no)) # Send from the ONU to the world
+ self.olt.link(1, mk_onu_ingress(onu)) # Internal send to the ONU
self.devices[port_no] = onu
for d in self.devices:
self.log.info("pon-sim-init", port=d, name=self.devices[d].name,
@@ -349,4 +382,3 @@
if not isinstance(frame, Packet):
frame = Ether(frame)
self.devices[port].ingress(2, frame)
-
diff --git a/voltha/core/device_agent.py b/voltha/core/device_agent.py
index 3c2e267..b1cf67b 100644
--- a/voltha/core/device_agent.py
+++ b/voltha/core/device_agent.py
@@ -25,7 +25,7 @@
from voltha.core.config.config_proxy import CallbackType
from voltha.protos.common_pb2 import AdminState, OperStatus
from voltha.registry import registry
-
+from voltha.protos.openflow_13_pb2 import Flows, FlowGroups
class InvalidStateTransition(Exception): pass
@@ -183,10 +183,20 @@
self.log.info('abandon-device', device=device, dry_run=dry_run)
raise NotImplementedError()
+ def _delete_all_flows(self):
+ """ Delete all flows on the device """
+ try:
+ self.flows_proxy.update('/', Flows(items=[]))
+ self.groups_proxy.update('/', FlowGroups(items=[]))
+ except Exception, e:
+ self.exception('flow-delete-exception', e=e)
+
@inlineCallbacks
def _disable_device(self, device, dry_run=False):
self.log.info('disable-device', device=device, dry_run=dry_run)
if not dry_run:
+ # Remove all flows before disabling device
+ self._delete_all_flows()
yield self.adapter_agent.disable_device(device)
@inlineCallbacks
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index dc5d1c0..91ba454 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -44,39 +44,40 @@
class LogicalDeviceAgent(FlowDecomposer, DeviceGraph):
def __init__(self, core, logical_device):
- self.core = core
- self.local_handler = core.get_local_handler()
- self.logical_device_id = logical_device.id
+ try:
+ self.core = core
+ self.local_handler = core.get_local_handler()
+ self.logical_device_id = logical_device.id
- self.root_proxy = core.get_proxy('/')
- self.flows_proxy = core.get_proxy(
- '/logical_devices/{}/flows'.format(logical_device.id))
- self.groups_proxy = core.get_proxy(
- '/logical_devices/{}/flow_groups'.format(logical_device.id))
- # self.port_proxy = core.get_proxy(
- # '/logical_devices/{}/ports'.format(logical_device.id))
- self.self_proxy = core.get_proxy(
- '/logical_devices/{}'.format(logical_device.id))
+ self.root_proxy = core.get_proxy('/')
+ self.flows_proxy = core.get_proxy(
+ '/logical_devices/{}/flows'.format(logical_device.id))
+ self.groups_proxy = core.get_proxy(
+ '/logical_devices/{}/flow_groups'.format(logical_device.id))
+ self.self_proxy = core.get_proxy(
+ '/logical_devices/{}'.format(logical_device.id))
- self.flows_proxy.register_callback(
- CallbackType.POST_UPDATE, self._flow_table_updated)
- self.groups_proxy.register_callback(
- CallbackType.POST_UPDATE, self._group_table_updated)
- # self.port_proxy.register_callback(
- # CallbackType.POST_UPDATE, self._port_changed)
- self.self_proxy.register_callback(
- CallbackType.POST_ADD, self._port_added)
- self.self_proxy.register_callback(
- CallbackType.POST_REMOVE, self._port_removed)
+ self.flows_proxy.register_callback(
+ CallbackType.POST_UPDATE, self._flow_table_updated)
+ self.groups_proxy.register_callback(
+ CallbackType.POST_UPDATE, self._group_table_updated)
+ self.self_proxy.register_callback(
+ CallbackType.POST_ADD, self._port_added)
+ self.self_proxy.register_callback(
+ CallbackType.POST_REMOVE, self._port_removed)
- self.event_bus = EventBusClient()
- self.packet_in_subscription = self.event_bus.subscribe(
- topic='packet-in:{}'.format(logical_device.id),
- callback=self.handle_packet_in_event)
+ self.port_proxy = {}
- self.log = structlog.get_logger(logical_device_id=logical_device.id)
+ self.event_bus = EventBusClient()
+ self.packet_in_subscription = self.event_bus.subscribe(
+ topic='packet-in:{}'.format(logical_device.id),
+ callback=self.handle_packet_in_event)
- self._routes = None
+ self.log = structlog.get_logger(logical_device_id=logical_device.id)
+
+ self._routes = None
+ except Exception, e:
+ self.log.exception('init-error', e=e)
def start(self):
self.log.debug('starting')
@@ -91,9 +92,9 @@
self.groups_proxy.unregister_callback(
CallbackType.POST_UPDATE, self._group_table_updated)
self.self_proxy.unregister_callback(
- CallbackType.POST_ADD, self._port_list_updated)
+ CallbackType.POST_ADD, self._port_added)
self.self_proxy.unregister_callback(
- CallbackType.POST_REMOVE, self._port_list_updated)
+ CallbackType.POST_REMOVE, self._port_removed)
# Remove subscription to the event bus
self.event_bus.unsubscribe(self.packet_in_subscription)
@@ -534,6 +535,14 @@
self.log.debug('port-added', port=port)
assert isinstance(port, LogicalPort)
self._port_list_updated(port)
+
+ # Set a proxy and callback for that specific port
+ self.port_proxy[port.id] = self.core.get_proxy(
+ '/logical_devices/{}/ports/{}'.format(self.logical_device_id,
+ port.id))
+ self.port_proxy[port.id].register_callback(
+ CallbackType.POST_UPDATE, self._port_changed)
+
self.local_handler.send_port_change_event(
device_id=self.logical_device_id,
port_status=ofp.ofp_port_status(
@@ -546,6 +555,12 @@
self.log.debug('port-removed', port=port)
assert isinstance(port, LogicalPort)
self._port_list_updated(port)
+
+ # Remove the proxy references
+ self.port_proxy[port.id].unregister_callback(
+ CallbackType.POST_UPDATE, self._port_changed)
+ del self.port_proxy[port.id]
+
self.local_handler.send_port_change_event(
device_id=self.logical_device_id,
port_status=ofp.ofp_port_status(
@@ -554,8 +569,8 @@
)
)
- # TODO not yet hooked up
def _port_changed(self, port):
+ self.log.debug('port-changed', port=port)
assert isinstance(port, LogicalPort)
self.local_handler.send_port_change_event(
device_id=self.logical_device_id,
diff --git a/voltha/main.py b/voltha/main.py
index 5a31f42..5f282cc 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -46,7 +46,6 @@
from voltha.registry import registry, IComponent
from common.frameio.frameio import FrameIOManager
-
VERSION = '0.9.0'
defs = dict(