1)flow table is improved for OpenOlt device adding more information e.g.
flow_id, flow_category, flow_type, gemport_id, alloc_id, o_pbits, intf_onu_id
2)exit command added
Change-Id: Ia7c8e2ad67455a78d99b1c439965c6c58df3b59e
diff --git a/cli/device.py b/cli/device.py
index 44407e2..4456a4c 100644
--- a/cli/device.py
+++ b/cli/device.py
@@ -30,16 +30,25 @@
from voltha.protos import voltha_pb2, common_pb2
import sys
import json
+import unicodedata
+import ast
from google.protobuf.json_format import MessageToDict
# Since proto3 won't send fields that are set to 0/false/"" any object that
# might have those values set in them needs to be replicated here such that the
# fields can be adequately
+FLOW_ID_INFO_PATH = '{}/{}/flow_id_info/{}'
+FLOW_IDS_PATH = '{}/{}/flow_ids'
+technology = 'xgspon'
+PATH_PREFIX0 = 'service/voltha/resource_manager/{}'
+PATH_PREFIX = PATH_PREFIX0.format(technology)
+PATH = '{}/{}'
+
class DeviceCli(Cmd):
- def __init__(self, device_id, get_stub):
+ def __init__(self, device_id, get_stub, etcd):
Cmd.__init__(self)
self.get_stub = get_stub
self.device_id = device_id
@@ -47,6 +56,7 @@
self.colorize('device {}'.format(device_id), 'red'), 'bold') + ') '
self.pm_config_last = None
self.pm_config_dirty = False
+ self.etcd = etcd
def cmdloop(self):
self._cmdloop()
@@ -347,15 +357,99 @@
omit_fields, self.poutput, dividers=100,
show_nulls=True)
+ def get_flow_id(self, id_str):
+ # original representation of the flow id
+ # there is a mask for upstream flow ids of openolt adapter as 0x1 << 15 | flow_id
+ # ponsim and other flow does not need any modification
+ if int(id_str) >= 0x1 << 15:
+ flow_id = int(id_str) ^ 0x1 << 15
+ else:
+ flow_id = int(id_str)
+
+ return flow_id
+
+ def flow_exist(self, pon_intf_onu_id, flow_id):
+ # checks whether the flow still exists in ETCD or not
+ flow_ids_path = FLOW_IDS_PATH.format(self.device_id, pon_intf_onu_id)
+ path_to_flow_ids = PATH.format(PATH_PREFIX, flow_ids_path)
+ (flow_ids, _) = self.etcd.get(path_to_flow_ids)
+ if flow_ids is None:
+ return False
+ else:
+ if flow_id in eval(flow_ids):
+ return True
+ else:
+ return False
+
+ def update_repeated_ids_dict(self,flow_id, repeated_ids):
+ # updates how many times an id is seen
+ if str(flow_id) in repeated_ids:
+ repeated_ids[str(flow_id)] += 1
+ else:
+ repeated_ids[str(flow_id)] = 1
+ return repeated_ids
+
+ def get_flow_index(self,flow, flow_info_all):
+ if 'flow_store_cookie' in flow:
+ for i, flow_info in enumerate(flow_info_all):
+ if unicodedata.normalize('NFKD', flow['flow_store_cookie']).encode('ascii', 'ignore') == flow_info['flow_store_cookie']:
+ return i
+ return None
+ else: #only one flow or those flows that are not added to device
+ return 0
+
def do_flows(self, line):
"""Show flow table for device"""
device = pb2dict(self.get_device(-1))
+ flows_info = list()
+ flows = device['flows']['items']
+ for i, flow in enumerate(flows):
+ flow_info = dict()
+ if unicodedata.normalize('NFKD', device['type']).encode('ascii', 'ignore') == 'openolt':
+ flow_id = self.get_flow_id(flow['id'])
+ else:
+ flow_id = int(flow['id'])
+
+ flow_info.update({'flow_id' : str(flow_id)})
+
+ if 'intf_tuple' in flow and len(flow['intf_tuple']) > 0: # we have extra flow info in ETCD!!!
+ pon_intf_onu_id = unicodedata.normalize('NFKD', flow['intf_tuple'][0]).encode('ascii', 'ignore')
+ flow_info.update({'pon_intf_onu_id': pon_intf_onu_id})
+
+ # check if the flow info still exists in ETCD
+ if self.flow_exist(pon_intf_onu_id, flow_id):
+ flow_id_info_path = FLOW_ID_INFO_PATH.format(self.device_id, pon_intf_onu_id, flow_id)
+ path_to_flow_info = PATH.format(PATH_PREFIX, flow_id_info_path)
+ (flow_info_all, _) = self.etcd.get(path_to_flow_info)
+ flow_info_all = ast.literal_eval(flow_info_all)
+ flow_info_index = self.get_flow_index(flow, flow_info_all)
+
+ if flow_info_index is not None:
+ flow_info_all = flow_info_all[flow_info_index]
+ if 'gemport_id' in flow_info_all:
+ flow_info.update({'gemport_id': flow_info_all['gemport_id']})
+
+ if 'alloc_id' in flow_info_all:
+ flow_info.update({'alloc_id': flow_info_all['alloc_id']})
+
+ if 'flow_type' in flow_info_all:
+ flow_info.update({'flow_type': flow_info_all['flow_type']})
+
+ if 'o_pbits' in flow_info_all['classifier']:
+ flow_info.update({'o_pbits': flow_info_all['classifier']['o_pbits']})
+
+ if 'flow_category' in flow_info_all:
+ flow_info.update({'flow_category': flow_info_all['flow_category']})
+
+ flows_info.append(flow_info)
print_flows(
'Device',
self.device_id,
type=device['type'],
flows=device['flows']['items'],
- groups=device['flow_groups']['items']
+ groups=device['flow_groups']['items'],
+ flows_info=flows_info,
+ fields_to_omit = ['table_id', 'goto-table']
)
def do_images(self, line):
diff --git a/cli/main.py b/cli/main.py
index 3a7dfed..3a30a0d 100755
--- a/cli/main.py
+++ b/cli/main.py
@@ -21,6 +21,7 @@
from optparse import make_option
from time import sleep, time
+import etcd3
import grpc
import requests
from cmd2 import Cmd, options
@@ -43,6 +44,7 @@
defs = dict(
# config=os.environ.get('CONFIG', './cli.yml'),
+ etcd=os.environ.get('ETCD', 'etcd-cluster.default.svc.cluster.local:2379'),
consul=os.environ.get('CONSUL', 'localhost:8500'),
voltha_grpc_endpoint=os.environ.get('VOLTHA_GRPC_ENDPOINT',
'localhost:50055'),
@@ -56,7 +58,7 @@
__ _____| | |_| |_ __ _ / __| | |_ _|
\ V / _ \ | _| ' \/ _` | | (__| |__ | |
\_/\___/_|\__|_||_\__,_| \___|____|___|
-(to exit type quit or hit Ctrl-D)
+(to exit type quit or exit or hit Ctrl-D)
"""
@@ -88,10 +90,11 @@
del Cmd.do_load
del Cmd.do__relative_load
- def __init__(self, voltha_grpc, voltha_sim_rest, global_request=False):
+ def __init__(self, voltha_grpc, voltha_sim_rest, etcd, global_request=False):
VolthaCli.voltha_grpc = voltha_grpc
VolthaCli.voltha_sim_rest = voltha_sim_rest
VolthaCli.global_request = global_request
+ VolthaCli.etcd = etcd
Cmd.__init__(self)
self.prompt = '(' + self.colorize(
self.colorize(self.prompt, 'blue'), 'bold') + ') '
@@ -154,6 +157,10 @@
while self.history:
self.history.pop()
+ def do_exit(self,line):
+ """exit from CLI"""
+ quit()
+
def do_launch(self, line):
"""If Voltha is not running yet, launch it"""
raise NotImplementedError('not implemented yet')
@@ -220,7 +227,7 @@
self.poutput( self.colorize('Error: ', 'red') +
'There is no such device')
raise Exception('<device-id> is not a valid one')
- sub = DeviceCli(device_id, self.get_stub)
+ sub = DeviceCli(device_id, self.get_stub, self.etcd)
sub.cmdloop()
def do_logical_device(self, line):
@@ -868,6 +875,10 @@
parser.add_argument(
'-C', '--consul', action='store', default=defs['consul'], help=_help)
+ _help = '<hostname>:<port> to etcd container (default: %s)' % defs['etcd']
+ parser.add_argument(
+ '-E', '--etcd', action='store', default=defs['etcd'], help=_help)
+
_help = 'Lookup Voltha endpoints based on service entries in Consul'
parser.add_argument(
'-L', '--lookup', action='store_true', help=_help)
@@ -908,7 +919,10 @@
args.sim_rest_endpoint = '{}:{}'.format(services[0]['ServiceAddress'],
services[0]['ServicePort'])
- c = VolthaCli(args.grpc_endpoint, args.sim_rest_endpoint,
+ host = args.etcd.split(':')[0].strip()
+ port = int(args.etcd.split(':')[1].strip())
+ etcd = etcd3.client(host=host, port=port)
+ c = VolthaCli(args.grpc_endpoint, args.sim_rest_endpoint, etcd,
args.global_request)
c.poutput(banner)
c.load_history()
diff --git a/cli/utils.py b/cli/utils.py
index 82d6a12..ea190a1 100644
--- a/cli/utils.py
+++ b/cli/utils.py
@@ -111,7 +111,7 @@
}
-def print_flows(what, id, type, flows, groups, printfn=_printfn):
+def print_flows(what, id, type, flows, groups, printfn=_printfn, flows_info=[], fields_to_omit=[]):
header = ''.join([
'{} '.format(what),
@@ -124,9 +124,31 @@
table = TablePrinter()
for i, flow in enumerate(flows):
- table.add_cell(i, 0, 'table_id', value=str(flow['table_id']))
- table.add_cell(i, 1, 'priority', value=str(flow['priority']))
- table.add_cell(i, 2, 'cookie', p_cookie(flow['cookie']))
+ if flows_info:
+ flow_info = flows_info[i]
+ else:
+ flow_info = dict()
+
+ if 'table_id' not in fields_to_omit:
+ table.add_cell(i, 0, 'table_id', value=str(flow['table_id']))
+ if 'flow_id' not in fields_to_omit and 'flow_id' in flow_info:
+ table.add_cell(i, 1, 'flow_id', value=str(flow_info['flow_id']))
+ if 'flow_category' not in fields_to_omit and 'flow_category' in flow_info:
+ table.add_cell(i, 2, 'flow_category', value=str(flow_info['flow_category']))
+ if 'flow_type' not in fields_to_omit and 'flow_type' in flow_info:
+ table.add_cell(i, 3, 'flow_type', value=str(flow_info['flow_type']))
+ if 'priority' not in fields_to_omit:
+ table.add_cell(i, 4, 'priority', value=str(flow['priority']))
+ if 'gemport_id' not in fields_to_omit and 'gemport_id' in flow_info:
+ table.add_cell(i, 5, 'gemport_id', value=str(flow_info['gemport_id']))
+ if 'alloc_id' not in fields_to_omit and 'alloc_id' in flow_info:
+ table.add_cell(i, 6, 'alloc_id', value=str(flow_info['alloc_id']))
+ if 'o_pbits' not in fields_to_omit and 'o_pbits' in flow_info:
+ table.add_cell(i, 7, 'o_pbits', value=str(flow_info['o_pbits']))
+ if 'pon_intf_onu_id' not in fields_to_omit and 'pon_intf_onu_id' in flow_info:
+ table.add_cell(i, 8, 'intf_onu_id', value=str(flow_info['pon_intf_onu_id']))
+ if 'cookie' not in fields_to_omit:
+ table.add_cell(i, 9, 'cookie', p_cookie(flow['cookie']))
assert flow['match']['type'] == 'OFPMT_OXM'
for field in flow['match']['oxm_fields']:
@@ -142,16 +164,20 @@
atype = action['type'][len('OFPAT_'):]
table.add_cell(i, *action_printers[atype](action))
elif itype == 1:
- table.add_cell(i, 10000, 'goto-table',
- instruction['goto_table']['table_id'])
+ if 'goto-table' not in fields_to_omit:
+ table.add_cell(i, 10000, 'goto-table',
+ instruction['goto_table']['table_id'])
elif itype == 2:
- table.add_cell(i, 10001, 'write-metadata',
- instruction['write_metadata']['metadata'])
+ if 'write-metadata' not in fields_to_omit:
+ table.add_cell(i, 10001, 'write-metadata',
+ instruction['write_metadata']['metadata'])
elif itype == 5:
- table.add_cell(i, 10002, 'clear-actions', [])
+ if 'clear-actions' not in fields_to_omit:
+ table.add_cell(i, 10002, 'clear-actions', [])
elif itype == 6:
- table.add_cell(i, 10003, 'meter',
- instruction['meter']['meter_id'])
+ if 'meter' not in fields_to_omit:
+ table.add_cell(i, 10003, 'meter',
+ instruction['meter']['meter_id'])
else:
raise NotImplementedError(
'not handling instruction type {}'.format(itype))
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 68ad36f..2f211f1 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -34,6 +34,9 @@
# Flow categories
HSIA_FLOW = "HSIA_FLOW"
HSIA_TRANSPARENT = "HSIA_TRANSPARENT-{}"
+DHCP_FLOW = "DHCP_FLOW"
+EAPOL_FLOW = "EAPOL_FLOW"
+LLDP_FLOW = "LLDP_FLOW"
EAP_ETH_TYPE = 0x888e
LLDP_ETH_TYPE = 0x88cc
@@ -1005,7 +1008,7 @@
action=self.mk_action(action), priority=logical_flow.priority,
port_no=port_no, cookie=logical_flow.cookie)
- if self.add_flow_to_device(flow, logical_flow):
+ if self.add_flow_to_device(flow, logical_flow, flow_store_cookie):
flow_info = self._get_flow_info_as_json_blob(flow,
flow_store_cookie,
flow_category)
@@ -1050,9 +1053,10 @@
port_no=port_no,
cookie=logical_flow.cookie)
- if self.add_flow_to_device(dhcp_flow, logical_flow):
+ if self.add_flow_to_device(dhcp_flow, logical_flow, flow_store_cookie):
flow_info = self._get_flow_info_as_json_blob(dhcp_flow,
- flow_store_cookie)
+ flow_store_cookie,
+ DHCP_FLOW)
self.update_flow_info_to_kv_store(dhcp_flow.access_intf_id,
dhcp_flow.onu_id,
dhcp_flow.uni_id,
@@ -1101,9 +1105,10 @@
vlan_id | 0x1000)]))
logical_flow.match.type = OFPMT_OXM
- if self.add_flow_to_device(upstream_flow, logical_flow):
+ if self.add_flow_to_device(upstream_flow, logical_flow, flow_store_cookie):
flow_info = self._get_flow_info_as_json_blob(upstream_flow,
- flow_store_cookie)
+ flow_store_cookie,
+ EAPOL_FLOW)
self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
upstream_flow.onu_id,
upstream_flow.uni_id,
@@ -1171,9 +1176,9 @@
uni_id))]))
if self.add_flow_to_device(downstream_flow,
- downstream_logical_flow):
+ downstream_logical_flow, flow_store_cookie):
flow_info = self._get_flow_info_as_json_blob(
- downstream_flow, flow_store_cookie)
+ downstream_flow, flow_store_cookie, EAPOL_FLOW)
self.update_flow_info_to_kv_store(
downstream_flow.access_intf_id, downstream_flow.onu_id,
downstream_flow.uni_id, downstream_flow.flow_id,
@@ -1247,9 +1252,9 @@
self.log.debug('add dhcp downstream trap', classifier=classifier,
action=action, flow=downstream_flow,
port_no=port_no)
- if self.add_flow_to_device(downstream_flow, logical_flow):
+ if self.add_flow_to_device(downstream_flow, logical_flow, flow_store_cookie):
flow_info = self._get_flow_info_as_json_blob(downstream_flow,
- flow_store_cookie)
+ flow_store_cookie, DHCP_FLOW)
self.update_flow_info_to_kv_store(
network_intf_id, onu_id, uni_id, flow_id, flow_info)
@@ -1300,9 +1305,10 @@
self.log.debug('add lldp downstream trap', classifier=classifier,
action=action, flow=downstream_flow,
port_no=port_no)
- if self.add_flow_to_device(downstream_flow, logical_flow):
+ if self.add_flow_to_device(downstream_flow, logical_flow, flow_store_cookie):
flow_info = self._get_flow_info_as_json_blob(downstream_flow,
- flow_store_cookie)
+ flow_store_cookie,
+ LLDP_FLOW)
self.update_flow_info_to_kv_store(
network_intf_id, onu_id, uni_id, flow_id, flow_info)
@@ -1410,7 +1416,7 @@
self.log.debug('No subscriber flow found', port=port)
return None
- def add_flow_to_device(self, flow, logical_flow):
+ def add_flow_to_device(self, flow, logical_flow, flow_store_cookie=None):
self.log.debug('pushing flow to device', flow=flow)
try:
self.stub.FlowAdd(flow)
@@ -1431,6 +1437,12 @@
return False
else:
+ intf_onu_id = (flow.access_intf_id if flow.access_intf_id > 0 else flow.network_intf_id,
+ flow.onu_id, flow.uni_id)
+ logical_flow.intf_tuple.append(str(intf_onu_id))
+ if flow_store_cookie is not None:
+ logical_flow.flow_store_cookie = flow_store_cookie
+
self.register_flow(logical_flow, flow)
return True
diff --git a/voltha/protos/openflow_13.proto b/voltha/protos/openflow_13.proto
index 3c7e858..fea7e49 100644
--- a/voltha/protos/openflow_13.proto
+++ b/voltha/protos/openflow_13.proto
@@ -1808,19 +1808,21 @@
/* Body of reply to OFPMP_FLOW request. */
message ofp_flow_stats {
- uint64 id = 14; /* Unique ID of flow within device. */
- uint32 table_id = 1; /* ID of table flow came from. */
- uint32 duration_sec = 2; /* Time flow has been alive in seconds. */
- uint32 duration_nsec = 3; /* Time flow has been alive in nanoseconds
- beyond duration_sec. */
- uint32 priority = 4; /* Priority of the entry. */
- uint32 idle_timeout = 5; /* Number of seconds idle before expiration. */
- uint32 hard_timeout = 6; /* Number of seconds before expiration. */
- uint32 flags = 7; /* Bitmap of OFPFF_* flags. */
- uint64 cookie = 8; /* Opaque controller-issued identifier. */
- uint64 packet_count = 9; /* Number of packets in flow. */
- uint64 byte_count = 10; /* Number of bytes in flow. */
- ofp_match match = 12; /* Description of fields. Variable size. */
+ uint64 id = 14; /* Unique ID of flow within device. */
+ repeated string intf_tuple = 15; /* the tuple of (intf_id, onu_id, uni_id) for each flow */
+ uint32 table_id = 1; /* ID of table flow came from. */
+ uint32 duration_sec = 2; /* Time flow has been alive in seconds. */
+ uint32 duration_nsec = 3; /* Time flow has been alive in nanoseconds
+ beyond duration_sec. */
+ uint32 priority = 4; /* Priority of the entry. */
+ uint32 idle_timeout = 5; /* Number of seconds idle before expiration. */
+ uint32 hard_timeout = 6; /* Number of seconds before expiration. */
+ uint32 flags = 7; /* Bitmap of OFPFF_* flags. */
+ uint64 cookie = 8; /* Opaque controller-issued identifier. */
+ uint64 packet_count = 9; /* Number of packets in flow. */
+ uint64 byte_count = 10; /* Number of bytes in flow. */
+ string flow_store_cookie = 11; /* Opaque controller-issued identifier. */
+ ofp_match match = 12; /* Description of fields. Variable size. */
repeated ofp_instruction instructions = 13; /* Instruction set
(0 or more) */
};