blob: bbf1ba7fbfb4ff4b257ba872888e8bf51904fb26 [file] [log] [blame]
Andrea Campanella37f07e42021-02-16 11:24:39 +01001/*
2 * Copyright 2016-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.opencord.olt.driver;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.RemovalCause;
21import com.google.common.cache.RemovalNotification;
22import com.google.common.collect.ImmutableList;
23import com.google.common.collect.Lists;
24import org.apache.commons.lang3.tuple.ImmutablePair;
25import org.apache.commons.lang3.tuple.Pair;
26import org.onlab.osgi.ServiceDirectory;
27import org.onlab.packet.EthType;
28import org.onlab.packet.IPv4;
29import org.onlab.packet.IPv6;
30import org.onlab.packet.IpPrefix;
31import org.onlab.packet.VlanId;
32import org.onlab.util.AbstractAccumulator;
33import org.onlab.util.Accumulator;
34import org.onlab.util.KryoNamespace;
35import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
37import org.onosproject.net.DeviceId;
38import org.onosproject.net.PortNumber;
39import org.onosproject.net.behaviour.NextGroup;
40import org.onosproject.net.behaviour.Pipeliner;
41import org.onosproject.net.behaviour.PipelinerContext;
42import org.onosproject.net.driver.AbstractHandlerBehaviour;
43import org.onosproject.net.driver.Driver;
44import org.onosproject.net.flow.DefaultFlowRule;
45import org.onosproject.net.flow.DefaultTrafficSelector;
46import org.onosproject.net.flow.DefaultTrafficTreatment;
47import org.onosproject.net.flow.FlowRule;
48import org.onosproject.net.flow.FlowRuleOperations;
49import org.onosproject.net.flow.FlowRuleOperationsContext;
50import org.onosproject.net.flow.FlowRuleService;
51import org.onosproject.net.flow.TrafficSelector;
52import org.onosproject.net.flow.TrafficTreatment;
53import org.onosproject.net.flow.criteria.Criteria;
54import org.onosproject.net.flow.criteria.Criterion;
55import org.onosproject.net.flow.criteria.EthTypeCriterion;
56import org.onosproject.net.flow.criteria.IPCriterion;
57import org.onosproject.net.flow.criteria.IPProtocolCriterion;
58import org.onosproject.net.flow.criteria.PortCriterion;
59import org.onosproject.net.flow.criteria.UdpPortCriterion;
60import org.onosproject.net.flow.criteria.VlanIdCriterion;
61import org.onosproject.net.flow.instructions.Instruction;
62import org.onosproject.net.flow.instructions.Instructions;
63import org.onosproject.net.flow.instructions.L2ModificationInstruction;
64import org.onosproject.net.flowobjective.FilteringObjective;
65import org.onosproject.net.flowobjective.FlowObjectiveStore;
66import org.onosproject.net.flowobjective.ForwardingObjective;
67import org.onosproject.net.flowobjective.NextObjective;
68import org.onosproject.net.flowobjective.Objective;
69import org.onosproject.net.flowobjective.ObjectiveError;
70import org.onosproject.net.group.DefaultGroupBucket;
71import org.onosproject.net.group.DefaultGroupDescription;
72import org.onosproject.net.group.DefaultGroupKey;
73import org.onosproject.net.group.Group;
74import org.onosproject.net.group.GroupBucket;
75import org.onosproject.net.group.GroupBuckets;
76import org.onosproject.net.group.GroupDescription;
77import org.onosproject.net.group.GroupEvent;
78import org.onosproject.net.group.GroupKey;
79import org.onosproject.net.group.GroupListener;
80import org.onosproject.net.group.GroupService;
81import org.onosproject.store.serializers.KryoNamespaces;
82import org.onosproject.store.service.StorageService;
83import org.slf4j.Logger;
84
85import java.util.Arrays;
86import java.util.Collection;
87import java.util.Collections;
88import java.util.List;
89import java.util.Objects;
90import java.util.Optional;
91import java.util.concurrent.ScheduledExecutorService;
92import java.util.Timer;
93import java.util.concurrent.TimeUnit;
94import java.util.stream.Collectors;
95
96import static org.onosproject.core.CoreService.CORE_APP_NAME;
yasin saplib4b8ee12021-06-13 18:25:20 +000097import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_OLT;
98import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_ONU;
99import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
100import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_ONU;
Andrea Campanella37f07e42021-02-16 11:24:39 +0100101import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
102import static org.onlab.util.Tools.groupedThreads;
103import static org.slf4j.LoggerFactory.getLogger;
104
105/**
106 * Pipeliner for OLT device.
107 */
108
109public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
110
111 private static final Integer QQ_TABLE = 1;
112 private static final int NO_ACTION_PRIORITY = 500;
113 private static final String DOWNSTREAM = "downstream";
114 private static final String UPSTREAM = "upstream";
115 private final Logger log = getLogger(getClass());
116
117 private ServiceDirectory serviceDirectory;
118 private FlowRuleService flowRuleService;
119 private GroupService groupService;
120 private CoreService coreService;
121 private StorageService storageService;
122
123 private DeviceId deviceId;
124 private ApplicationId appId;
125
126
127 protected FlowObjectiveStore flowObjectiveStore;
128
129 private Cache<GroupKey, NextObjective> pendingGroups;
130
131 protected static KryoNamespace appKryo = new KryoNamespace.Builder()
132 .register(KryoNamespaces.API)
133 .register(GroupKey.class)
134 .register(DefaultGroupKey.class)
135 .register(OltPipelineGroup.class)
136 .build("OltPipeline");
137
138 private static final Timer TIMER = new Timer("filterobj-batching");
139 private Accumulator<Pair<FilteringObjective, FlowRule>> accumulator;
140
141 // accumulator executor service
142 private ScheduledExecutorService accumulatorExecutorService
143 = newSingleThreadScheduledExecutor(groupedThreads("OltPipeliner", "acc-%d", log));
144
145 @Override
146 public void init(DeviceId deviceId, PipelinerContext context) {
147 log.debug("Initiate OLT pipeline");
148 this.serviceDirectory = context.directory();
149 this.deviceId = deviceId;
150
151 flowRuleService = serviceDirectory.get(FlowRuleService.class);
152 coreService = serviceDirectory.get(CoreService.class);
153 groupService = serviceDirectory.get(GroupService.class);
154 flowObjectiveStore = context.store();
155 storageService = serviceDirectory.get(StorageService.class);
156
157 appId = coreService.registerApplication(
158 "org.onosproject.driver.OLTPipeline");
159
160 // Init the accumulator, if enabled
161 if (isAccumulatorEnabled()) {
162 log.debug("Building accumulator with maxObjs {}, batchMs {}, idleMs {}",
163 context.accumulatorMaxObjectives(), context.accumulatorMaxBatchMillis(),
164 context.accumulatorMaxIdleMillis());
165 accumulator = new ObjectiveAccumulator(context.accumulatorMaxObjectives(),
166 context.accumulatorMaxBatchMillis(),
167 context.accumulatorMaxIdleMillis());
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100168 } else {
169 log.debug("Olt Pipeliner accumulator is disabled, processing immediately");
Andrea Campanella37f07e42021-02-16 11:24:39 +0100170 }
171
172
173 pendingGroups = CacheBuilder.newBuilder()
174 .expireAfterWrite(20, TimeUnit.SECONDS)
175 .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
176 if (notification.getCause() == RemovalCause.EXPIRED) {
177 fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
178 }
179 }).build();
180
181 groupService.addListener(new InnerGroupListener());
182
183 }
184
185 public boolean isAccumulatorEnabled() {
186 Driver driver = super.data().driver();
187 // we cannot determine the property
188 if (driver == null) {
189 return false;
190 }
191 return Boolean.parseBoolean(driver.getProperty(ACCUMULATOR_ENABLED));
192 }
193
194 @Override
195 public void filter(FilteringObjective filter) {
196 Instructions.OutputInstruction output;
197
198 if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
199 output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
200 .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
201 .limit(1)
202 .findFirst().get();
203
204 if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
205 log.warn("OLT can only filter packet to controller");
206 fail(filter, ObjectiveError.UNSUPPORTED);
207 return;
208 }
209 } else {
210 fail(filter, ObjectiveError.BADPARAMS);
211 return;
212 }
213
214 if (filter.key().type() != Criterion.Type.IN_PORT) {
215 fail(filter, ObjectiveError.BADPARAMS);
216 return;
217 }
218
219 EthTypeCriterion ethType = (EthTypeCriterion)
220 filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
221
222 if (ethType == null) {
223 fail(filter, ObjectiveError.BADPARAMS);
224 return;
225 }
226 Optional<Instruction> vlanId = filter.meta().immediate().stream()
227 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
228 && ((L2ModificationInstruction) t).subtype()
229 .equals(L2ModificationInstruction.L2SubType.VLAN_ID))
230 .limit(1)
231 .findFirst();
232
233 Optional<Instruction> vlanPcp = filter.meta().immediate().stream()
234 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
235 && ((L2ModificationInstruction) t).subtype()
236 .equals(L2ModificationInstruction.L2SubType.VLAN_PCP))
237 .limit(1)
238 .findFirst();
239
240 Optional<Instruction> vlanPush = filter.meta().immediate().stream()
241 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
242 && ((L2ModificationInstruction) t).subtype()
243 .equals(L2ModificationInstruction.L2SubType.VLAN_PUSH))
244 .limit(1)
245 .findFirst();
246
247 if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
248
249 if (vlanId.isEmpty() || vlanPush.isEmpty()) {
250 log.warn("Missing EAPOL vlan or vlanPush");
251 fail(filter, ObjectiveError.BADPARAMS);
252 return;
253 }
254 provisionEthTypeBasedFilter(filter, ethType, output,
255 (L2ModificationInstruction) vlanId.get(),
256 (L2ModificationInstruction) vlanPush.get());
257 } else if (ethType.ethType().equals(EthType.EtherType.PPPoED.ethType())) {
258 provisionPPPoED(filter, ethType, vlanId.orElse(null), vlanPcp.orElse(null), output);
259 } else if (ethType.ethType().equals(EthType.EtherType.LLDP.ethType())) {
260 provisionEthTypeBasedFilter(filter, ethType, output, null, null);
261 } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
262 IPProtocolCriterion ipProto = (IPProtocolCriterion)
263 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
264 if (ipProto == null) {
265 log.warn("OLT can only filter IGMP and DHCP");
266 fail(filter, ObjectiveError.UNSUPPORTED);
267 return;
268 }
269 if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
270 provisionIgmp(filter, ethType, ipProto, output,
271 vlanId.orElse(null),
272 vlanPcp.orElse(null));
273 } else if (ipProto.protocol() == IPv4.PROTOCOL_UDP) {
274 UdpPortCriterion udpSrcPort = (UdpPortCriterion)
275 filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
276
277 UdpPortCriterion udpDstPort = (UdpPortCriterion)
278 filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
279
280 if ((udpSrcPort.udpPort().toInt() == 67 && udpDstPort.udpPort().toInt() == 68) ||
281 (udpSrcPort.udpPort().toInt() == 68 && udpDstPort.udpPort().toInt() == 67)) {
282 provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
283 vlanPcp.orElse(null), output);
284 } else {
285 log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
286 fail(filter, ObjectiveError.UNSUPPORTED);
287 }
288 } else {
289 log.warn("Currently supporting only IGMP and DHCP filters for IPv4 packets");
290 fail(filter, ObjectiveError.UNSUPPORTED);
291 }
292 } else if (ethType.ethType().equals(EthType.EtherType.IPV6.ethType())) {
293 IPProtocolCriterion ipProto = (IPProtocolCriterion)
294 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
295 if (ipProto == null) {
296 log.warn("OLT can only filter DHCP");
297 fail(filter, ObjectiveError.UNSUPPORTED);
298 return;
299 }
300 if (ipProto.protocol() == IPv6.PROTOCOL_UDP) {
301 UdpPortCriterion udpSrcPort = (UdpPortCriterion)
302 filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
303
304 UdpPortCriterion udpDstPort = (UdpPortCriterion)
305 filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
306
307 if ((udpSrcPort.udpPort().toInt() == 546 && udpDstPort.udpPort().toInt() == 547) ||
308 (udpSrcPort.udpPort().toInt() == 547 && udpDstPort.udpPort().toInt() == 546)) {
309 provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
310 vlanPcp.orElse(null), output);
311 } else {
312 log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
313 fail(filter, ObjectiveError.UNSUPPORTED);
314 }
315 } else {
316 log.warn("Currently supporting only DHCP filters for IPv6 packets");
317 fail(filter, ObjectiveError.UNSUPPORTED);
318 }
319 } else {
320 log.warn("\nOnly the following are Supported in OLT for filter ->\n"
321 + "ETH TYPE : EAPOL, LLDP and IPV4\n"
322 + "IPV4 TYPE: IGMP and UDP (for DHCP)"
323 + "IPV6 TYPE: UDP (for DHCP)");
324 fail(filter, ObjectiveError.UNSUPPORTED);
325 }
326
327 }
328
329
330 @Override
331 public void forward(ForwardingObjective fwd) {
332 log.debug("Installing forwarding objective {}", fwd);
333 if (checkForMulticast(fwd)) {
334 processMulticastRule(fwd);
335 return;
336 }
337
338 TrafficTreatment treatment = fwd.treatment();
339
340 List<Instruction> instructions = treatment.allInstructions();
341
342 Optional<Instruction> vlanInstruction = instructions.stream()
343 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
344 .filter(i -> ((L2ModificationInstruction) i).subtype() ==
345 L2ModificationInstruction.L2SubType.VLAN_PUSH ||
346 ((L2ModificationInstruction) i).subtype() ==
347 L2ModificationInstruction.L2SubType.VLAN_POP)
348 .findAny();
349
350
351 if (!vlanInstruction.isPresent()) {
352 installNoModificationRules(fwd);
353 } else {
354 L2ModificationInstruction vlanIns =
355 (L2ModificationInstruction) vlanInstruction.get();
356 if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
357 installUpstreamRules(fwd);
358 } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
359 installDownstreamRules(fwd);
360 } else {
361 log.error("Unknown OLT operation: {}", fwd);
362 fail(fwd, ObjectiveError.UNSUPPORTED);
363 return;
364 }
365 }
366
367 pass(fwd);
368
369 }
370
371
372 @Override
373 public void next(NextObjective nextObjective) {
374 if (nextObjective.type() != NextObjective.Type.BROADCAST) {
375 log.error("OLT only supports broadcast groups.");
376 fail(nextObjective, ObjectiveError.BADPARAMS);
377 return;
378 }
379
380 if (nextObjective.next().size() != 1 && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
381 log.error("OLT only supports singleton broadcast groups.");
382 fail(nextObjective, ObjectiveError.BADPARAMS);
383 return;
384 }
385
386 Optional<TrafficTreatment> treatmentOpt = nextObjective.next().stream().findFirst();
387 if (treatmentOpt.isEmpty() && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
388 log.error("Next objective {} does not have a treatment", nextObjective);
389 fail(nextObjective, ObjectiveError.BADPARAMS);
390 return;
391 }
392
393 GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
394
395 pendingGroups.put(key, nextObjective);
396 log.trace("NextObjective Operation {}", nextObjective.op());
397 switch (nextObjective.op()) {
398 case ADD:
399 GroupDescription groupDesc =
400 new DefaultGroupDescription(deviceId,
401 GroupDescription.Type.ALL,
402 new GroupBuckets(
403 Collections.singletonList(
404 buildBucket(treatmentOpt.get()))),
405 key,
406 null,
407 nextObjective.appId());
408 groupService.addGroup(groupDesc);
409 break;
410 case REMOVE:
411 groupService.removeGroup(deviceId, key, nextObjective.appId());
412 break;
413 case ADD_TO_EXISTING:
414 groupService.addBucketsToGroup(deviceId, key,
415 new GroupBuckets(
416 Collections.singletonList(
417 buildBucket(treatmentOpt.get()))),
418 key, nextObjective.appId());
419 break;
420 case REMOVE_FROM_EXISTING:
421 groupService.removeBucketsFromGroup(deviceId, key,
422 new GroupBuckets(
423 Collections.singletonList(
424 buildBucket(treatmentOpt.get()))),
425 key, nextObjective.appId());
426 break;
427 default:
428 log.warn("Unknown next objective operation: {}", nextObjective.op());
429 }
430
431
432 }
433
Andrea Campanellaba1dab02021-12-01 17:16:05 -0800434 @Override
435 public void purgeAll(ApplicationId appId) {
436 log.warn("Purge All not implemented by OLT Pipeliner");
437 //TODO not used by OLT app, only by trellis
438 }
439
Andrea Campanella37f07e42021-02-16 11:24:39 +0100440 private GroupBucket buildBucket(TrafficTreatment treatment) {
441 return DefaultGroupBucket.createAllGroupBucket(treatment);
442 }
443
444 private void processMulticastRule(ForwardingObjective fwd) {
445 if (fwd.nextId() == null) {
446 log.error("Multicast objective does not have a next id");
447 fail(fwd, ObjectiveError.BADPARAMS);
448 }
449
450 GroupKey key = getGroupForNextObjective(fwd.nextId());
451
452 if (key == null) {
453 log.error("Group for forwarding objective missing: {}", fwd);
454 fail(fwd, ObjectiveError.GROUPMISSING);
455 }
456
457 Group group = groupService.getGroup(deviceId, key);
458 TrafficTreatment treatment =
459 buildTreatment(Instructions.createGroup(group.id()));
460
461 TrafficSelector.Builder selectorBuilder = buildIpv4SelectorForMulticast(fwd);
462
463 FlowRule rule = DefaultFlowRule.builder()
464 .fromApp(fwd.appId())
465 .forDevice(deviceId)
466 .forTable(0)
467 .makePermanent()
468 .withPriority(fwd.priority())
469 .withSelector(selectorBuilder.build())
470 .withTreatment(treatment)
471 .build();
472
473 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
474 switch (fwd.op()) {
475
476 case ADD:
477 builder.add(rule);
478 break;
479 case REMOVE:
480 builder.remove(rule);
481 break;
482 case ADD_TO_EXISTING:
483 case REMOVE_FROM_EXISTING:
484 break;
485 default:
486 log.warn("Unknown forwarding operation: {}", fwd.op());
487 }
488
489 applyFlowRules(ImmutableList.of(fwd), builder);
490
491
492 }
493
494 private TrafficSelector.Builder buildIpv4SelectorForMulticast(ForwardingObjective fwd) {
495 TrafficSelector.Builder builderToUpdate = DefaultTrafficSelector.builder();
496
497 Optional<Criterion> vlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.VLAN_VID);
498 if (vlanIdCriterion.isPresent()) {
499 VlanId assignedVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
500 builderToUpdate.matchVlanId(assignedVlan);
501 }
502
503 Optional<Criterion> innerVlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.INNER_VLAN_VID);
504 if (innerVlanIdCriterion.isPresent()) {
505 VlanId assignedInnerVlan = ((VlanIdCriterion) innerVlanIdCriterion.get()).vlanId();
506 builderToUpdate.matchMetadata(assignedInnerVlan.toShort());
507 }
508
509 Optional<Criterion> ethTypeCriterion = readFromSelector(fwd.selector(), Criterion.Type.ETH_TYPE);
510 if (ethTypeCriterion.isPresent()) {
511 EthType ethType = ((EthTypeCriterion) ethTypeCriterion.get()).ethType();
512 builderToUpdate.matchEthType(ethType.toShort());
513 }
514
515 Optional<Criterion> ipv4DstCriterion = readFromSelector(fwd.selector(), Criterion.Type.IPV4_DST);
516 if (ipv4DstCriterion.isPresent()) {
517 IpPrefix ipv4Dst = ((IPCriterion) ipv4DstCriterion.get()).ip();
518 builderToUpdate.matchIPDst(ipv4Dst);
519 }
520
521 return builderToUpdate;
522 }
523
524 static Optional<Criterion> readFromSelector(TrafficSelector selector, Criterion.Type type) {
525 if (selector == null) {
526 return Optional.empty();
527 }
528 Criterion criterion = selector.getCriterion(type);
529 return (criterion == null)
530 ? Optional.empty() : Optional.of(criterion);
531 }
532
533 private boolean checkForMulticast(ForwardingObjective fwd) {
534
535 IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
536 Criterion.Type.IPV4_DST);
537
538 if (ip == null) {
539 return false;
540 }
541
542 return ip.ip().isMulticast();
543
544 }
545
546 private GroupKey getGroupForNextObjective(Integer nextId) {
547 NextGroup next = flowObjectiveStore.getNextGroup(nextId);
548 return appKryo.deserialize(next.data());
549
550 }
551
552 private void installNoModificationRules(ForwardingObjective fwd) {
553 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
554 Instructions.MetadataInstruction writeMetadata = fetchWriteMetadata(fwd);
yasin saplib4b8ee12021-06-13 18:25:20 +0000555 Instructions.MeterInstruction meter = fwd.treatment().metered();
Andrea Campanella37f07e42021-02-16 11:24:39 +0100556
557 TrafficSelector selector = fwd.selector();
558
559 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
560 Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
561 Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
562
563 if (inport == null || output == null || innerVlan == null || outerVlan == null) {
564 // Avoid logging a non-error from lldp, bbdp and eapol core flows.
565 if (!fwd.appId().name().equals(CORE_APP_NAME)) {
566 log.error("Forwarding objective is underspecified: {}", fwd);
567 } else {
568 log.debug("Not installing unsupported core generated flow {}", fwd);
569 }
570 fail(fwd, ObjectiveError.BADPARAMS);
571 return;
572 }
573
574
575 FlowRule.Builder outer = DefaultFlowRule.builder()
576 .fromApp(fwd.appId())
577 .forDevice(deviceId)
578 .makePermanent()
579 .withPriority(fwd.priority())
580 .withSelector(buildSelector(inport, outerVlan))
581 .withTreatment(buildTreatment(output, writeMetadata, meter));
582
583 applyRules(fwd, outer);
584 }
585
586 private void installDownstreamRules(ForwardingObjective fwd) {
587 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
588
589 if (output == null) {
590 return;
591 }
592
593 TrafficSelector selector = fwd.selector();
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100594 Criterion outerVlanCriterion = selector.getCriterion(Criterion.Type.VLAN_VID);
Andrea Campanella37f07e42021-02-16 11:24:39 +0100595 Criterion outerPbit = selector.getCriterion(Criterion.Type.VLAN_PCP);
596 Criterion innerVlanCriterion = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
597 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
598 Criterion dstMac = selector.getCriterion(Criterion.Type.ETH_DST);
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100599 //TODO better check for innerVlan
600 if (outerVlanCriterion == null || inport == null) {
Andrea Campanella37f07e42021-02-16 11:24:39 +0100601 // Avoid logging a non-error from lldp, bbdp and eapol core flows.
602 if (!fwd.appId().name().equals(CORE_APP_NAME)) {
603 log.error("Forwarding objective is underspecified: {}", fwd);
604 } else {
605 log.debug("Not installing unsupported core generated flow {}", fwd);
606 }
607 fail(fwd, ObjectiveError.BADPARAMS);
608 return;
609 }
610
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100611 VlanId outerVlan = ((VlanIdCriterion) outerVlanCriterion).vlanId();
612 //Verify if this is needed.
613 Criterion outerVid = Criteria.matchVlanId(outerVlan);
614
Andrea Campanella37f07e42021-02-16 11:24:39 +0100615 VlanId innerVlan = ((VlanIdCriterion) innerVlanCriterion).vlanId();
616 Criterion innerVid = Criteria.matchVlanId(innerVlan);
617
618 // In the case where the C-tag is the same for all the subscribers,
619 // we add a metadata with the outport in the selector to make the flow unique
620 Criterion innerSelectorMeta = Criteria.matchMetadata(output.port().toLong());
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100621 if (outerVlan.toShort() == VlanId.ANY_VALUE) {
622 Criterion metadata = Criteria.matchMetadata(innerVlan.toShort());
623 TrafficSelector outerSelector = buildSelector(inport, metadata, outerVlanCriterion, outerPbit, dstMac);
624 installDownstreamRulesForOuterAnyVlan(fwd, output, outerSelector, buildSelector(inport, innerVid,
625 innerSelectorMeta));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100626
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100627 } else if (innerVlan.toShort() == VlanId.ANY_VALUE) {
628 TrafficSelector outerSelector = buildSelector(inport, outerVlanCriterion, outerPbit, dstMac);
629
630 Criterion matchedVlanId = Criteria.matchVlanId(VlanId.ANY);
631 installDownstreamRulesForInnerAnyVlan(fwd, output, outerSelector,
632 buildSelector(inport,
633 matchedVlanId,
634 innerSelectorMeta));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100635 } else {
636 // Required to differentiate the same match flows
637 // Please note that S tag and S p bit values will be same for the same service - so conflict flows!
638 // Metadata match criteria solves the conflict issue - but not used by the voltha
639 // Maybe - find a better way to solve the above problem
640 Criterion metadata = Criteria.matchMetadata(innerVlan.toShort());
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100641 TrafficSelector outerSelector = buildSelector(inport, metadata, outerVlanCriterion, outerPbit, dstMac);
Andrea Campanella37f07e42021-02-16 11:24:39 +0100642 installDownstreamRulesForVlans(fwd, output, outerSelector, buildSelector(inport, innerVid,
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100643 innerSelectorMeta));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100644 }
645 }
646
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100647 private void installDownstreamRulesForOuterAnyVlan(ForwardingObjective fwd, Instruction output,
648 TrafficSelector outerSelector, TrafficSelector innerSelector) {
649
650 Instruction onuDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_ONU));
651 Instruction oltDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_OLT));
652
653 List<Pair<Instruction, Instruction>> vlanOps =
654 vlanOps(fwd,
655 L2ModificationInstruction.L2SubType.VLAN_POP);
656
657 if (vlanOps == null || vlanOps.isEmpty()) {
658 return;
659 }
660
661 Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
662
663 TrafficTreatment innerTreatment;
664 VlanId setVlanId = ((L2ModificationInstruction.ModVlanIdInstruction) popAndRewrite.getRight()).vlanId();
665 if (VlanId.NONE.equals(setVlanId)) {
666 innerTreatment = (buildTreatment(popAndRewrite.getLeft(), onuDsMeter,
667 writeMetadataIncludingOnlyTp(fwd), output));
668 } else {
669 innerTreatment = (buildTreatment(popAndRewrite.getRight(),
670 onuDsMeter, writeMetadataIncludingOnlyTp(fwd), output));
671 }
672
673 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
674 fwd.treatment().allInstructions());
675
676 Instruction innerPbitSet = null;
677
678 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
679 innerPbitSet = setVlanPcps.get(0);
680 }
681
682 VlanId remarkInnerVlan = null;
683 Optional<Criterion> vlanIdCriterion = readFromSelector(innerSelector, Criterion.Type.VLAN_VID);
684 if (vlanIdCriterion.isPresent()) {
685 remarkInnerVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
686 }
687
688 Instruction modVlanId = null;
689 if (innerPbitSet != null) {
690 modVlanId = Instructions.modVlanId(remarkInnerVlan);
691 }
692
693 //match: in port (nni), s-tag
694 //action: pop vlan (s-tag), write metadata, go to table 1, meter
695 FlowRule.Builder outer = DefaultFlowRule.builder()
696 .fromApp(fwd.appId())
697 .forDevice(deviceId)
698 .makePermanent()
699 .withPriority(fwd.priority())
700 .withSelector(outerSelector)
701 .withTreatment(buildTreatment(oltDsMeter,
702 fetchWriteMetadata(fwd),
703 Instructions.transition(QQ_TABLE)));
704
705 //match: in port (nni), c-tag
706 //action: immediate: write metadata and pop, meter, output
707 FlowRule.Builder inner = DefaultFlowRule.builder()
708 .fromApp(fwd.appId())
709 .forDevice(deviceId)
710 .forTable(QQ_TABLE)
711 .makePermanent()
712 .withPriority(fwd.priority())
713 .withSelector(innerSelector)
714 .withTreatment(innerTreatment);
715 applyRules(fwd, inner, outer);
716 }
717
Andrea Campanella37f07e42021-02-16 11:24:39 +0100718 private void installDownstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
719 TrafficSelector outerSelector, TrafficSelector innerSelector) {
720
yasin saplib4b8ee12021-06-13 18:25:20 +0000721 Instruction onuDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_ONU));
722 Instruction oltDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_OLT));
723
Andrea Campanella37f07e42021-02-16 11:24:39 +0100724 List<Pair<Instruction, Instruction>> vlanOps =
725 vlanOps(fwd,
726 L2ModificationInstruction.L2SubType.VLAN_POP);
727
728 if (vlanOps == null || vlanOps.isEmpty()) {
729 return;
730 }
731
732 Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
733
734 TrafficTreatment innerTreatment;
735 VlanId setVlanId = ((L2ModificationInstruction.ModVlanIdInstruction) popAndRewrite.getRight()).vlanId();
736 if (VlanId.NONE.equals(setVlanId)) {
yasin saplib4b8ee12021-06-13 18:25:20 +0000737 innerTreatment = (buildTreatment(popAndRewrite.getLeft(), onuDsMeter,
738 writeMetadataIncludingOnlyTp(fwd), output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100739 } else {
740 innerTreatment = (buildTreatment(popAndRewrite.getRight(),
yasin saplib4b8ee12021-06-13 18:25:20 +0000741 onuDsMeter, writeMetadataIncludingOnlyTp(fwd), output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100742 }
743
744 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
745 fwd.treatment().allInstructions());
746
747 Instruction innerPbitSet = null;
748
749 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
750 innerPbitSet = setVlanPcps.get(0);
751 }
752
753 VlanId remarkInnerVlan = null;
754 Optional<Criterion> vlanIdCriterion = readFromSelector(innerSelector, Criterion.Type.VLAN_VID);
755 if (vlanIdCriterion.isPresent()) {
756 remarkInnerVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
757 }
758
759 Instruction modVlanId = null;
760 if (innerPbitSet != null) {
761 modVlanId = Instructions.modVlanId(remarkInnerVlan);
762 }
763
764 //match: in port (nni), s-tag
765 //action: pop vlan (s-tag), write metadata, go to table 1, meter
766 FlowRule.Builder outer = DefaultFlowRule.builder()
767 .fromApp(fwd.appId())
768 .forDevice(deviceId)
769 .makePermanent()
770 .withPriority(fwd.priority())
771 .withSelector(outerSelector)
772 .withTreatment(buildTreatment(popAndRewrite.getLeft(), modVlanId,
yasin saplib4b8ee12021-06-13 18:25:20 +0000773 innerPbitSet, oltDsMeter,
Andrea Campanella37f07e42021-02-16 11:24:39 +0100774 fetchWriteMetadata(fwd),
775 Instructions.transition(QQ_TABLE)));
776
777 //match: in port (nni), c-tag
778 //action: immediate: write metadata and pop, meter, output
779 FlowRule.Builder inner = DefaultFlowRule.builder()
780 .fromApp(fwd.appId())
781 .forDevice(deviceId)
782 .forTable(QQ_TABLE)
783 .makePermanent()
784 .withPriority(fwd.priority())
785 .withSelector(innerSelector)
786 .withTreatment(innerTreatment);
787 applyRules(fwd, inner, outer);
788 }
789
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100790 private void installDownstreamRulesForInnerAnyVlan(ForwardingObjective fwd, Instruction output,
791 TrafficSelector outerSelector, TrafficSelector innerSelector) {
Andrea Campanella37f07e42021-02-16 11:24:39 +0100792
yasin saplib4b8ee12021-06-13 18:25:20 +0000793 Instruction onuDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_ONU));
794 Instruction oltDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_OLT));
795
Andrea Campanella37f07e42021-02-16 11:24:39 +0100796 //match: in port (nni), s-tag
797 //action: immediate: write metadata, pop vlan, meter and go to table 1
798 FlowRule.Builder outer = DefaultFlowRule.builder()
799 .fromApp(fwd.appId())
800 .forDevice(deviceId)
801 .makePermanent()
802 .withPriority(fwd.priority())
803 .withSelector(outerSelector)
yasin saplib4b8ee12021-06-13 18:25:20 +0000804 .withTreatment(buildTreatment(Instructions.popVlan(), oltDsMeter,
Andrea Campanella37f07e42021-02-16 11:24:39 +0100805 fetchWriteMetadata(fwd), Instructions.transition(QQ_TABLE)));
806
807 //match: in port (nni) and s-tag
808 //action: immediate : write metadata, meter and output
809 FlowRule.Builder inner = DefaultFlowRule.builder()
810 .fromApp(fwd.appId())
811 .forDevice(deviceId)
812 .forTable(QQ_TABLE)
813 .makePermanent()
814 .withPriority(fwd.priority())
815 .withSelector(innerSelector)
yasin saplib4b8ee12021-06-13 18:25:20 +0000816 .withTreatment(buildTreatment(onuDsMeter, writeMetadataIncludingOnlyTp(fwd), output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100817
818 applyRules(fwd, inner, outer);
819 }
820
821 private void installUpstreamRules(ForwardingObjective fwd) {
822 List<Pair<Instruction, Instruction>> vlanOps =
823 vlanOps(fwd,
824 L2ModificationInstruction.L2SubType.VLAN_PUSH);
Andrea Campanella37f07e42021-02-16 11:24:39 +0100825 if (vlanOps == null || vlanOps.isEmpty()) {
826 return;
827 }
828
829 Instruction output = fetchOutput(fwd, UPSTREAM);
830
831 if (output == null) {
832 return;
833 }
834
835 Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
836
837 boolean noneValueVlanStatus = checkNoneVlanCriteria(fwd);
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100838 //check if treatment is PUSH or POP
839 boolean popAndPush = checkIfIsPopAndPush(fwd);
Andrea Campanella37f07e42021-02-16 11:24:39 +0100840 boolean anyValueVlanStatus = checkAnyVlanMatchCriteria(fwd);
Andrea Campanella37f07e42021-02-16 11:24:39 +0100841 if (anyValueVlanStatus) {
842 installUpstreamRulesForAnyVlan(fwd, output, outerPair);
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100843 } else if (popAndPush) {
844 Pair<Instruction, Instruction> innerPair = outerPair;
845 outerPair = vlanOps.remove(0);
846 installUpstreamRulesForAnyOuterVlan(fwd, output, innerPair, outerPair, noneValueVlanStatus);
Andrea Campanella37f07e42021-02-16 11:24:39 +0100847 } else {
848 Pair<Instruction, Instruction> innerPair = outerPair;
849 outerPair = vlanOps.remove(0);
850 installUpstreamRulesForVlans(fwd, output, innerPair, outerPair, noneValueVlanStatus);
851 }
852 }
853
854 private void installUpstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
855 Pair<Instruction, Instruction> innerPair,
856 Pair<Instruction, Instruction> outerPair, Boolean noneValueVlanStatus) {
857
yasin saplib4b8ee12021-06-13 18:25:20 +0000858 Instruction onuUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_ONU));
859 Instruction oltUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_OLT));
860
Andrea Campanella37f07e42021-02-16 11:24:39 +0100861 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
862 fwd.treatment().allInstructions());
863
864 Instruction innerPbitSet = null;
865 Instruction outerPbitSet = null;
866
867 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
868 innerPbitSet = setVlanPcps.get(0);
869 outerPbitSet = setVlanPcps.get(1);
870 }
871
872 TrafficTreatment innerTreatment;
873 if (noneValueVlanStatus) {
yasin saplib4b8ee12021-06-13 18:25:20 +0000874 innerTreatment = buildTreatment(innerPair.getLeft(), innerPair.getRight(), onuUsMeter,
Andrea Campanella37f07e42021-02-16 11:24:39 +0100875 fetchWriteMetadata(fwd), innerPbitSet,
876 Instructions.transition(QQ_TABLE));
877 } else {
yasin saplib4b8ee12021-06-13 18:25:20 +0000878 innerTreatment = buildTreatment(innerPair.getRight(), onuUsMeter, fetchWriteMetadata(fwd),
Andrea Campanella37f07e42021-02-16 11:24:39 +0100879 innerPbitSet, Instructions.transition(QQ_TABLE));
880 }
881
882 //match: in port, vlanId (0 or None)
883 //action:
884 //if vlanId None, push & set c-tag go to table 1
885 //if vlanId 0 or any specific vlan, set c-tag, write metadata, meter and go to table 1
886 FlowRule.Builder inner = DefaultFlowRule.builder()
887 .fromApp(fwd.appId())
888 .forDevice(deviceId)
889 .makePermanent()
890 .withPriority(fwd.priority())
891 .withSelector(fwd.selector())
892 .withTreatment(innerTreatment);
893
894 PortCriterion inPort = (PortCriterion)
895 fwd.selector().getCriterion(Criterion.Type.IN_PORT);
896
897 VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
898 innerPair.getRight()).vlanId();
899
900 //match: in port, c-tag
901 //action: immediate: push s-tag, write metadata, meter and output
902 FlowRule.Builder outer = DefaultFlowRule.builder()
903 .fromApp(fwd.appId())
904 .forDevice(deviceId)
905 .forTable(QQ_TABLE)
906 .makePermanent()
907 .withPriority(fwd.priority())
908 .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
yasin saplib4b8ee12021-06-13 18:25:20 +0000909 oltUsMeter, writeMetadataIncludingOnlyTp(fwd),
910 outerPbitSet, output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100911
912 if (innerPbitSet != null) {
913 byte innerPbit = ((L2ModificationInstruction.ModVlanPcpInstruction)
914 innerPbitSet).vlanPcp();
915 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId), Criteria.matchVlanPcp(innerPbit)));
916 } else {
917 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId)));
918 }
919
920 applyRules(fwd, inner, outer);
921 }
922
923 private void installUpstreamRulesForAnyVlan(ForwardingObjective fwd, Instruction output,
924 Pair<Instruction, Instruction> outerPair) {
925
926 log.debug("Installing upstream rules for any value vlan");
yasin saplib4b8ee12021-06-13 18:25:20 +0000927 Instruction onuUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_ONU));
928 Instruction oltUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_OLT));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100929
930 //match: in port and any-vlan (coming from OLT app.)
931 //action: write metadata, go to table 1 and meter
932 FlowRule.Builder inner = DefaultFlowRule.builder()
933 .fromApp(fwd.appId())
934 .forDevice(deviceId)
935 .makePermanent()
936 .withPriority(fwd.priority())
937 .withSelector(fwd.selector())
yasin saplib4b8ee12021-06-13 18:25:20 +0000938 .withTreatment(buildTreatment(Instructions.transition(QQ_TABLE), onuUsMeter, fetchWriteMetadata(fwd)));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100939
940 //match: in port and any-vlan (coming from OLT app.)
941 //action: immediate: push:QinQ, vlanId (s-tag), write metadata, meter and output
942 FlowRule.Builder outer = DefaultFlowRule.builder()
943 .fromApp(fwd.appId())
944 .forDevice(deviceId)
945 .forTable(QQ_TABLE)
946 .makePermanent()
947 .withPriority(fwd.priority())
948 .withSelector(fwd.selector())
949 .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
yasin saplib4b8ee12021-06-13 18:25:20 +0000950 oltUsMeter, writeMetadataIncludingOnlyTp(fwd), output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100951
952 applyRules(fwd, inner, outer);
953 }
954
955 private boolean checkNoneVlanCriteria(ForwardingObjective fwd) {
956 // Add the VLAN_PUSH treatment if we're matching on VlanId.NONE
957 Criterion vlanMatchCriterion = filterForCriterion(fwd.selector().criteria(), Criterion.Type.VLAN_VID);
958 boolean noneValueVlanStatus = false;
959 if (vlanMatchCriterion != null) {
960 noneValueVlanStatus = ((VlanIdCriterion) vlanMatchCriterion).vlanId().equals(VlanId.NONE);
961 }
962 return noneValueVlanStatus;
963 }
964
965 private boolean checkAnyVlanMatchCriteria(ForwardingObjective fwd) {
966 Criterion anyValueVlanCriterion = fwd.selector().criteria().stream()
967 .filter(c -> c.type().equals(Criterion.Type.VLAN_VID))
968 .filter(vc -> ((VlanIdCriterion) vc).vlanId().toShort() == VlanId.ANY_VALUE)
969 .findAny().orElse(null);
970
971 if (anyValueVlanCriterion == null) {
972 log.debug("Any value vlan match criteria is not found, criteria {}",
973 fwd.selector().criteria());
974 return false;
975 }
976
977 return true;
978 }
979
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100980 private boolean checkIfIsPopAndPush(ForwardingObjective fwd) {
981 TrafficTreatment treatment = fwd.treatment();
982 List<Instruction> instructions = treatment.allInstructions();
983 Optional<Instruction> vlanInstructionPush = instructions.stream()
984 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
985 .filter(i -> ((L2ModificationInstruction) i).subtype() ==
986 L2ModificationInstruction.L2SubType.VLAN_PUSH)
987 .findAny();
988 Optional<Instruction> vlanInstructionPop = instructions.stream()
989 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
990 .filter(i -> ((L2ModificationInstruction) i).subtype() ==
991 L2ModificationInstruction.L2SubType.VLAN_POP)
992 .findAny();
993 return vlanInstructionPush.isPresent() && vlanInstructionPop.isPresent();
994 }
995
Andrea Campanella37f07e42021-02-16 11:24:39 +0100996 private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
997 Instruction output = fwd.treatment().allInstructions().stream()
998 .filter(i -> i.type() == Instruction.Type.OUTPUT)
999 .findFirst().orElse(null);
1000
1001 if (output == null) {
1002 log.error("OLT {} rule has no output", direction);
1003 fail(fwd, ObjectiveError.BADPARAMS);
1004 return null;
1005 }
1006 return output;
1007 }
1008
yasin saplib4b8ee12021-06-13 18:25:20 +00001009 private Instruction fetchMeterById(ForwardingObjective fwd, String meterId) {
1010 Optional<Instructions.MeterInstruction> meter = fwd.treatment().meters().stream()
1011 .filter(meterInstruction -> meterInstruction.meterId().toString().equals(meterId)).findAny();
1012 if (meter.isEmpty()) {
1013 log.debug("Meter instruction is not found for the meterId: {} ", meterId);
Andrea Campanella37f07e42021-02-16 11:24:39 +01001014 return null;
1015 }
yasin saplib4b8ee12021-06-13 18:25:20 +00001016 log.debug("Meter instruction is found for the meterId: {} ", meterId);
1017 return meter.get();
Andrea Campanella37f07e42021-02-16 11:24:39 +01001018 }
1019
1020 private Instructions.MetadataInstruction fetchWriteMetadata(ForwardingObjective fwd) {
1021 Instructions.MetadataInstruction writeMetadata = fwd.treatment().writeMetadata();
1022
1023 if (writeMetadata == null) {
1024 log.warn("Write metadata is not found for the forwarding obj");
1025 fail(fwd, ObjectiveError.BADPARAMS);
1026 return null;
1027 }
1028
1029 log.debug("Write metadata is found {}", writeMetadata);
1030 return writeMetadata;
1031 }
1032
1033 private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
1034 L2ModificationInstruction.L2SubType type) {
1035
1036 List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
1037 fwd.treatment().allInstructions(), type);
1038
1039 if (vlanOps == null || vlanOps.isEmpty()) {
1040 String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
1041 ? DOWNSTREAM : UPSTREAM;
1042 log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
1043 fail(fwd, ObjectiveError.BADPARAMS);
1044 return ImmutableList.of();
1045 }
1046 return vlanOps;
1047 }
1048
1049
1050 private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
1051 L2ModificationInstruction.L2SubType type) {
1052
1053 List<Instruction> vlanOperations = findL2Instructions(
1054 type,
1055 instructions);
1056 List<Instruction> vlanSets = findL2Instructions(
1057 L2ModificationInstruction.L2SubType.VLAN_ID,
1058 instructions);
1059
1060 if (vlanOperations.size() != vlanSets.size()) {
1061 return ImmutableList.of();
1062 }
1063
1064 List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
1065
1066 for (int i = 0; i < vlanOperations.size(); i++) {
1067 pairs.add(new ImmutablePair<>(vlanOperations.get(i), vlanSets.get(i)));
1068 }
1069 return pairs;
1070 }
1071
1072 private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
1073 List<Instruction> actions) {
1074 return actions.stream()
1075 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
1076 .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
1077 .collect(Collectors.toList());
1078 }
1079
1080 private void provisionEthTypeBasedFilter(FilteringObjective filter,
1081 EthTypeCriterion ethType,
1082 Instructions.OutputInstruction output,
1083 L2ModificationInstruction vlanId,
1084 L2ModificationInstruction vlanPush) {
1085
1086 Instruction meter = filter.meta().metered();
1087 Instruction writeMetadata = filter.meta().writeMetadata();
1088
1089 TrafficSelector selector = buildSelector(filter.key(), ethType);
1090 TrafficTreatment treatment;
1091
1092 if (vlanPush == null || vlanId == null) {
1093 treatment = buildTreatment(output, meter, writeMetadata);
1094 } else {
1095 // we need to push the vlan because it came untagged (ATT)
1096 treatment = buildTreatment(output, meter, vlanPush, vlanId, writeMetadata);
1097 }
1098
1099 buildAndApplyRule(filter, selector, treatment);
1100
1101 }
1102
1103 private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
1104 IPProtocolCriterion ipProto,
1105 Instructions.OutputInstruction output,
1106 Instruction vlan, Instruction pcp) {
1107
1108 Instruction meter = filter.meta().metered();
1109 Instruction writeMetadata = filter.meta().writeMetadata();
1110
1111 // uniTagMatch
1112 VlanIdCriterion vlanId = (VlanIdCriterion) filterForCriterion(filter.conditions(),
1113 Criterion.Type.VLAN_VID);
1114
1115 TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto, vlanId);
1116 TrafficTreatment treatment = buildTreatment(output, vlan, pcp, meter, writeMetadata);
1117 buildAndApplyRule(filter, selector, treatment);
1118 }
1119
1120 private void provisionDhcp(FilteringObjective filter, EthTypeCriterion ethType,
1121 IPProtocolCriterion ipProto,
1122 UdpPortCriterion udpSrcPort,
1123 UdpPortCriterion udpDstPort,
1124 Instruction vlanIdInstruction,
1125 Instruction vlanPcpInstruction,
1126 Instructions.OutputInstruction output) {
1127
1128 Instruction meter = filter.meta().metered();
1129 Instruction writeMetadata = filter.meta().writeMetadata();
1130
1131 VlanIdCriterion matchVlanId = (VlanIdCriterion)
1132 filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
1133
1134 TrafficSelector selector;
1135 TrafficTreatment treatment;
1136
1137 if (matchVlanId != null) {
1138 log.debug("Building selector with match VLAN, {}", matchVlanId);
1139 // in case of TT upstream the packet comes tagged and the vlan is swapped.
1140 selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort,
1141 udpDstPort, matchVlanId);
1142 treatment = buildTreatment(output, meter, writeMetadata,
1143 vlanIdInstruction, vlanPcpInstruction);
1144 } else {
1145 log.debug("Building selector with no VLAN");
1146 // in case of ATT upstream the packet comes in untagged and we need to push the vlan
1147 selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort, udpDstPort);
1148 treatment = buildTreatment(output, meter, vlanIdInstruction, writeMetadata);
1149 }
1150 //In case of downstream there will be no match on the VLAN, which is null,
1151 // so it will just be output, meter, writeMetadata
1152
1153 buildAndApplyRule(filter, selector, treatment);
1154 }
1155
1156 private void provisionPPPoED(FilteringObjective filter, EthTypeCriterion ethType,
1157 Instruction vlanIdInstruction,
1158 Instruction vlanPcpInstruction,
1159 Instructions.OutputInstruction output) {
1160 Instruction meter = filter.meta().metered();
1161 Instruction writeMetadata = filter.meta().writeMetadata();
1162
1163 VlanIdCriterion matchVlanId = (VlanIdCriterion)
1164 filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
1165
1166 TrafficSelector selector;
1167 TrafficTreatment treatment;
1168
1169 if (matchVlanId != null) {
1170 log.debug("Building pppoed selector with match VLAN {}.", matchVlanId);
1171 } else {
1172 log.debug("Building pppoed selector without match VLAN.");
1173 }
1174
1175 selector = buildSelector(filter.key(), ethType, matchVlanId);
1176 treatment = buildTreatment(output, meter, writeMetadata, vlanIdInstruction, vlanPcpInstruction);
1177 buildAndApplyRule(filter, selector, treatment);
1178 }
1179
1180 private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
1181 TrafficTreatment treatment) {
1182 FlowRule rule = DefaultFlowRule.builder()
1183 .fromApp(filter.appId())
1184 .forDevice(deviceId)
1185 .forTable(0)
1186 .makePermanent()
1187 .withSelector(selector)
1188 .withTreatment(treatment)
1189 .withPriority(filter.priority())
1190 .build();
1191
1192 if (accumulator != null) {
1193 if (log.isDebugEnabled()) {
1194 log.debug("Adding pair to batch: {}", Pair.of(filter, rule));
1195 }
1196 accumulator.add(Pair.of(filter, rule));
1197 } else {
1198 FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
1199 switch (filter.type()) {
1200 case PERMIT:
1201 opsBuilder.add(rule);
1202 break;
1203 case DENY:
1204 opsBuilder.remove(rule);
1205 break;
1206 default:
1207 log.warn("Unknown filter type : {}", filter.type());
1208 fail(filter, ObjectiveError.UNSUPPORTED);
1209 }
1210 applyFlowRules(ImmutableList.of(filter), opsBuilder);
1211 }
1212 }
1213
1214 private void applyRules(ForwardingObjective fwd, FlowRule.Builder... fwdBuilders) {
1215 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
1216 switch (fwd.op()) {
1217 case ADD:
1218 for (FlowRule.Builder fwdBuilder : fwdBuilders) {
1219 builder.add(fwdBuilder.build());
1220 }
1221 break;
1222 case REMOVE:
1223 for (FlowRule.Builder fwdBuilder : fwdBuilders) {
1224 builder.remove(fwdBuilder.build());
1225 }
1226 break;
1227 case ADD_TO_EXISTING:
1228 break;
1229 case REMOVE_FROM_EXISTING:
1230 break;
1231 default:
1232 log.warn("Unknown forwarding operation: {}", fwd.op());
1233 }
1234
1235 applyFlowRules(ImmutableList.of(fwd), builder);
1236
1237
1238 }
1239
1240 private void applyFlowRules(List<Objective> objectives, FlowRuleOperations.Builder builder) {
1241 flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
1242 @Override
1243 public void onSuccess(FlowRuleOperations ops) {
1244 objectives.forEach(obj -> {
1245 pass(obj);
1246 });
1247 }
1248
1249 @Override
1250 public void onError(FlowRuleOperations ops) {
1251 objectives.forEach(obj -> {
1252 fail(obj, ObjectiveError.FLOWINSTALLATIONFAILED);
1253 });
1254
1255 }
1256 }));
1257 }
1258
1259 // Builds the batch using the accumulated flow rules
1260 private void sendFilters(List<Pair<FilteringObjective, FlowRule>> pairs) {
1261 FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07001262 if (log.isDebugEnabled()) {
1263 log.debug("Sending batch of {} filter-objs", pairs.size());
1264 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001265 List<Objective> filterObjs = Lists.newArrayList();
1266 // Iterates over all accumulated flow rules and then build an unique batch
1267 pairs.forEach(pair -> {
1268 FilteringObjective filter = pair.getLeft();
1269 FlowRule rule = pair.getRight();
1270 switch (filter.type()) {
1271 case PERMIT:
1272 flowOpsBuilder.add(rule);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07001273 if (log.isTraceEnabled()) {
1274 log.trace("Applying add filter-obj {} to device: {}", filter.id(), deviceId);
1275 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001276 filterObjs.add(filter);
1277 break;
1278 case DENY:
1279 flowOpsBuilder.remove(rule);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07001280 if (log.isTraceEnabled()) {
1281 log.trace("Deleting flow rule {} from device: {}", rule, deviceId);
1282 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001283 filterObjs.add(filter);
1284 break;
1285 default:
1286 fail(filter, ObjectiveError.UNKNOWN);
1287 log.warn("Unknown forwarding type {}", filter.type());
1288 }
1289 });
1290 if (log.isDebugEnabled()) {
1291 log.debug("Applying batch {}", flowOpsBuilder.build());
1292 }
1293 // Finally applies the operations
1294 applyFlowRules(filterObjs, flowOpsBuilder);
1295 }
1296
1297 private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
1298 return criteria.stream()
1299 .filter(c -> c.type().equals(type))
1300 .limit(1)
1301 .findFirst().orElse(null);
1302 }
1303
1304 private TrafficSelector buildSelector(Criterion... criteria) {
1305
1306 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
1307
1308 Arrays.stream(criteria).filter(Objects::nonNull).forEach(sBuilder::add);
1309
1310 return sBuilder.build();
1311 }
1312
1313 private TrafficTreatment buildTreatment(Instruction... instructions) {
1314
1315 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
1316
1317 Arrays.stream(instructions).filter(Objects::nonNull).forEach(tBuilder::add);
1318
1319 return tBuilder.build();
1320 }
1321
1322 private Instruction writeMetadataIncludingOnlyTp(ForwardingObjective fwd) {
1323
1324 return Instructions.writeMetadata(
1325 fetchWriteMetadata(fwd).metadata() & 0xFFFF00000000L, 0L);
1326 }
1327
1328 private void fail(Objective obj, ObjectiveError error) {
1329 obj.context().ifPresent(context -> context.onError(obj, error));
1330 }
1331
1332 private void pass(Objective obj) {
1333 obj.context().ifPresent(context -> context.onSuccess(obj));
1334 }
1335
1336
1337 private class InnerGroupListener implements GroupListener {
1338 @Override
1339 public void event(GroupEvent event) {
1340 GroupKey key = event.subject().appCookie();
1341 NextObjective obj = pendingGroups.getIfPresent(key);
1342 if (obj == null) {
Andrea Campanella438e1ad2021-03-26 11:41:16 +01001343 if (log.isTraceEnabled()) {
1344 log.trace("No pending group for {}, moving on", key);
1345 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001346 return;
1347 }
Andrea Campanella438e1ad2021-03-26 11:41:16 +01001348 log.debug("Event {} for group {}, handling pending" +
Andrea Campanella37f07e42021-02-16 11:24:39 +01001349 "NextGroup {}", event.type(), key, obj.id());
1350 if (event.type() == GroupEvent.Type.GROUP_ADDED ||
1351 event.type() == GroupEvent.Type.GROUP_UPDATED) {
1352 flowObjectiveStore.putNextGroup(obj.id(), new OltPipelineGroup(key));
Andrea Campanella37f07e42021-02-16 11:24:39 +01001353 pendingGroups.invalidate(key);
Esin Karamand106e522021-03-15 14:08:48 +00001354 pass(obj);
Andrea Campanella37f07e42021-02-16 11:24:39 +01001355 } else if (event.type() == GroupEvent.Type.GROUP_REMOVED) {
1356 flowObjectiveStore.removeNextGroup(obj.id());
Andrea Campanella37f07e42021-02-16 11:24:39 +01001357 pendingGroups.invalidate(key);
Esin Karamand106e522021-03-15 14:08:48 +00001358 pass(obj);
Andrea Campanella37f07e42021-02-16 11:24:39 +01001359 }
1360 }
1361 }
1362
1363 private static class OltPipelineGroup implements NextGroup {
1364
1365 private final GroupKey key;
1366
1367 public OltPipelineGroup(GroupKey key) {
1368 this.key = key;
1369 }
1370
1371 public GroupKey key() {
1372 return key;
1373 }
1374
1375 @Override
1376 public byte[] data() {
1377 return appKryo.serialize(key);
1378 }
1379
1380 }
1381
1382 @Override
1383 public List<String> getNextMappings(NextGroup nextGroup) {
1384 // TODO Implementation deferred to vendor
1385 return null;
1386 }
1387
1388 // Flow rules accumulator for reducing the number of transactions required to the devices.
1389 private final class ObjectiveAccumulator
1390 extends AbstractAccumulator<Pair<FilteringObjective, FlowRule>> {
1391
1392 ObjectiveAccumulator(int maxFilter, int maxBatchMS, int maxIdleMS) {
1393 super(TIMER, maxFilter, maxBatchMS, maxIdleMS);
1394 }
1395
1396 @Override
1397 public void processItems(List<Pair<FilteringObjective, FlowRule>> pairs) {
1398 // Triggers creation of a batch using the list of flowrules generated from objs.
1399 accumulatorExecutorService.execute(new FlowRulesBuilderTask(pairs));
1400 }
1401 }
1402
1403 // Task for building batch of flow rules in a separate thread.
1404 private final class FlowRulesBuilderTask implements Runnable {
1405 private final List<Pair<FilteringObjective, FlowRule>> pairs;
1406
1407 FlowRulesBuilderTask(List<Pair<FilteringObjective, FlowRule>> pairs) {
1408 this.pairs = pairs;
1409 }
1410
1411 @Override
1412 public void run() {
1413 try {
1414 sendFilters(pairs);
1415 } catch (Exception e) {
1416 log.warn("Unable to send objectives", e);
1417 }
1418 }
1419 }
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +01001420
1421 private void installUpstreamRulesForAnyOuterVlan(ForwardingObjective fwd, Instruction output,
1422 Pair<Instruction, Instruction> innerPair,
1423 Pair<Instruction, Instruction> outerPair, Boolean noneValueVlanStatus) {
1424
1425 Instruction onuUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_ONU));
1426 Instruction oltUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_OLT));
1427
1428 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
1429 fwd.treatment().allInstructions());
1430
1431 Instruction innerPbitSet = null;
1432 Instruction outerPbitSet = null;
1433
1434 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
1435 innerPbitSet = setVlanPcps.get(0);
1436 outerPbitSet = setVlanPcps.get(1);
1437 }
1438
1439 TrafficTreatment innerTreatment;
1440 if (noneValueVlanStatus) {
1441 innerTreatment = buildTreatment(innerPair.getLeft(), innerPair.getRight(), onuUsMeter,
1442 fetchWriteMetadata(fwd), innerPbitSet,
1443 Instructions.transition(QQ_TABLE));
1444 } else {
1445 innerTreatment = buildTreatment(innerPair.getRight(), onuUsMeter, fetchWriteMetadata(fwd),
1446 innerPbitSet, Instructions.transition(QQ_TABLE));
1447 }
1448
1449 //match: in port, vlanId (0 or None)
1450 //action:
1451 //if vlanId None, push & set c-tag go to table 1
1452 //if vlanId 0 or any specific vlan, set c-tag, write metadata, meter and go to table 1
1453 FlowRule.Builder inner = DefaultFlowRule.builder()
1454 .fromApp(fwd.appId())
1455 .forDevice(deviceId)
1456 .makePermanent()
1457 .withPriority(fwd.priority())
1458 .withSelector(fwd.selector())
1459 .withTreatment(innerTreatment);
1460
1461 PortCriterion inPort = (PortCriterion)
1462 fwd.selector().getCriterion(Criterion.Type.IN_PORT);
1463
1464 VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
1465 innerPair.getRight()).vlanId();
1466
1467 //match: in port, c-tag
1468 //action: immediate: push s-tag, write metadata, meter and output
1469 FlowRule.Builder outer = DefaultFlowRule.builder()
1470 .fromApp(fwd.appId())
1471 .forDevice(deviceId)
1472 .forTable(QQ_TABLE)
1473 .makePermanent()
1474 .withPriority(fwd.priority())
1475 .withTreatment(buildTreatment(oltUsMeter, writeMetadataIncludingOnlyTp(fwd),
1476 outerPbitSet, output));
1477
1478 if (innerPbitSet != null) {
1479 byte innerPbit = ((L2ModificationInstruction.ModVlanPcpInstruction)
1480 innerPbitSet).vlanPcp();
1481 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId), Criteria.matchVlanPcp(innerPbit)));
1482 } else {
1483 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId)));
1484 }
1485
1486 applyRules(fwd, inner, outer);
1487 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001488}