blob: b05753f018883eb55c02aa369637e49acc993907 [file] [log] [blame]
William Kurkian6f436d02019-02-06 16:25:01 -05001#
2# Copyright 2018 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import copy
17from twisted.internet import reactor
18import grpc
19from google.protobuf.json_format import MessageToDict
20import hashlib
21from simplejson import dumps
22
William Kurkian8b1690c2019-03-04 16:53:22 -050023from voltha_protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, \
William Kurkian6f436d02019-02-06 16:25:01 -050024 ofp_flow_stats, OFPMT_OXM, Flows, FlowGroups, OFPXMT_OFB_IN_PORT, \
25 OFPXMT_OFB_VLAN_VID
William Kurkian8b1690c2019-03-04 16:53:22 -050026from voltha_protos.device_pb2 import Port
William Kurkian44cd7bb2019-02-11 16:39:12 -050027import pyvoltha.common.openflow.utils as fd
William Kurkian8b1690c2019-03-04 16:53:22 -050028from voltha_protos import openolt_pb2
William Kurkian44cd7bb2019-02-11 16:39:12 -050029from pyvoltha.common.utils.registry import registry
William Kurkian6f436d02019-02-06 16:25:01 -050030
William Kurkian44cd7bb2019-02-11 16:39:12 -050031from pyvoltha.common.tech_profile.tech_profile import DEFAULT_TECH_PROFILE_TABLE_ID
William Kurkian6f436d02019-02-06 16:25:01 -050032
33# Flow categories
34HSIA_FLOW = "HSIA_FLOW"
35
36EAP_ETH_TYPE = 0x888e
37LLDP_ETH_TYPE = 0x88cc
38
39IGMP_PROTO = 2
40
41# FIXME - see also BRDCM_DEFAULT_VLAN in broadcom_onu.py
42DEFAULT_MGMT_VLAN = 4091
43
44# Openolt Flow
45UPSTREAM = "upstream"
46DOWNSTREAM = "downstream"
47PACKET_TAG_TYPE = "pkt_tag_type"
48UNTAGGED = "untagged"
49SINGLE_TAG = "single_tag"
50DOUBLE_TAG = "double_tag"
51
52# Classifier
53ETH_TYPE = 'eth_type'
54TPID = 'tpid'
55IP_PROTO = 'ip_proto'
56IN_PORT = 'in_port'
57VLAN_VID = 'vlan_vid'
58VLAN_PCP = 'vlan_pcp'
59UDP_DST = 'udp_dst'
60UDP_SRC = 'udp_src'
61IPV4_DST = 'ipv4_dst'
62IPV4_SRC = 'ipv4_src'
63METADATA = 'metadata'
64OUTPUT = 'output'
65# Action
66POP_VLAN = 'pop_vlan'
67PUSH_VLAN = 'push_vlan'
68TRAP_TO_HOST = 'trap_to_host'
69
70
71class OpenOltFlowMgr(object):
72
73 def __init__(self, adapter_agent, log, stub, device_id, logical_device_id,
74 platform, resource_mgr):
75 self.adapter_agent = adapter_agent
76 self.log = log
77 self.stub = stub
78 self.device_id = device_id
79 self.logical_device_id = logical_device_id
80 self.nni_intf_id = None
81 self.platform = platform
William Kurkian3a341a22019-02-13 18:23:44 -050082 #self.logical_flows_proxy = registry('core').get_proxy(
83 # '/logical_devices/{}/flows'.format(self.logical_device_id))
84 self.logical_flows_proxy = adapter_agent
85 #self.flows_proxy = registry('core').get_proxy(
86 # '/devices/{}/flows'.format(self.device_id))
87 #self.root_proxy = registry('core').get_proxy('/')
William Kurkian6f436d02019-02-06 16:25:01 -050088 self.resource_mgr = resource_mgr
89 self.tech_profile = dict()
90 self._populate_tech_profile_per_pon_port()
91 self.retry_add_flow_list = []
92
93 def add_flow(self, flow):
94 self.log.debug('add flow', flow=flow)
95 classifier_info = dict()
96 action_info = dict()
97
98 for field in fd.get_ofb_fields(flow):
99 if field.type == fd.ETH_TYPE:
100 classifier_info[ETH_TYPE] = field.eth_type
101 self.log.debug('field-type-eth-type',
102 eth_type=classifier_info[ETH_TYPE])
103 elif field.type == fd.IP_PROTO:
104 classifier_info[IP_PROTO] = field.ip_proto
105 self.log.debug('field-type-ip-proto',
106 ip_proto=classifier_info[IP_PROTO])
107 elif field.type == fd.IN_PORT:
108 classifier_info[IN_PORT] = field.port
109 self.log.debug('field-type-in-port',
110 in_port=classifier_info[IN_PORT])
111 elif field.type == fd.VLAN_VID:
112 classifier_info[VLAN_VID] = field.vlan_vid & 0xfff
113 self.log.debug('field-type-vlan-vid',
114 vlan=classifier_info[VLAN_VID])
115 elif field.type == fd.VLAN_PCP:
116 classifier_info[VLAN_PCP] = field.vlan_pcp
117 self.log.debug('field-type-vlan-pcp',
118 pcp=classifier_info[VLAN_PCP])
119 elif field.type == fd.UDP_DST:
120 classifier_info[UDP_DST] = field.udp_dst
121 self.log.debug('field-type-udp-dst',
122 udp_dst=classifier_info[UDP_DST])
123 elif field.type == fd.UDP_SRC:
124 classifier_info[UDP_SRC] = field.udp_src
125 self.log.debug('field-type-udp-src',
126 udp_src=classifier_info[UDP_SRC])
127 elif field.type == fd.IPV4_DST:
128 classifier_info[IPV4_DST] = field.ipv4_dst
129 self.log.debug('field-type-ipv4-dst',
130 ipv4_dst=classifier_info[IPV4_DST])
131 elif field.type == fd.IPV4_SRC:
132 classifier_info[IPV4_SRC] = field.ipv4_src
133 self.log.debug('field-type-ipv4-src',
134 ipv4_dst=classifier_info[IPV4_SRC])
135 elif field.type == fd.METADATA:
136 classifier_info[METADATA] = field.table_metadata
137 self.log.debug('field-type-metadata',
138 metadata=classifier_info[METADATA])
139 else:
140 raise NotImplementedError('field.type={}'.format(
141 field.type))
142
143 for action in fd.get_actions(flow):
144 if action.type == fd.OUTPUT:
145 action_info[OUTPUT] = action.output.port
146 self.log.debug('action-type-output',
147 output=action_info[OUTPUT],
148 in_port=classifier_info[IN_PORT])
149 elif action.type == fd.POP_VLAN:
150 if fd.get_goto_table_id(flow) is None:
151 self.log.debug('being taken care of by ONU', flow=flow)
152 return
153 action_info[POP_VLAN] = True
154 self.log.debug('action-type-pop-vlan',
155 in_port=classifier_info[IN_PORT])
156 elif action.type == fd.PUSH_VLAN:
157 action_info[PUSH_VLAN] = True
158 action_info[TPID] = action.push.ethertype
159 self.log.debug('action-type-push-vlan',
160 push_tpid=action_info[TPID], in_port=classifier_info[IN_PORT])
161 if action.push.ethertype != 0x8100:
162 self.log.error('unhandled-tpid',
163 ethertype=action.push.ethertype)
164 elif action.type == fd.SET_FIELD:
165 # action_info['action_type'] = 'set_field'
166 _field = action.set_field.field.ofb_field
167 assert (action.set_field.field.oxm_class ==
168 OFPXMC_OPENFLOW_BASIC)
169 self.log.debug('action-type-set-field',
170 field=_field, in_port=classifier_info[IN_PORT])
171 if _field.type == fd.VLAN_VID:
172 self.log.debug('set-field-type-vlan-vid',
173 vlan_vid=_field.vlan_vid & 0xfff)
174 action_info[VLAN_VID] = (_field.vlan_vid & 0xfff)
175 else:
176 self.log.error('unsupported-action-set-field-type',
177 field_type=_field.type)
178 else:
179 self.log.error('unsupported-action-type',
180 action_type=action.type, in_port=classifier_info[IN_PORT])
181
182 if fd.get_goto_table_id(flow) is not None and POP_VLAN not in action_info:
183 self.log.debug('being taken care of by ONU', flow=flow)
184 return
185
186 if OUTPUT not in action_info and METADATA in classifier_info:
187 # find flow in the next table
188 next_flow = self.find_next_flow(flow)
189 if next_flow is None:
190 return
191 action_info[OUTPUT] = fd.get_out_port(next_flow)
192 for field in fd.get_ofb_fields(next_flow):
193 if field.type == fd.VLAN_VID:
194 classifier_info[METADATA] = field.vlan_vid & 0xfff
195
196 self.log.debug('flow-ports', classifier_inport=classifier_info[IN_PORT], action_output=action_info[OUTPUT])
197 (port_no, intf_id, onu_id, uni_id) = self.platform.extract_access_from_flow(
198 classifier_info[IN_PORT], action_info[OUTPUT])
199
200 self.divide_and_add_flow(intf_id, onu_id, uni_id, port_no, classifier_info,
201 action_info, flow)
202
203 def _is_uni_port(self, port_no):
204 try:
205 port = self.adapter_agent.get_logical_port(self.logical_device_id,
206 'uni-{}'.format(port_no))
207 if port is not None:
208 return (not port.root_port), port.device_id
209 else:
210 return False, None
211 except Exception as e:
212 self.log.error("error-retrieving-port", e=e)
213 return False, None
214
215 def _clear_flow_id_from_rm(self, flow, flow_id, flow_direction):
216 uni_port_no = None
217 child_device_id = None
218 if flow_direction == UPSTREAM:
219 for field in fd.get_ofb_fields(flow):
220 if field.type == fd.IN_PORT:
221 is_uni, child_device_id = self._is_uni_port(field.port)
222 if is_uni:
223 uni_port_no = field.port
224 elif flow_direction == DOWNSTREAM:
225 for field in fd.get_ofb_fields(flow):
226 if field.type == fd.METADATA:
227 uni_port = field.table_metadata & 0xFFFFFFFF
228 is_uni, child_device_id = self._is_uni_port(uni_port)
229 if is_uni:
230 uni_port_no = field.port
231
232 if uni_port_no is None:
233 for action in fd.get_actions(flow):
234 if action.type == fd.OUTPUT:
235 is_uni, child_device_id = \
236 self._is_uni_port(action.output.port)
237 if is_uni:
238 uni_port_no = action.output.port
239
240 if child_device_id:
241 child_device = self.adapter_agent.get_device(child_device_id)
242 pon_intf = child_device.proxy_address.channel_id
243 onu_id = child_device.proxy_address.onu_id
244 uni_id = self.platform.uni_id_from_port_num(uni_port_no) if uni_port_no is not None else None
245 flows = self.resource_mgr.get_flow_id_info(pon_intf, onu_id, uni_id, flow_id)
246 assert (isinstance(flows, list))
247 self.log.debug("retrieved-flows", flows=flows)
248 for idx in range(len(flows)):
249 if flow_direction == flows[idx]['flow_type']:
250 flows.pop(idx)
251 self.update_flow_info_to_kv_store(pon_intf, onu_id, uni_id, flow_id, flows)
252 if len(flows) > 0:
253 # There are still flows referencing the same flow_id.
254 # So the flow should not be freed yet.
255 # For ex: Case of HSIA where same flow is shared
256 # between DS and US.
257 return
258
259 self.resource_mgr.free_flow_id_for_uni(pon_intf, onu_id, uni_id, flow_id)
260 else:
261 self.log.error("invalid-info", uni_port_no=uni_port_no,
262 child_device_id=child_device_id)
263
264 def retry_add_flow(self, flow):
265 self.log.debug("retry-add-flow")
266 if flow.id in self.retry_add_flow_list:
267 self.retry_add_flow_list.remove(flow.id)
268 self.add_flow(flow)
269
270 def remove_flow(self, flow):
271 self.log.debug('trying to remove flows from logical flow :',
272 logical_flow=flow)
273 device_flows_to_remove = []
274 device_flows = self.flows_proxy.get('/').items
275 for f in device_flows:
276 if f.cookie == flow.id:
277 device_flows_to_remove.append(f)
278
279 for f in device_flows_to_remove:
280 (id, direction) = self.decode_stored_id(f.id)
281 flow_to_remove = openolt_pb2.Flow(flow_id=id, flow_type=direction)
282 try:
283 self.stub.FlowRemove(flow_to_remove)
284 except grpc.RpcError as grpc_e:
285 if grpc_e.code() == grpc.StatusCode.NOT_FOUND:
286 self.log.debug('This flow does not exist on the switch, '
287 'normal after an OLT reboot',
288 flow=flow_to_remove)
289 else:
290 raise grpc_e
291
292 # once we have successfully deleted the flow on the device
293 # release the flow_id on resource pool and also clear any
294 # data associated with the flow_id on KV store.
295 self._clear_flow_id_from_rm(f, id, direction)
296 self.log.debug('flow removed from device', flow=f,
297 flow_key=flow_to_remove)
298
299 if len(device_flows_to_remove) > 0:
300 new_flows = []
301 flows_ids_to_remove = [f.id for f in device_flows_to_remove]
302 for f in device_flows:
303 if f.id not in flows_ids_to_remove:
304 new_flows.append(f)
305
306 self.flows_proxy.update('/', Flows(items=new_flows))
307 self.log.debug('flows removed from the data store',
308 flow_ids_removed=flows_ids_to_remove,
309 number_of_flows_removed=(len(device_flows) - len(
310 new_flows)), expected_flows_removed=len(
311 device_flows_to_remove))
312 else:
313 self.log.debug('no device flow to remove for this flow (normal '
314 'for multi table flows)', flow=flow)
315
316 def _get_ofp_port_name(self, intf_id, onu_id, uni_id):
317 parent_port_no = self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT)
318 child_device = self.adapter_agent.get_child_device(self.device_id,
319 parent_port_no=parent_port_no, onu_id=onu_id)
320 if child_device is None:
321 self.log.error("could-not-find-child-device",
322 parent_port_no=intf_id, onu_id=onu_id)
323 return (None, None)
324 ports = self.adapter_agent.get_ports(child_device.id, Port.ETHERNET_UNI)
325 logical_port = self.adapter_agent.get_logical_port(
326 self.logical_device_id, ports[uni_id].label)
327 ofp_port_name = (logical_port.ofp_port.name, logical_port.ofp_port.port_no)
328 return ofp_port_name
329
330 def get_tp_path(self, intf_id, ofp_port_name):
331 # FIXME Should get Table id form the flow, as of now hardcoded to
332 # DEFAULT_TECH_PROFILE_TABLE_ID (64)
333 # 'tp_path' contains the suffix part of the tech_profile_instance path.
334 # The prefix to the 'tp_path' should be set to \
335 # TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX by the ONU adapter.
336 return self.tech_profile[intf_id]. \
337 get_tp_path(DEFAULT_TECH_PROFILE_TABLE_ID,
338 ofp_port_name)
339
340 def delete_tech_profile_instance(self, intf_id, onu_id, uni_id):
341 # Remove the TP instance associated with the ONU
342 ofp_port_name = self._get_ofp_port_name(intf_id, onu_id, uni_id)
343 tp_path = self.get_tp_path(intf_id, ofp_port_name)
344 return self.tech_profile[intf_id].delete_tech_profile_instance(tp_path)
345
346 def divide_and_add_flow(self, intf_id, onu_id, uni_id, port_no, classifier,
347 action, flow):
348
349 self.log.debug('sorting flow', intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, port_no=port_no,
350 classifier=classifier, action=action)
351
352 alloc_id, gem_ports = self.create_tcont_gemport(intf_id, onu_id, uni_id,
353 flow.table_id)
354 if alloc_id is None or gem_ports is None:
355 self.log.error("alloc-id-gem-ports-unavailable", alloc_id=alloc_id,
356 gem_ports=gem_ports)
357 return
358
359 self.log.debug('Generated required alloc and gemport ids',
360 alloc_id=alloc_id, gemports=gem_ports)
361
362 # Flows can't be added specific to gemport unless p-bits are received.
363 # Hence adding flows for all gemports
364 for gemport_id in gem_ports:
365 if IP_PROTO in classifier:
366 if classifier[IP_PROTO] == 17:
367 self.log.debug('dhcp flow add')
368 self.add_dhcp_trap(intf_id, onu_id, uni_id, port_no, classifier,
369 action, flow, alloc_id, gemport_id)
370 elif classifier[IP_PROTO] == 2:
371 self.log.warn('igmp flow add ignored, not implemented yet')
372 else:
373 self.log.warn("Invalid-Classifier-to-handle",
374 classifier=classifier,
375 action=action)
376 elif ETH_TYPE in classifier:
377 if classifier[ETH_TYPE] == EAP_ETH_TYPE:
378 self.log.debug('eapol flow add')
379 self.add_eapol_flow(intf_id, onu_id, uni_id, port_no, flow, alloc_id,
380 gemport_id)
381 vlan_id = self.get_subscriber_vlan(fd.get_in_port(flow))
382 if vlan_id is not None:
383 self.add_eapol_flow(
384 intf_id, onu_id, uni_id, port_no, flow, alloc_id, gemport_id,
385 vlan_id=vlan_id)
386 parent_port_no = self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT)
387 onu_device = self.adapter_agent.get_child_device(self.device_id,
388 onu_id=onu_id,
389 parent_port_no=parent_port_no)
390 (ofp_port_name, ofp_port_no) = self._get_ofp_port_name(intf_id, onu_id, uni_id)
391 if ofp_port_name is None:
392 self.log.error("port-name-not-found")
393 return
394
395 tp_path = self.get_tp_path(intf_id, ofp_port_name)
396
397 self.log.debug('Load-tech-profile-request-to-brcm-handler',
398 tp_path=tp_path)
399 msg = {'proxy_address': onu_device.proxy_address, 'uni_id': uni_id,
400 'event': 'download_tech_profile', 'event_data': tp_path}
401
402 # Send the event message to the ONU adapter
403 self.adapter_agent.publish_inter_adapter_message(onu_device.id,
404 msg)
405
406 if classifier[ETH_TYPE] == LLDP_ETH_TYPE:
407 self.log.debug('lldp flow add')
408 nni_intf_id = self.get_nni_intf_id()
409 self.add_lldp_flow(flow, port_no, nni_intf_id)
410
411 elif PUSH_VLAN in action:
412 self.add_upstream_data_flow(intf_id, onu_id, uni_id, port_no, classifier,
413 action, flow, alloc_id, gemport_id)
414 elif POP_VLAN in action:
415 self.add_downstream_data_flow(intf_id, onu_id, uni_id, port_no, classifier,
416 action, flow, alloc_id, gemport_id)
417 else:
418 self.log.debug('Invalid-flow-type-to-handle',
419 classifier=classifier,
420 action=action, flow=flow)
421
422 def create_tcont_gemport(self, intf_id, onu_id, uni_id, table_id):
423 alloc_id, gem_port_ids = None, None
424 pon_intf_onu_id = (intf_id, onu_id)
425
426 # If we already have allocated alloc_id and gem_ports earlier, render them
427 alloc_id = \
428 self.resource_mgr.get_current_alloc_ids_for_onu(pon_intf_onu_id)
429 gem_port_ids = \
430 self.resource_mgr.get_current_gemport_ids_for_onu(pon_intf_onu_id)
431 if alloc_id is not None and gem_port_ids is not None:
432 return alloc_id, gem_port_ids
433
434 try:
435 (ofp_port_name, ofp_port_no) = self._get_ofp_port_name(intf_id, onu_id, uni_id)
436 if ofp_port_name is None:
437 self.log.error("port-name-not-found")
438 return alloc_id, gem_port_ids
439 # FIXME: If table id is <= 63 using 64 as table id
440 if table_id < DEFAULT_TECH_PROFILE_TABLE_ID:
441 table_id = DEFAULT_TECH_PROFILE_TABLE_ID
442
443 # Check tech profile instance already exists for derived port name
444 tech_profile_instance = self.tech_profile[intf_id]. \
445 get_tech_profile_instance(table_id, ofp_port_name)
446 self.log.debug('Get-tech-profile-instance-status', tech_profile_instance=tech_profile_instance)
447
448 if tech_profile_instance is None:
449 # create tech profile instance
450 tech_profile_instance = self.tech_profile[intf_id]. \
451 create_tech_profile_instance(table_id, ofp_port_name,
452 intf_id)
453 if tech_profile_instance is None:
454 raise Exception('Tech-profile-instance-creation-failed')
455 else:
456 self.log.debug(
457 'Tech-profile-instance-already-exist-for-given port-name',
458 ofp_port_name=ofp_port_name)
459
460 # upstream scheduler
461 us_scheduler = self.tech_profile[intf_id].get_us_scheduler(
462 tech_profile_instance)
463 # downstream scheduler
464 ds_scheduler = self.tech_profile[intf_id].get_ds_scheduler(
465 tech_profile_instance)
466 # create Tcont
467 tconts = self.tech_profile[intf_id].get_tconts(tech_profile_instance,
468 us_scheduler,
469 ds_scheduler)
470
471 self.stub.CreateTconts(openolt_pb2.Tconts(intf_id=intf_id,
472 onu_id=onu_id,
473 uni_id=uni_id,
474 port_no=ofp_port_no,
475 tconts=tconts))
476
477 # Fetch alloc id and gemports from tech profile instance
478 alloc_id = tech_profile_instance.us_scheduler.alloc_id
479 gem_port_ids = []
480 for i in range(len(
481 tech_profile_instance.upstream_gem_port_attribute_list)):
482 gem_port_ids.append(
483 tech_profile_instance.upstream_gem_port_attribute_list[i].
484 gemport_id)
485 except BaseException as e:
486 self.log.exception(exception=e)
487
488 # Update the allocated alloc_id and gem_port_id for the ONU/UNI to KV store
489 pon_intf_onu_id = (intf_id, onu_id, uni_id)
490 self.resource_mgr.resource_mgrs[intf_id].update_alloc_ids_for_onu(
491 pon_intf_onu_id,
492 list([alloc_id])
493 )
494 self.resource_mgr.resource_mgrs[intf_id].update_gemport_ids_for_onu(
495 pon_intf_onu_id,
496 gem_port_ids
497 )
498
499 self.resource_mgr.update_gemports_ponport_to_onu_map_on_kv_store(
500 gem_port_ids, intf_id, onu_id, uni_id
501 )
502
503 return alloc_id, gem_port_ids
504
505 def add_upstream_data_flow(self, intf_id, onu_id, uni_id, port_no, uplink_classifier,
506 uplink_action, logical_flow, alloc_id,
507 gemport_id):
508
509 uplink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
510
511 self.add_hsia_flow(intf_id, onu_id, uni_id, port_no, uplink_classifier,
512 uplink_action, UPSTREAM,
513 logical_flow, alloc_id, gemport_id)
514
515 # Secondary EAP on the subscriber vlan
516 (eap_active, eap_logical_flow) = self.is_eap_enabled(intf_id, onu_id, uni_id)
517 if eap_active:
518 self.add_eapol_flow(intf_id, onu_id, uni_id, port_no, eap_logical_flow, alloc_id,
519 gemport_id, vlan_id=uplink_classifier[VLAN_VID])
520
521 def add_downstream_data_flow(self, intf_id, onu_id, uni_id, port_no, downlink_classifier,
522 downlink_action, flow, alloc_id, gemport_id):
523 downlink_classifier[PACKET_TAG_TYPE] = DOUBLE_TAG
524 # Needed ???? It should be already there
525 downlink_action[POP_VLAN] = True
526 downlink_action[VLAN_VID] = downlink_classifier[VLAN_VID]
527
528 self.add_hsia_flow(intf_id, onu_id, uni_id, port_no, downlink_classifier,
529 downlink_action, DOWNSTREAM,
530 flow, alloc_id, gemport_id)
531
532 def add_hsia_flow(self, intf_id, onu_id, uni_id, port_no, classifier, action,
533 direction, logical_flow, alloc_id, gemport_id):
534
535 flow_store_cookie = self._get_flow_store_cookie(classifier,
536 gemport_id)
537
538 # One of the OLT platform (Broadcom BAL) requires that symmetric
539 # flows require the same flow_id to be used across UL and DL.
540 # Since HSIA flow is the only symmetric flow currently, we need to
541 # re-use the flow_id across both direction. The 'flow_category'
542 # takes priority over flow_cookie to find any available HSIA_FLOW
543 # id for the ONU.
544 flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id,
545 flow_store_cookie,
546 HSIA_FLOW)
547 if flow_id is None:
548 self.log.error("hsia-flow-unavailable")
549 return
550 flow = openolt_pb2.Flow(
551 access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=flow_id,
552 flow_type=direction, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
553 gemport_id=gemport_id,
554 classifier=self.mk_classifier(classifier),
555 action=self.mk_action(action),
556 priority=logical_flow.priority,
557 port_no=port_no,
558 cookie=logical_flow.cookie)
559
560 if self.add_flow_to_device(flow, logical_flow):
561 flow_info = self._get_flow_info_as_json_blob(flow,
562 flow_store_cookie,
563 HSIA_FLOW)
564 self.update_flow_info_to_kv_store(flow.access_intf_id,
565 flow.onu_id, flow.uni_id,
566 flow.flow_id, flow_info)
567
568 def add_dhcp_trap(self, intf_id, onu_id, uni_id, port_no, classifier, action, logical_flow,
569 alloc_id, gemport_id):
570
571 self.log.debug('add dhcp upstream trap', classifier=classifier,
572 intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, action=action)
573
574 action.clear()
575 action[TRAP_TO_HOST] = True
576 classifier[UDP_SRC] = 68
577 classifier[UDP_DST] = 67
578 classifier[PACKET_TAG_TYPE] = SINGLE_TAG
579 classifier.pop(VLAN_VID, None)
580
581 flow_store_cookie = self._get_flow_store_cookie(classifier,
582 gemport_id)
583
584 flow_id = self.resource_mgr.get_flow_id(
585 intf_id, onu_id, uni_id, flow_store_cookie
586 )
587 dhcp_flow = openolt_pb2.Flow(
588 onu_id=onu_id, uni_id=uni_id, flow_id=flow_id, flow_type=UPSTREAM,
589 access_intf_id=intf_id, gemport_id=gemport_id,
590 alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
591 priority=logical_flow.priority,
592 classifier=self.mk_classifier(classifier),
593 action=self.mk_action(action),
594 port_no=port_no,
595 cookie=logical_flow.cookie)
596
597 if self.add_flow_to_device(dhcp_flow, logical_flow):
598 flow_info = self._get_flow_info_as_json_blob(dhcp_flow, flow_store_cookie)
599 self.update_flow_info_to_kv_store(dhcp_flow.access_intf_id,
600 dhcp_flow.onu_id,
601 dhcp_flow.uni_id,
602 dhcp_flow.flow_id,
603 flow_info)
604
605 def add_eapol_flow(self, intf_id, onu_id, uni_id, port_no, logical_flow, alloc_id,
606 gemport_id, vlan_id=DEFAULT_MGMT_VLAN):
607
608 uplink_classifier = dict()
609 uplink_classifier[ETH_TYPE] = EAP_ETH_TYPE
610 uplink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
611 uplink_classifier[VLAN_VID] = vlan_id
612
613 uplink_action = dict()
614 uplink_action[TRAP_TO_HOST] = True
615
616 flow_store_cookie = self._get_flow_store_cookie(uplink_classifier,
617 gemport_id)
618 # Add Upstream EAPOL Flow.
619 uplink_flow_id = self.resource_mgr.get_flow_id(
620 intf_id, onu_id, uni_id, flow_store_cookie
621 )
622
623 upstream_flow = openolt_pb2.Flow(
624 access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=uplink_flow_id,
625 flow_type=UPSTREAM, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
626 gemport_id=gemport_id,
627 classifier=self.mk_classifier(uplink_classifier),
628 action=self.mk_action(uplink_action),
629 priority=logical_flow.priority,
630 port_no=port_no,
631 cookie=logical_flow.cookie)
632
633 logical_flow = copy.deepcopy(logical_flow)
634 logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([fd.vlan_vid(
635 vlan_id | 0x1000)]))
636 logical_flow.match.type = OFPMT_OXM
637
638 if self.add_flow_to_device(upstream_flow, logical_flow):
639 flow_info = self._get_flow_info_as_json_blob(upstream_flow,
640 flow_store_cookie)
641 self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
642 upstream_flow.onu_id,
643 upstream_flow.uni_id,
644 upstream_flow.flow_id,
645 flow_info)
646
647 if vlan_id == DEFAULT_MGMT_VLAN:
648 # Add Downstream EAPOL Flow, Only for first EAP flow (BAL
649 # requirement)
650 # On one of the platforms (Broadcom BAL), when same DL classifier
651 # vlan was used across multiple ONUs, eapol flow re-adds after
652 # flow delete (cases of onu reboot/disable) fails.
653 # In order to generate unique vlan, a combination of intf_id
654 # onu_id and uni_id is used.
655 # uni_id defaults to 0, so add 1 to it.
656 special_vlan_downstream_flow = 4090 - intf_id * onu_id * (uni_id+1)
657 # Assert that we do not generate invalid vlans under no condition
658 assert (special_vlan_downstream_flow >= 2, 'invalid-vlan-generated')
659
660 downlink_classifier = dict()
661 downlink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
662 downlink_classifier[VLAN_VID] = special_vlan_downstream_flow
663
664 downlink_action = dict()
665 downlink_action[PUSH_VLAN] = True
666 downlink_action[VLAN_VID] = vlan_id
667
668
669 flow_store_cookie = self._get_flow_store_cookie(downlink_classifier,
670 gemport_id)
671
672 downlink_flow_id = self.resource_mgr.get_flow_id(
673 intf_id, onu_id, uni_id, flow_store_cookie
674 )
675
676 downstream_flow = openolt_pb2.Flow(
677 access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=downlink_flow_id,
678 flow_type=DOWNSTREAM, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
679 gemport_id=gemport_id,
680 classifier=self.mk_classifier(downlink_classifier),
681 action=self.mk_action(downlink_action),
682 priority=logical_flow.priority,
683 port_no=port_no,
684 cookie=logical_flow.cookie)
685
686 downstream_logical_flow = ofp_flow_stats(
687 id=logical_flow.id, cookie=logical_flow.cookie,
688 table_id=logical_flow.table_id, priority=logical_flow.priority,
689 flags=logical_flow.flags)
690
691 downstream_logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([
692 fd.in_port(fd.get_out_port(logical_flow)),
693 fd.vlan_vid(special_vlan_downstream_flow | 0x1000)]))
694 downstream_logical_flow.match.type = OFPMT_OXM
695
696 downstream_logical_flow.instructions.extend(
697 fd.mk_instructions_from_actions([fd.output(
698 self.platform.mk_uni_port_num(intf_id, onu_id, uni_id))]))
699
700 if self.add_flow_to_device(downstream_flow, downstream_logical_flow):
701 flow_info = self._get_flow_info_as_json_blob(downstream_flow,
702 flow_store_cookie)
703 self.update_flow_info_to_kv_store(downstream_flow.access_intf_id,
704 downstream_flow.onu_id,
705 downstream_flow.uni_id,
706 downstream_flow.flow_id,
707 flow_info)
708
709 def repush_all_different_flows(self):
710 # Check if the device is supposed to have flows, if so add them
711 # Recover static flows after a reboot
712 logical_flows = self.logical_flows_proxy.get('/').items
713 devices_flows = self.flows_proxy.get('/').items
714 logical_flows_ids_provisioned = [f.cookie for f in devices_flows]
715 for logical_flow in logical_flows:
716 try:
717 if logical_flow.id not in logical_flows_ids_provisioned:
718 self.add_flow(logical_flow)
719 except Exception as e:
720 self.log.exception('Problem reading this flow', e=e)
721
722 def reset_flows(self):
723 self.flows_proxy.update('/', Flows())
724
725 """ Add a downstream LLDP trap flow on the NNI interface
726 """
727
728 def add_lldp_flow(self, logical_flow, port_no, network_intf_id=0):
729
730 classifier = dict()
731 classifier[ETH_TYPE] = LLDP_ETH_TYPE
732 classifier[PACKET_TAG_TYPE] = UNTAGGED
733 action = dict()
734 action[TRAP_TO_HOST] = True
735
736 # LLDP flow is installed to trap LLDP packets on the NNI port.
737 # We manage flow_id resource pool on per PON port basis.
738 # Since this situation is tricky, as a hack, we pass the NNI port
739 # index (network_intf_id) as PON port Index for the flow_id resource
740 # pool. Also, there is no ONU Id available for trapping LLDP packets
741 # on NNI port, use onu_id as -1 (invalid)
742 # ****************** CAVEAT *******************
743 # This logic works if the NNI Port Id falls within the same valid
744 # range of PON Port Ids. If this doesn't work for some OLT Vendor
745 # we need to have a re-look at this.
746 # *********************************************
747 onu_id = -1
748 uni_id = -1
749 flow_store_cookie = self._get_flow_store_cookie(classifier)
750 flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id,
751 flow_store_cookie)
752
753 downstream_flow = openolt_pb2.Flow(
754 access_intf_id=-1, # access_intf_id not required
755 onu_id=onu_id, # onu_id not required
756 uni_id=uni_id, # uni_id not used
757 flow_id=flow_id,
758 flow_type=DOWNSTREAM,
759 network_intf_id=network_intf_id,
760 gemport_id=-1, # gemport_id not required
761 classifier=self.mk_classifier(classifier),
762 action=self.mk_action(action),
763 priority=logical_flow.priority,
764 port_no=port_no,
765 cookie=logical_flow.cookie)
766
767 self.log.debug('add lldp downstream trap', classifier=classifier,
768 action=action, flow=downstream_flow, port_no=port_no)
769 if self.add_flow_to_device(downstream_flow, logical_flow):
770 self.update_flow_info_to_kv_store(network_intf_id, onu_id, uni_id,
771 flow_id, downstream_flow)
772
773 def mk_classifier(self, classifier_info):
774
775 classifier = openolt_pb2.Classifier()
776
777 if ETH_TYPE in classifier_info:
778 classifier.eth_type = classifier_info[ETH_TYPE]
779 if IP_PROTO in classifier_info:
780 classifier.ip_proto = classifier_info[IP_PROTO]
781 if VLAN_VID in classifier_info:
782 classifier.o_vid = classifier_info[VLAN_VID]
783 if METADATA in classifier_info:
784 classifier.i_vid = classifier_info[METADATA]
785 if VLAN_PCP in classifier_info:
786 classifier.o_pbits = classifier_info[VLAN_PCP]
787 if UDP_SRC in classifier_info:
788 classifier.src_port = classifier_info[UDP_SRC]
789 if UDP_DST in classifier_info:
790 classifier.dst_port = classifier_info[UDP_DST]
791 if IPV4_DST in classifier_info:
792 classifier.dst_ip = classifier_info[IPV4_DST]
793 if IPV4_SRC in classifier_info:
794 classifier.src_ip = classifier_info[IPV4_SRC]
795 if PACKET_TAG_TYPE in classifier_info:
796 if classifier_info[PACKET_TAG_TYPE] == SINGLE_TAG:
797 classifier.pkt_tag_type = SINGLE_TAG
798 elif classifier_info[PACKET_TAG_TYPE] == DOUBLE_TAG:
799 classifier.pkt_tag_type = DOUBLE_TAG
800 elif classifier_info[PACKET_TAG_TYPE] == UNTAGGED:
801 classifier.pkt_tag_type = UNTAGGED
802 else:
803 classifier.pkt_tag_type = 'none'
804
805 return classifier
806
807 def mk_action(self, action_info):
808 action = openolt_pb2.Action()
809
810 if POP_VLAN in action_info:
811 action.o_vid = action_info[VLAN_VID]
812 action.cmd.remove_outer_tag = True
813 elif PUSH_VLAN in action_info:
814 action.o_vid = action_info[VLAN_VID]
815 action.cmd.add_outer_tag = True
816 elif TRAP_TO_HOST in action_info:
817 action.cmd.trap_to_host = True
818 else:
819 self.log.info('Invalid-action-field', action_info=action_info)
820 return
821 return action
822
823 def is_eap_enabled(self, intf_id, onu_id, uni_id):
824 flows = self.logical_flows_proxy.get('/').items
825
826 for flow in flows:
827 eap_flow = False
828 eap_intf_id = None
829 eap_onu_id = None
830 eap_uni_id = None
831 for field in fd.get_ofb_fields(flow):
832 if field.type == fd.ETH_TYPE:
833 if field.eth_type == EAP_ETH_TYPE:
834 eap_flow = True
835 if field.type == fd.IN_PORT:
836 eap_intf_id = self.platform.intf_id_from_uni_port_num(
837 field.port)
838 eap_onu_id = self.platform.onu_id_from_port_num(field.port)
839 eap_uni_id = self.platform.uni_id_from_port_num(field.port)
840
841 if eap_flow:
842 self.log.debug('eap flow detected', onu_id=onu_id, uni_id=uni_id,
843 intf_id=intf_id, eap_intf_id=eap_intf_id,
844 eap_onu_id=eap_onu_id,
845 eap_uni_id=eap_uni_id)
846 if eap_flow and intf_id == eap_intf_id and onu_id == eap_onu_id and uni_id == eap_uni_id:
847 return True, flow
848
849 return False, None
850
851 def get_subscriber_vlan(self, port):
852 self.log.debug('looking from subscriber flow for port', port=port)
853
854 flows = self.logical_flows_proxy.get('/').items
855 for flow in flows:
856 in_port = fd.get_in_port(flow)
857 out_port = fd.get_out_port(flow)
858 if in_port == port and out_port is not None and \
859 self.platform.intf_id_to_port_type_name(out_port) \
860 == Port.ETHERNET_NNI:
861 fields = fd.get_ofb_fields(flow)
862 self.log.debug('subscriber flow found', fields=fields)
863 for field in fields:
864 if field.type == OFPXMT_OFB_VLAN_VID:
865 self.log.debug('subscriber vlan found',
866 vlan_id=field.vlan_vid)
867 return field.vlan_vid & 0x0fff
868 self.log.debug('No subscriber flow found', port=port)
869 return None
870
871 def add_flow_to_device(self, flow, logical_flow):
872 self.log.debug('pushing flow to device', flow=flow)
873 try:
874 self.stub.FlowAdd(flow)
875 except grpc.RpcError as grpc_e:
876 if grpc_e.code() == grpc.StatusCode.ALREADY_EXISTS:
877 self.log.warn('flow already exists', e=grpc_e, flow=flow)
878 else:
879 self.log.error('failed to add flow',
880 logical_flow=logical_flow, flow=flow,
881 grpc_error=grpc_e)
882 return False
883 else:
884 self.register_flow(logical_flow, flow)
885 return True
886
887 def update_flow_info_to_kv_store(self, intf_id, onu_id, uni_id, flow_id, flow):
888 self.resource_mgr.update_flow_id_info_for_uni(intf_id, onu_id, uni_id,
889 flow_id, flow)
890
891 def register_flow(self, logical_flow, device_flow):
892 self.log.debug('registering flow in device',
893 logical_flow=logical_flow, device_flow=device_flow)
894 stored_flow = copy.deepcopy(logical_flow)
895 stored_flow.id = self.generate_stored_id(device_flow.flow_id,
896 device_flow.flow_type)
897 self.log.debug('generated device flow id', id=stored_flow.id,
898 flow_id=device_flow.flow_id,
899 direction=device_flow.flow_type)
900 stored_flow.cookie = logical_flow.id
901 flows = self.flows_proxy.get('/')
902 flows.items.extend([stored_flow])
903 self.flows_proxy.update('/', flows)
904
905 def find_next_flow(self, flow):
906 table_id = fd.get_goto_table_id(flow)
907 metadata = 0
908 # Prior to ONOS 1.13.5, Metadata contained the UNI output port number. In
909 # 1.13.5 and later, the lower 32-bits is the output port number and the
910 # upper 32-bits is the inner-vid we are looking for. Use just the lower 32
911 # bits. Allows this code to work with pre- and post-1.13.5 ONOS OltPipeline
912
913 for field in fd.get_ofb_fields(flow):
914 if field.type == fd.METADATA:
915 metadata = field.table_metadata & 0xFFFFFFFF
916 if table_id is None:
917 return None
918 flows = self.logical_flows_proxy.get('/').items
919 next_flows = []
920 for f in flows:
921 if f.table_id == table_id:
922 # FIXME
923 if fd.get_in_port(f) == fd.get_in_port(flow) and \
924 fd.get_out_port(f) == metadata:
925 next_flows.append(f)
926
927 if len(next_flows) == 0:
928 self.log.warning('no next flow found, it may be a timing issue',
929 flow=flow, number_of_flows=len(flows))
930 if flow.id in self.retry_add_flow_list:
931 self.log.debug('flow is already in retry list', flow_id=flow.id)
932 else:
933 self.retry_add_flow_list.append(flow.id)
934 reactor.callLater(5, self.retry_add_flow, flow)
935 return None
936
937 next_flows.sort(key=lambda f: f.priority, reverse=True)
938
939 return next_flows[0]
940
941 def update_children_flows(self, device_rules_map):
942
943 for device_id, (flows, groups) in device_rules_map.iteritems():
944 if device_id != self.device_id:
945 self.root_proxy.update('/devices/{}/flows'.format(device_id),
946 Flows(items=flows.values()))
947 self.root_proxy.update('/devices/{}/flow_groups'.format(
948 device_id), FlowGroups(items=groups.values()))
949
950 def clear_flows_and_scheduler_for_logical_port(self, child_device, logical_port):
951 ofp_port_name = logical_port.ofp_port.name
952 port_no = logical_port.ofp_port.port_no
953 pon_port = child_device.proxy_address.channel_id
954 onu_id = child_device.proxy_address.onu_id
955 uni_id = self.platform.uni_id_from_port_num(logical_port)
956
957 # TODO: The DEFAULT_TECH_PROFILE_ID is assumed. Right way to do,
958 # is probably to maintain a list of Tech-profile table IDs associated
959 # with the UNI logical_port. This way, when the logical port is deleted,
960 # all the associated tech-profile configuration with the UNI logical_port
961 # can be cleared.
962 tech_profile_instance = self.tech_profile[pon_port]. \
963 get_tech_profile_instance(
964 DEFAULT_TECH_PROFILE_TABLE_ID,
965 ofp_port_name)
966 flow_ids = self.resource_mgr.get_current_flow_ids_for_uni(pon_port, onu_id, uni_id)
967 self.log.debug("outstanding-flows-to-be-cleared", flow_ids=flow_ids)
968 for flow_id in flow_ids:
969 flow_infos = self.resource_mgr.get_flow_id_info(pon_port, onu_id, uni_id, flow_id)
970 for flow_info in flow_infos:
971 direction = flow_info['flow_type']
972 flow_to_remove = openolt_pb2.Flow(flow_id=flow_id,
973 flow_type=direction)
974 try:
975 self.stub.FlowRemove(flow_to_remove)
976 except grpc.RpcError as grpc_e:
977 if grpc_e.code() == grpc.StatusCode.NOT_FOUND:
978 self.log.debug('This flow does not exist on the switch, '
979 'normal after an OLT reboot',
980 flow=flow_to_remove)
981 else:
982 raise grpc_e
983
984 self.resource_mgr.free_flow_id_for_uni(pon_port, onu_id, uni_id, flow_id)
985
986 try:
987 tconts = self.tech_profile[pon_port].get_tconts(tech_profile_instance)
988 self.stub.RemoveTconts(openolt_pb2.Tconts(intf_id=pon_port,
989 onu_id=onu_id,
990 uni_id=uni_id,
991 port_no=port_no,
992 tconts=tconts))
993 except grpc.RpcError as grpc_e:
994 self.log.error('error-removing-tcont-scheduler-queues',
995 err=grpc_e)
996
997 def generate_stored_id(self, flow_id, direction):
998 if direction == UPSTREAM:
999 self.log.debug('upstream flow, shifting id')
1000 return 0x1 << 15 | flow_id
1001 elif direction == DOWNSTREAM:
1002 self.log.debug('downstream flow, not shifting id')
1003 return flow_id
1004 else:
1005 self.log.warn('Unrecognized direction', direction=direction)
1006 return flow_id
1007
1008 def decode_stored_id(self, id):
1009 if id >> 15 == 0x1:
1010 return id & 0x7fff, UPSTREAM
1011 else:
1012 return id, DOWNSTREAM
1013
1014 def _populate_tech_profile_per_pon_port(self):
1015 for arange in self.resource_mgr.device_info.ranges:
1016 for intf_id in arange.intf_ids:
1017 self.tech_profile[intf_id] = \
1018 self.resource_mgr.resource_mgrs[intf_id].tech_profile
1019
1020 # Make sure we have as many tech_profiles as there are pon ports on
1021 # the device
1022 assert len(self.tech_profile) == self.resource_mgr.device_info.pon_ports
1023
1024 def _get_flow_info_as_json_blob(self, flow, flow_store_cookie,
1025 flow_category=None):
1026 json_blob = MessageToDict(message=flow,
1027 preserving_proto_field_name=True)
1028 self.log.debug("flow-info", json_blob=json_blob)
1029 json_blob['flow_store_cookie'] = flow_store_cookie
1030 if flow_category is not None:
1031 json_blob['flow_category'] = flow_category
1032 flow_info = self.resource_mgr.get_flow_id_info(flow.access_intf_id,
1033 flow.onu_id, flow.uni_id, flow.flow_id)
1034
1035 if flow_info is None:
1036 flow_info = list()
1037 flow_info.append(json_blob)
1038 else:
1039 assert (isinstance(flow_info, list))
1040 flow_info.append(json_blob)
1041
1042 return flow_info
1043
1044 @staticmethod
1045 def _get_flow_store_cookie(classifier, gem_port=None):
1046 assert isinstance(classifier, dict)
1047 # We need unique flows per gem_port
1048 if gem_port is not None:
1049 to_hash = dumps(classifier, sort_keys=True) + str(gem_port)
1050 else:
1051 to_hash = dumps(classifier, sort_keys=True)
1052 return hashlib.md5(to_hash).hexdigest()[:12]
1053
1054 def get_nni_intf_id(self):
1055 if self.nni_intf_id is not None:
1056 return self.nni_intf_id
1057
1058 port_list = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_NNI)
1059 logical_port = self.adapter_agent.get_logical_port(self.logical_device_id,
1060 port_list[0].label)
1061 self.nni_intf_id = self.platform.intf_id_from_nni_port_num(logical_port.ofp_port.port_no)
1062 self.log.debug("nni-intf-d ", nni_intf_id=self.nni_intf_id)
1063 return self.nni_intf_id