blob: bb52bce47a683c4f85aa56fd2f5ed2bada13e038 [file] [log] [blame]
Andrea Campanella37f07e42021-02-16 11:24:39 +01001/*
2 * Copyright 2016-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.opencord.olt.driver;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.RemovalCause;
21import com.google.common.cache.RemovalNotification;
22import com.google.common.collect.ImmutableList;
23import com.google.common.collect.Lists;
24import org.apache.commons.lang3.tuple.ImmutablePair;
25import org.apache.commons.lang3.tuple.Pair;
26import org.onlab.osgi.ServiceDirectory;
27import org.onlab.packet.EthType;
28import org.onlab.packet.IPv4;
29import org.onlab.packet.IPv6;
30import org.onlab.packet.IpPrefix;
31import org.onlab.packet.VlanId;
32import org.onlab.util.AbstractAccumulator;
33import org.onlab.util.Accumulator;
34import org.onlab.util.KryoNamespace;
35import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
37import org.onosproject.net.DeviceId;
38import org.onosproject.net.PortNumber;
39import org.onosproject.net.behaviour.NextGroup;
40import org.onosproject.net.behaviour.Pipeliner;
41import org.onosproject.net.behaviour.PipelinerContext;
42import org.onosproject.net.driver.AbstractHandlerBehaviour;
43import org.onosproject.net.driver.Driver;
44import org.onosproject.net.flow.DefaultFlowRule;
45import org.onosproject.net.flow.DefaultTrafficSelector;
46import org.onosproject.net.flow.DefaultTrafficTreatment;
47import org.onosproject.net.flow.FlowRule;
48import org.onosproject.net.flow.FlowRuleOperations;
49import org.onosproject.net.flow.FlowRuleOperationsContext;
50import org.onosproject.net.flow.FlowRuleService;
51import org.onosproject.net.flow.TrafficSelector;
52import org.onosproject.net.flow.TrafficTreatment;
53import org.onosproject.net.flow.criteria.Criteria;
54import org.onosproject.net.flow.criteria.Criterion;
55import org.onosproject.net.flow.criteria.EthTypeCriterion;
56import org.onosproject.net.flow.criteria.IPCriterion;
57import org.onosproject.net.flow.criteria.IPProtocolCriterion;
58import org.onosproject.net.flow.criteria.PortCriterion;
59import org.onosproject.net.flow.criteria.UdpPortCriterion;
60import org.onosproject.net.flow.criteria.VlanIdCriterion;
61import org.onosproject.net.flow.instructions.Instruction;
62import org.onosproject.net.flow.instructions.Instructions;
63import org.onosproject.net.flow.instructions.L2ModificationInstruction;
64import org.onosproject.net.flowobjective.FilteringObjective;
65import org.onosproject.net.flowobjective.FlowObjectiveStore;
66import org.onosproject.net.flowobjective.ForwardingObjective;
67import org.onosproject.net.flowobjective.NextObjective;
68import org.onosproject.net.flowobjective.Objective;
69import org.onosproject.net.flowobjective.ObjectiveError;
70import org.onosproject.net.group.DefaultGroupBucket;
71import org.onosproject.net.group.DefaultGroupDescription;
72import org.onosproject.net.group.DefaultGroupKey;
73import org.onosproject.net.group.Group;
74import org.onosproject.net.group.GroupBucket;
75import org.onosproject.net.group.GroupBuckets;
76import org.onosproject.net.group.GroupDescription;
77import org.onosproject.net.group.GroupEvent;
78import org.onosproject.net.group.GroupKey;
79import org.onosproject.net.group.GroupListener;
80import org.onosproject.net.group.GroupService;
81import org.onosproject.store.serializers.KryoNamespaces;
82import org.onosproject.store.service.StorageService;
83import org.slf4j.Logger;
84
85import java.util.Arrays;
86import java.util.Collection;
87import java.util.Collections;
88import java.util.List;
89import java.util.Objects;
90import java.util.Optional;
91import java.util.concurrent.ScheduledExecutorService;
92import java.util.Timer;
93import java.util.concurrent.TimeUnit;
94import java.util.stream.Collectors;
95
96import static org.onosproject.core.CoreService.CORE_APP_NAME;
yasin saplib4b8ee12021-06-13 18:25:20 +000097import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_OLT;
98import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_ONU;
99import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
100import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_ONU;
Andrea Campanella37f07e42021-02-16 11:24:39 +0100101import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
102import static org.onlab.util.Tools.groupedThreads;
103import static org.slf4j.LoggerFactory.getLogger;
104
105/**
106 * Pipeliner for OLT device.
107 */
108
109public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
110
// Flow table that the second-stage downstream rules are installed in; first-stage
// downstream rules transition to it.
111 private static final Integer QQ_TABLE = 1;
// NOTE(review): not referenced in the visible part of the file - confirm it is still used.
112 private static final int NO_ACTION_PRIORITY = 500;
// Direction labels handed to fetchOutput(...) by the rule-installation methods.
113 private static final String DOWNSTREAM = "downstream";
114 private static final String UPSTREAM = "upstream";
115 private final Logger log = getLogger(getClass());
116
// Services below are resolved from the PipelinerContext's service directory in init(...).
117 private ServiceDirectory serviceDirectory;
118 private FlowRuleService flowRuleService;
119 private GroupService groupService;
120 private CoreService coreService;
121 private StorageService storageService;
122
// Device this pipeliner instance was initialised for, and the application id
// registered in init(...) as "org.onosproject.driver.OLTPipeline".
123 private DeviceId deviceId;
124 private ApplicationId appId;
125
126
// Objective store supplied by the PipelinerContext; queried for next groups by next id.
127 protected FlowObjectiveStore flowObjectiveStore;
128
// Next objectives whose group operation has been issued, keyed by group key. Built in
// init(...) with a 20 second expiry; an expired entry is failed with GROUPINSTALLATIONFAILED.
129 private Cache<GroupKey, NextObjective> pendingGroups;
130
// Serializer used to turn a next-objective id into a group key and to decode stored
// next-group data back into a group key.
131 protected static KryoNamespace appKryo = new KryoNamespace.Builder()
132 .register(KryoNamespaces.API)
133 .register(GroupKey.class)
134 .register(DefaultGroupKey.class)
135 .register(OltPipelineGroup.class)
136 .build("OltPipeline");
137
// NOTE(review): presumably the timer backing the objective accumulator - its use is not
// visible in this part of the file.
138 private static final Timer TIMER = new Timer("filterobj-batching");
// Created in init(...) only when the driver enables the accumulator; otherwise left unset.
139 private Accumulator<Pair<FilteringObjective, FlowRule>> accumulator;
140
141 // accumulator executor service
142 private ScheduledExecutorService accumulatorExecutorService
143 = newSingleThreadScheduledExecutor(groupedThreads("OltPipeliner", "acc-%d", log));
144
145 @Override
146 public void init(DeviceId deviceId, PipelinerContext context) {
147 log.debug("Initiate OLT pipeline");
148 this.serviceDirectory = context.directory();
149 this.deviceId = deviceId;
150
151 flowRuleService = serviceDirectory.get(FlowRuleService.class);
152 coreService = serviceDirectory.get(CoreService.class);
153 groupService = serviceDirectory.get(GroupService.class);
154 flowObjectiveStore = context.store();
155 storageService = serviceDirectory.get(StorageService.class);
156
157 appId = coreService.registerApplication(
158 "org.onosproject.driver.OLTPipeline");
159
160 // Init the accumulator, if enabled
161 if (isAccumulatorEnabled()) {
162 log.debug("Building accumulator with maxObjs {}, batchMs {}, idleMs {}",
163 context.accumulatorMaxObjectives(), context.accumulatorMaxBatchMillis(),
164 context.accumulatorMaxIdleMillis());
165 accumulator = new ObjectiveAccumulator(context.accumulatorMaxObjectives(),
166 context.accumulatorMaxBatchMillis(),
167 context.accumulatorMaxIdleMillis());
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100168 } else {
169 log.debug("Olt Pipeliner accumulator is disabled, processing immediately");
Andrea Campanella37f07e42021-02-16 11:24:39 +0100170 }
171
172
173 pendingGroups = CacheBuilder.newBuilder()
174 .expireAfterWrite(20, TimeUnit.SECONDS)
175 .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
176 if (notification.getCause() == RemovalCause.EXPIRED) {
177 fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
178 }
179 }).build();
180
181 groupService.addListener(new InnerGroupListener());
182
183 }
184
185 public boolean isAccumulatorEnabled() {
186 Driver driver = super.data().driver();
187 // we cannot determine the property
188 if (driver == null) {
189 return false;
190 }
191 return Boolean.parseBoolean(driver.getProperty(ACCUMULATOR_ENABLED));
192 }
193
194 @Override
195 public void filter(FilteringObjective filter) {
196 Instructions.OutputInstruction output;
197
198 if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
199 output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
200 .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
201 .limit(1)
202 .findFirst().get();
203
204 if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
205 log.warn("OLT can only filter packet to controller");
206 fail(filter, ObjectiveError.UNSUPPORTED);
207 return;
208 }
209 } else {
210 fail(filter, ObjectiveError.BADPARAMS);
211 return;
212 }
213
214 if (filter.key().type() != Criterion.Type.IN_PORT) {
215 fail(filter, ObjectiveError.BADPARAMS);
216 return;
217 }
218
219 EthTypeCriterion ethType = (EthTypeCriterion)
220 filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
221
222 if (ethType == null) {
223 fail(filter, ObjectiveError.BADPARAMS);
224 return;
225 }
226 Optional<Instruction> vlanId = filter.meta().immediate().stream()
227 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
228 && ((L2ModificationInstruction) t).subtype()
229 .equals(L2ModificationInstruction.L2SubType.VLAN_ID))
230 .limit(1)
231 .findFirst();
232
233 Optional<Instruction> vlanPcp = filter.meta().immediate().stream()
234 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
235 && ((L2ModificationInstruction) t).subtype()
236 .equals(L2ModificationInstruction.L2SubType.VLAN_PCP))
237 .limit(1)
238 .findFirst();
239
240 Optional<Instruction> vlanPush = filter.meta().immediate().stream()
241 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
242 && ((L2ModificationInstruction) t).subtype()
243 .equals(L2ModificationInstruction.L2SubType.VLAN_PUSH))
244 .limit(1)
245 .findFirst();
246
247 if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
248
249 if (vlanId.isEmpty() || vlanPush.isEmpty()) {
250 log.warn("Missing EAPOL vlan or vlanPush");
251 fail(filter, ObjectiveError.BADPARAMS);
252 return;
253 }
254 provisionEthTypeBasedFilter(filter, ethType, output,
255 (L2ModificationInstruction) vlanId.get(),
256 (L2ModificationInstruction) vlanPush.get());
257 } else if (ethType.ethType().equals(EthType.EtherType.PPPoED.ethType())) {
258 provisionPPPoED(filter, ethType, vlanId.orElse(null), vlanPcp.orElse(null), output);
259 } else if (ethType.ethType().equals(EthType.EtherType.LLDP.ethType())) {
260 provisionEthTypeBasedFilter(filter, ethType, output, null, null);
261 } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
262 IPProtocolCriterion ipProto = (IPProtocolCriterion)
263 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
264 if (ipProto == null) {
265 log.warn("OLT can only filter IGMP and DHCP");
266 fail(filter, ObjectiveError.UNSUPPORTED);
267 return;
268 }
269 if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
270 provisionIgmp(filter, ethType, ipProto, output,
271 vlanId.orElse(null),
272 vlanPcp.orElse(null));
273 } else if (ipProto.protocol() == IPv4.PROTOCOL_UDP) {
274 UdpPortCriterion udpSrcPort = (UdpPortCriterion)
275 filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
276
277 UdpPortCriterion udpDstPort = (UdpPortCriterion)
278 filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
279
280 if ((udpSrcPort.udpPort().toInt() == 67 && udpDstPort.udpPort().toInt() == 68) ||
281 (udpSrcPort.udpPort().toInt() == 68 && udpDstPort.udpPort().toInt() == 67)) {
282 provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
283 vlanPcp.orElse(null), output);
284 } else {
285 log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
286 fail(filter, ObjectiveError.UNSUPPORTED);
287 }
288 } else {
289 log.warn("Currently supporting only IGMP and DHCP filters for IPv4 packets");
290 fail(filter, ObjectiveError.UNSUPPORTED);
291 }
292 } else if (ethType.ethType().equals(EthType.EtherType.IPV6.ethType())) {
293 IPProtocolCriterion ipProto = (IPProtocolCriterion)
294 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
295 if (ipProto == null) {
296 log.warn("OLT can only filter DHCP");
297 fail(filter, ObjectiveError.UNSUPPORTED);
298 return;
299 }
300 if (ipProto.protocol() == IPv6.PROTOCOL_UDP) {
301 UdpPortCriterion udpSrcPort = (UdpPortCriterion)
302 filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
303
304 UdpPortCriterion udpDstPort = (UdpPortCriterion)
305 filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
306
307 if ((udpSrcPort.udpPort().toInt() == 546 && udpDstPort.udpPort().toInt() == 547) ||
308 (udpSrcPort.udpPort().toInt() == 547 && udpDstPort.udpPort().toInt() == 546)) {
309 provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
310 vlanPcp.orElse(null), output);
311 } else {
312 log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
313 fail(filter, ObjectiveError.UNSUPPORTED);
314 }
315 } else {
316 log.warn("Currently supporting only DHCP filters for IPv6 packets");
317 fail(filter, ObjectiveError.UNSUPPORTED);
318 }
319 } else {
320 log.warn("\nOnly the following are Supported in OLT for filter ->\n"
321 + "ETH TYPE : EAPOL, LLDP and IPV4\n"
322 + "IPV4 TYPE: IGMP and UDP (for DHCP)"
323 + "IPV6 TYPE: UDP (for DHCP)");
324 fail(filter, ObjectiveError.UNSUPPORTED);
325 }
326
327 }
328
329
330 @Override
331 public void forward(ForwardingObjective fwd) {
332 log.debug("Installing forwarding objective {}", fwd);
333 if (checkForMulticast(fwd)) {
334 processMulticastRule(fwd);
335 return;
336 }
337
338 TrafficTreatment treatment = fwd.treatment();
339
340 List<Instruction> instructions = treatment.allInstructions();
341
342 Optional<Instruction> vlanInstruction = instructions.stream()
343 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
344 .filter(i -> ((L2ModificationInstruction) i).subtype() ==
345 L2ModificationInstruction.L2SubType.VLAN_PUSH ||
346 ((L2ModificationInstruction) i).subtype() ==
347 L2ModificationInstruction.L2SubType.VLAN_POP)
348 .findAny();
349
350
351 if (!vlanInstruction.isPresent()) {
352 installNoModificationRules(fwd);
353 } else {
354 L2ModificationInstruction vlanIns =
355 (L2ModificationInstruction) vlanInstruction.get();
356 if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
357 installUpstreamRules(fwd);
358 } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
359 installDownstreamRules(fwd);
360 } else {
361 log.error("Unknown OLT operation: {}", fwd);
362 fail(fwd, ObjectiveError.UNSUPPORTED);
363 return;
364 }
365 }
366
367 pass(fwd);
368
369 }
370
371
372 @Override
373 public void next(NextObjective nextObjective) {
374 if (nextObjective.type() != NextObjective.Type.BROADCAST) {
375 log.error("OLT only supports broadcast groups.");
376 fail(nextObjective, ObjectiveError.BADPARAMS);
377 return;
378 }
379
380 if (nextObjective.next().size() != 1 && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
381 log.error("OLT only supports singleton broadcast groups.");
382 fail(nextObjective, ObjectiveError.BADPARAMS);
383 return;
384 }
385
386 Optional<TrafficTreatment> treatmentOpt = nextObjective.next().stream().findFirst();
387 if (treatmentOpt.isEmpty() && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
388 log.error("Next objective {} does not have a treatment", nextObjective);
389 fail(nextObjective, ObjectiveError.BADPARAMS);
390 return;
391 }
392
393 GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
394
395 pendingGroups.put(key, nextObjective);
396 log.trace("NextObjective Operation {}", nextObjective.op());
397 switch (nextObjective.op()) {
398 case ADD:
399 GroupDescription groupDesc =
400 new DefaultGroupDescription(deviceId,
401 GroupDescription.Type.ALL,
402 new GroupBuckets(
403 Collections.singletonList(
404 buildBucket(treatmentOpt.get()))),
405 key,
406 null,
407 nextObjective.appId());
408 groupService.addGroup(groupDesc);
409 break;
410 case REMOVE:
411 groupService.removeGroup(deviceId, key, nextObjective.appId());
412 break;
413 case ADD_TO_EXISTING:
414 groupService.addBucketsToGroup(deviceId, key,
415 new GroupBuckets(
416 Collections.singletonList(
417 buildBucket(treatmentOpt.get()))),
418 key, nextObjective.appId());
419 break;
420 case REMOVE_FROM_EXISTING:
421 groupService.removeBucketsFromGroup(deviceId, key,
422 new GroupBuckets(
423 Collections.singletonList(
424 buildBucket(treatmentOpt.get()))),
425 key, nextObjective.appId());
426 break;
427 default:
428 log.warn("Unknown next objective operation: {}", nextObjective.op());
429 }
430
431
432 }
433
434 private GroupBucket buildBucket(TrafficTreatment treatment) {
435 return DefaultGroupBucket.createAllGroupBucket(treatment);
436 }
437
438 private void processMulticastRule(ForwardingObjective fwd) {
439 if (fwd.nextId() == null) {
440 log.error("Multicast objective does not have a next id");
441 fail(fwd, ObjectiveError.BADPARAMS);
442 }
443
444 GroupKey key = getGroupForNextObjective(fwd.nextId());
445
446 if (key == null) {
447 log.error("Group for forwarding objective missing: {}", fwd);
448 fail(fwd, ObjectiveError.GROUPMISSING);
449 }
450
451 Group group = groupService.getGroup(deviceId, key);
452 TrafficTreatment treatment =
453 buildTreatment(Instructions.createGroup(group.id()));
454
455 TrafficSelector.Builder selectorBuilder = buildIpv4SelectorForMulticast(fwd);
456
457 FlowRule rule = DefaultFlowRule.builder()
458 .fromApp(fwd.appId())
459 .forDevice(deviceId)
460 .forTable(0)
461 .makePermanent()
462 .withPriority(fwd.priority())
463 .withSelector(selectorBuilder.build())
464 .withTreatment(treatment)
465 .build();
466
467 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
468 switch (fwd.op()) {
469
470 case ADD:
471 builder.add(rule);
472 break;
473 case REMOVE:
474 builder.remove(rule);
475 break;
476 case ADD_TO_EXISTING:
477 case REMOVE_FROM_EXISTING:
478 break;
479 default:
480 log.warn("Unknown forwarding operation: {}", fwd.op());
481 }
482
483 applyFlowRules(ImmutableList.of(fwd), builder);
484
485
486 }
487
488 private TrafficSelector.Builder buildIpv4SelectorForMulticast(ForwardingObjective fwd) {
489 TrafficSelector.Builder builderToUpdate = DefaultTrafficSelector.builder();
490
491 Optional<Criterion> vlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.VLAN_VID);
492 if (vlanIdCriterion.isPresent()) {
493 VlanId assignedVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
494 builderToUpdate.matchVlanId(assignedVlan);
495 }
496
497 Optional<Criterion> innerVlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.INNER_VLAN_VID);
498 if (innerVlanIdCriterion.isPresent()) {
499 VlanId assignedInnerVlan = ((VlanIdCriterion) innerVlanIdCriterion.get()).vlanId();
500 builderToUpdate.matchMetadata(assignedInnerVlan.toShort());
501 }
502
503 Optional<Criterion> ethTypeCriterion = readFromSelector(fwd.selector(), Criterion.Type.ETH_TYPE);
504 if (ethTypeCriterion.isPresent()) {
505 EthType ethType = ((EthTypeCriterion) ethTypeCriterion.get()).ethType();
506 builderToUpdate.matchEthType(ethType.toShort());
507 }
508
509 Optional<Criterion> ipv4DstCriterion = readFromSelector(fwd.selector(), Criterion.Type.IPV4_DST);
510 if (ipv4DstCriterion.isPresent()) {
511 IpPrefix ipv4Dst = ((IPCriterion) ipv4DstCriterion.get()).ip();
512 builderToUpdate.matchIPDst(ipv4Dst);
513 }
514
515 return builderToUpdate;
516 }
517
518 static Optional<Criterion> readFromSelector(TrafficSelector selector, Criterion.Type type) {
519 if (selector == null) {
520 return Optional.empty();
521 }
522 Criterion criterion = selector.getCriterion(type);
523 return (criterion == null)
524 ? Optional.empty() : Optional.of(criterion);
525 }
526
527 private boolean checkForMulticast(ForwardingObjective fwd) {
528
529 IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
530 Criterion.Type.IPV4_DST);
531
532 if (ip == null) {
533 return false;
534 }
535
536 return ip.ip().isMulticast();
537
538 }
539
540 private GroupKey getGroupForNextObjective(Integer nextId) {
541 NextGroup next = flowObjectiveStore.getNextGroup(nextId);
542 return appKryo.deserialize(next.data());
543
544 }
545
546 private void installNoModificationRules(ForwardingObjective fwd) {
547 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
548 Instructions.MetadataInstruction writeMetadata = fetchWriteMetadata(fwd);
yasin saplib4b8ee12021-06-13 18:25:20 +0000549 Instructions.MeterInstruction meter = fwd.treatment().metered();
Andrea Campanella37f07e42021-02-16 11:24:39 +0100550
551 TrafficSelector selector = fwd.selector();
552
553 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
554 Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
555 Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
556
557 if (inport == null || output == null || innerVlan == null || outerVlan == null) {
558 // Avoid logging a non-error from lldp, bbdp and eapol core flows.
559 if (!fwd.appId().name().equals(CORE_APP_NAME)) {
560 log.error("Forwarding objective is underspecified: {}", fwd);
561 } else {
562 log.debug("Not installing unsupported core generated flow {}", fwd);
563 }
564 fail(fwd, ObjectiveError.BADPARAMS);
565 return;
566 }
567
568
569 FlowRule.Builder outer = DefaultFlowRule.builder()
570 .fromApp(fwd.appId())
571 .forDevice(deviceId)
572 .makePermanent()
573 .withPriority(fwd.priority())
574 .withSelector(buildSelector(inport, outerVlan))
575 .withTreatment(buildTreatment(output, writeMetadata, meter));
576
577 applyRules(fwd, outer);
578 }
579
580 private void installDownstreamRules(ForwardingObjective fwd) {
581 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
582
583 if (output == null) {
584 return;
585 }
586
587 TrafficSelector selector = fwd.selector();
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100588 Criterion outerVlanCriterion = selector.getCriterion(Criterion.Type.VLAN_VID);
Andrea Campanella37f07e42021-02-16 11:24:39 +0100589 Criterion outerPbit = selector.getCriterion(Criterion.Type.VLAN_PCP);
590 Criterion innerVlanCriterion = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
591 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
592 Criterion dstMac = selector.getCriterion(Criterion.Type.ETH_DST);
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100593 //TODO better check for innerVlan
594 if (outerVlanCriterion == null || inport == null) {
Andrea Campanella37f07e42021-02-16 11:24:39 +0100595 // Avoid logging a non-error from lldp, bbdp and eapol core flows.
596 if (!fwd.appId().name().equals(CORE_APP_NAME)) {
597 log.error("Forwarding objective is underspecified: {}", fwd);
598 } else {
599 log.debug("Not installing unsupported core generated flow {}", fwd);
600 }
601 fail(fwd, ObjectiveError.BADPARAMS);
602 return;
603 }
604
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100605 VlanId outerVlan = ((VlanIdCriterion) outerVlanCriterion).vlanId();
606 //Verify if this is needed.
607 Criterion outerVid = Criteria.matchVlanId(outerVlan);
608
Andrea Campanella37f07e42021-02-16 11:24:39 +0100609 VlanId innerVlan = ((VlanIdCriterion) innerVlanCriterion).vlanId();
610 Criterion innerVid = Criteria.matchVlanId(innerVlan);
611
612 // In the case where the C-tag is the same for all the subscribers,
613 // we add a metadata with the outport in the selector to make the flow unique
614 Criterion innerSelectorMeta = Criteria.matchMetadata(output.port().toLong());
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100615 if (outerVlan.toShort() == VlanId.ANY_VALUE) {
616 Criterion metadata = Criteria.matchMetadata(innerVlan.toShort());
617 TrafficSelector outerSelector = buildSelector(inport, metadata, outerVlanCriterion, outerPbit, dstMac);
618 installDownstreamRulesForOuterAnyVlan(fwd, output, outerSelector, buildSelector(inport, innerVid,
619 innerSelectorMeta));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100620
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100621 } else if (innerVlan.toShort() == VlanId.ANY_VALUE) {
622 TrafficSelector outerSelector = buildSelector(inport, outerVlanCriterion, outerPbit, dstMac);
623
624 Criterion matchedVlanId = Criteria.matchVlanId(VlanId.ANY);
625 installDownstreamRulesForInnerAnyVlan(fwd, output, outerSelector,
626 buildSelector(inport,
627 matchedVlanId,
628 innerSelectorMeta));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100629 } else {
630 // Required to differentiate the same match flows
631 // Please note that S tag and S p bit values will be same for the same service - so conflict flows!
632 // Metadata match criteria solves the conflict issue - but not used by the voltha
633 // Maybe - find a better way to solve the above problem
634 Criterion metadata = Criteria.matchMetadata(innerVlan.toShort());
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100635 TrafficSelector outerSelector = buildSelector(inport, metadata, outerVlanCriterion, outerPbit, dstMac);
Andrea Campanella37f07e42021-02-16 11:24:39 +0100636 installDownstreamRulesForVlans(fwd, output, outerSelector, buildSelector(inport, innerVid,
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100637 innerSelectorMeta));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100638 }
639 }
640
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100641 private void installDownstreamRulesForOuterAnyVlan(ForwardingObjective fwd, Instruction output,
642 TrafficSelector outerSelector, TrafficSelector innerSelector) {
643
644 Instruction onuDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_ONU));
645 Instruction oltDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_OLT));
646
647 List<Pair<Instruction, Instruction>> vlanOps =
648 vlanOps(fwd,
649 L2ModificationInstruction.L2SubType.VLAN_POP);
650
651 if (vlanOps == null || vlanOps.isEmpty()) {
652 return;
653 }
654
655 Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
656
657 TrafficTreatment innerTreatment;
658 VlanId setVlanId = ((L2ModificationInstruction.ModVlanIdInstruction) popAndRewrite.getRight()).vlanId();
659 if (VlanId.NONE.equals(setVlanId)) {
660 innerTreatment = (buildTreatment(popAndRewrite.getLeft(), onuDsMeter,
661 writeMetadataIncludingOnlyTp(fwd), output));
662 } else {
663 innerTreatment = (buildTreatment(popAndRewrite.getRight(),
664 onuDsMeter, writeMetadataIncludingOnlyTp(fwd), output));
665 }
666
667 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
668 fwd.treatment().allInstructions());
669
670 Instruction innerPbitSet = null;
671
672 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
673 innerPbitSet = setVlanPcps.get(0);
674 }
675
676 VlanId remarkInnerVlan = null;
677 Optional<Criterion> vlanIdCriterion = readFromSelector(innerSelector, Criterion.Type.VLAN_VID);
678 if (vlanIdCriterion.isPresent()) {
679 remarkInnerVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
680 }
681
682 Instruction modVlanId = null;
683 if (innerPbitSet != null) {
684 modVlanId = Instructions.modVlanId(remarkInnerVlan);
685 }
686
687 //match: in port (nni), s-tag
688 //action: pop vlan (s-tag), write metadata, go to table 1, meter
689 FlowRule.Builder outer = DefaultFlowRule.builder()
690 .fromApp(fwd.appId())
691 .forDevice(deviceId)
692 .makePermanent()
693 .withPriority(fwd.priority())
694 .withSelector(outerSelector)
695 .withTreatment(buildTreatment(oltDsMeter,
696 fetchWriteMetadata(fwd),
697 Instructions.transition(QQ_TABLE)));
698
699 //match: in port (nni), c-tag
700 //action: immediate: write metadata and pop, meter, output
701 FlowRule.Builder inner = DefaultFlowRule.builder()
702 .fromApp(fwd.appId())
703 .forDevice(deviceId)
704 .forTable(QQ_TABLE)
705 .makePermanent()
706 .withPriority(fwd.priority())
707 .withSelector(innerSelector)
708 .withTreatment(innerTreatment);
709 applyRules(fwd, inner, outer);
710 }
711
Andrea Campanella37f07e42021-02-16 11:24:39 +0100712 private void installDownstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
713 TrafficSelector outerSelector, TrafficSelector innerSelector) {
714
yasin saplib4b8ee12021-06-13 18:25:20 +0000715 Instruction onuDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_ONU));
716 Instruction oltDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_OLT));
717
Andrea Campanella37f07e42021-02-16 11:24:39 +0100718 List<Pair<Instruction, Instruction>> vlanOps =
719 vlanOps(fwd,
720 L2ModificationInstruction.L2SubType.VLAN_POP);
721
722 if (vlanOps == null || vlanOps.isEmpty()) {
723 return;
724 }
725
726 Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
727
728 TrafficTreatment innerTreatment;
729 VlanId setVlanId = ((L2ModificationInstruction.ModVlanIdInstruction) popAndRewrite.getRight()).vlanId();
730 if (VlanId.NONE.equals(setVlanId)) {
yasin saplib4b8ee12021-06-13 18:25:20 +0000731 innerTreatment = (buildTreatment(popAndRewrite.getLeft(), onuDsMeter,
732 writeMetadataIncludingOnlyTp(fwd), output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100733 } else {
734 innerTreatment = (buildTreatment(popAndRewrite.getRight(),
yasin saplib4b8ee12021-06-13 18:25:20 +0000735 onuDsMeter, writeMetadataIncludingOnlyTp(fwd), output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100736 }
737
738 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
739 fwd.treatment().allInstructions());
740
741 Instruction innerPbitSet = null;
742
743 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
744 innerPbitSet = setVlanPcps.get(0);
745 }
746
747 VlanId remarkInnerVlan = null;
748 Optional<Criterion> vlanIdCriterion = readFromSelector(innerSelector, Criterion.Type.VLAN_VID);
749 if (vlanIdCriterion.isPresent()) {
750 remarkInnerVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
751 }
752
753 Instruction modVlanId = null;
754 if (innerPbitSet != null) {
755 modVlanId = Instructions.modVlanId(remarkInnerVlan);
756 }
757
758 //match: in port (nni), s-tag
759 //action: pop vlan (s-tag), write metadata, go to table 1, meter
760 FlowRule.Builder outer = DefaultFlowRule.builder()
761 .fromApp(fwd.appId())
762 .forDevice(deviceId)
763 .makePermanent()
764 .withPriority(fwd.priority())
765 .withSelector(outerSelector)
766 .withTreatment(buildTreatment(popAndRewrite.getLeft(), modVlanId,
yasin saplib4b8ee12021-06-13 18:25:20 +0000767 innerPbitSet, oltDsMeter,
Andrea Campanella37f07e42021-02-16 11:24:39 +0100768 fetchWriteMetadata(fwd),
769 Instructions.transition(QQ_TABLE)));
770
771 //match: in port (nni), c-tag
772 //action: immediate: write metadata and pop, meter, output
773 FlowRule.Builder inner = DefaultFlowRule.builder()
774 .fromApp(fwd.appId())
775 .forDevice(deviceId)
776 .forTable(QQ_TABLE)
777 .makePermanent()
778 .withPriority(fwd.priority())
779 .withSelector(innerSelector)
780 .withTreatment(innerTreatment);
781 applyRules(fwd, inner, outer);
782 }
783
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100784 private void installDownstreamRulesForInnerAnyVlan(ForwardingObjective fwd, Instruction output,
785 TrafficSelector outerSelector, TrafficSelector innerSelector) {
Andrea Campanella37f07e42021-02-16 11:24:39 +0100786
yasin saplib4b8ee12021-06-13 18:25:20 +0000787 Instruction onuDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_ONU));
788 Instruction oltDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_OLT));
789
Andrea Campanella37f07e42021-02-16 11:24:39 +0100790 //match: in port (nni), s-tag
791 //action: immediate: write metadata, pop vlan, meter and go to table 1
792 FlowRule.Builder outer = DefaultFlowRule.builder()
793 .fromApp(fwd.appId())
794 .forDevice(deviceId)
795 .makePermanent()
796 .withPriority(fwd.priority())
797 .withSelector(outerSelector)
yasin saplib4b8ee12021-06-13 18:25:20 +0000798 .withTreatment(buildTreatment(Instructions.popVlan(), oltDsMeter,
Andrea Campanella37f07e42021-02-16 11:24:39 +0100799 fetchWriteMetadata(fwd), Instructions.transition(QQ_TABLE)));
800
801 //match: in port (nni) and s-tag
802 //action: immediate : write metadata, meter and output
803 FlowRule.Builder inner = DefaultFlowRule.builder()
804 .fromApp(fwd.appId())
805 .forDevice(deviceId)
806 .forTable(QQ_TABLE)
807 .makePermanent()
808 .withPriority(fwd.priority())
809 .withSelector(innerSelector)
yasin saplib4b8ee12021-06-13 18:25:20 +0000810 .withTreatment(buildTreatment(onuDsMeter, writeMetadataIncludingOnlyTp(fwd), output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100811
812 applyRules(fwd, inner, outer);
813 }
814
815 private void installUpstreamRules(ForwardingObjective fwd) {
816 List<Pair<Instruction, Instruction>> vlanOps =
817 vlanOps(fwd,
818 L2ModificationInstruction.L2SubType.VLAN_PUSH);
Andrea Campanella37f07e42021-02-16 11:24:39 +0100819 if (vlanOps == null || vlanOps.isEmpty()) {
820 return;
821 }
822
823 Instruction output = fetchOutput(fwd, UPSTREAM);
824
825 if (output == null) {
826 return;
827 }
828
829 Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
830
831 boolean noneValueVlanStatus = checkNoneVlanCriteria(fwd);
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100832 //check if treatment is PUSH or POP
833 boolean popAndPush = checkIfIsPopAndPush(fwd);
Andrea Campanella37f07e42021-02-16 11:24:39 +0100834 boolean anyValueVlanStatus = checkAnyVlanMatchCriteria(fwd);
Andrea Campanella37f07e42021-02-16 11:24:39 +0100835 if (anyValueVlanStatus) {
836 installUpstreamRulesForAnyVlan(fwd, output, outerPair);
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100837 } else if (popAndPush) {
838 Pair<Instruction, Instruction> innerPair = outerPair;
839 outerPair = vlanOps.remove(0);
840 installUpstreamRulesForAnyOuterVlan(fwd, output, innerPair, outerPair, noneValueVlanStatus);
Andrea Campanella37f07e42021-02-16 11:24:39 +0100841 } else {
842 Pair<Instruction, Instruction> innerPair = outerPair;
843 outerPair = vlanOps.remove(0);
844 installUpstreamRulesForVlans(fwd, output, innerPair, outerPair, noneValueVlanStatus);
845 }
846 }
847
848 private void installUpstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
849 Pair<Instruction, Instruction> innerPair,
850 Pair<Instruction, Instruction> outerPair, Boolean noneValueVlanStatus) {
851
yasin saplib4b8ee12021-06-13 18:25:20 +0000852 Instruction onuUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_ONU));
853 Instruction oltUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_OLT));
854
Andrea Campanella37f07e42021-02-16 11:24:39 +0100855 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
856 fwd.treatment().allInstructions());
857
858 Instruction innerPbitSet = null;
859 Instruction outerPbitSet = null;
860
861 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
862 innerPbitSet = setVlanPcps.get(0);
863 outerPbitSet = setVlanPcps.get(1);
864 }
865
866 TrafficTreatment innerTreatment;
867 if (noneValueVlanStatus) {
yasin saplib4b8ee12021-06-13 18:25:20 +0000868 innerTreatment = buildTreatment(innerPair.getLeft(), innerPair.getRight(), onuUsMeter,
Andrea Campanella37f07e42021-02-16 11:24:39 +0100869 fetchWriteMetadata(fwd), innerPbitSet,
870 Instructions.transition(QQ_TABLE));
871 } else {
yasin saplib4b8ee12021-06-13 18:25:20 +0000872 innerTreatment = buildTreatment(innerPair.getRight(), onuUsMeter, fetchWriteMetadata(fwd),
Andrea Campanella37f07e42021-02-16 11:24:39 +0100873 innerPbitSet, Instructions.transition(QQ_TABLE));
874 }
875
876 //match: in port, vlanId (0 or None)
877 //action:
878 //if vlanId None, push & set c-tag go to table 1
879 //if vlanId 0 or any specific vlan, set c-tag, write metadata, meter and go to table 1
880 FlowRule.Builder inner = DefaultFlowRule.builder()
881 .fromApp(fwd.appId())
882 .forDevice(deviceId)
883 .makePermanent()
884 .withPriority(fwd.priority())
885 .withSelector(fwd.selector())
886 .withTreatment(innerTreatment);
887
888 PortCriterion inPort = (PortCriterion)
889 fwd.selector().getCriterion(Criterion.Type.IN_PORT);
890
891 VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
892 innerPair.getRight()).vlanId();
893
894 //match: in port, c-tag
895 //action: immediate: push s-tag, write metadata, meter and output
896 FlowRule.Builder outer = DefaultFlowRule.builder()
897 .fromApp(fwd.appId())
898 .forDevice(deviceId)
899 .forTable(QQ_TABLE)
900 .makePermanent()
901 .withPriority(fwd.priority())
902 .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
yasin saplib4b8ee12021-06-13 18:25:20 +0000903 oltUsMeter, writeMetadataIncludingOnlyTp(fwd),
904 outerPbitSet, output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100905
906 if (innerPbitSet != null) {
907 byte innerPbit = ((L2ModificationInstruction.ModVlanPcpInstruction)
908 innerPbitSet).vlanPcp();
909 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId), Criteria.matchVlanPcp(innerPbit)));
910 } else {
911 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId)));
912 }
913
914 applyRules(fwd, inner, outer);
915 }
916
917 private void installUpstreamRulesForAnyVlan(ForwardingObjective fwd, Instruction output,
918 Pair<Instruction, Instruction> outerPair) {
919
920 log.debug("Installing upstream rules for any value vlan");
yasin saplib4b8ee12021-06-13 18:25:20 +0000921 Instruction onuUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_ONU));
922 Instruction oltUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_OLT));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100923
924 //match: in port and any-vlan (coming from OLT app.)
925 //action: write metadata, go to table 1 and meter
926 FlowRule.Builder inner = DefaultFlowRule.builder()
927 .fromApp(fwd.appId())
928 .forDevice(deviceId)
929 .makePermanent()
930 .withPriority(fwd.priority())
931 .withSelector(fwd.selector())
yasin saplib4b8ee12021-06-13 18:25:20 +0000932 .withTreatment(buildTreatment(Instructions.transition(QQ_TABLE), onuUsMeter, fetchWriteMetadata(fwd)));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100933
934 //match: in port and any-vlan (coming from OLT app.)
935 //action: immediate: push:QinQ, vlanId (s-tag), write metadata, meter and output
936 FlowRule.Builder outer = DefaultFlowRule.builder()
937 .fromApp(fwd.appId())
938 .forDevice(deviceId)
939 .forTable(QQ_TABLE)
940 .makePermanent()
941 .withPriority(fwd.priority())
942 .withSelector(fwd.selector())
943 .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
yasin saplib4b8ee12021-06-13 18:25:20 +0000944 oltUsMeter, writeMetadataIncludingOnlyTp(fwd), output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100945
946 applyRules(fwd, inner, outer);
947 }
948
949 private boolean checkNoneVlanCriteria(ForwardingObjective fwd) {
950 // Add the VLAN_PUSH treatment if we're matching on VlanId.NONE
951 Criterion vlanMatchCriterion = filterForCriterion(fwd.selector().criteria(), Criterion.Type.VLAN_VID);
952 boolean noneValueVlanStatus = false;
953 if (vlanMatchCriterion != null) {
954 noneValueVlanStatus = ((VlanIdCriterion) vlanMatchCriterion).vlanId().equals(VlanId.NONE);
955 }
956 return noneValueVlanStatus;
957 }
958
959 private boolean checkAnyVlanMatchCriteria(ForwardingObjective fwd) {
960 Criterion anyValueVlanCriterion = fwd.selector().criteria().stream()
961 .filter(c -> c.type().equals(Criterion.Type.VLAN_VID))
962 .filter(vc -> ((VlanIdCriterion) vc).vlanId().toShort() == VlanId.ANY_VALUE)
963 .findAny().orElse(null);
964
965 if (anyValueVlanCriterion == null) {
966 log.debug("Any value vlan match criteria is not found, criteria {}",
967 fwd.selector().criteria());
968 return false;
969 }
970
971 return true;
972 }
973
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +0100974 private boolean checkIfIsPopAndPush(ForwardingObjective fwd) {
975 TrafficTreatment treatment = fwd.treatment();
976 List<Instruction> instructions = treatment.allInstructions();
977 Optional<Instruction> vlanInstructionPush = instructions.stream()
978 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
979 .filter(i -> ((L2ModificationInstruction) i).subtype() ==
980 L2ModificationInstruction.L2SubType.VLAN_PUSH)
981 .findAny();
982 Optional<Instruction> vlanInstructionPop = instructions.stream()
983 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
984 .filter(i -> ((L2ModificationInstruction) i).subtype() ==
985 L2ModificationInstruction.L2SubType.VLAN_POP)
986 .findAny();
987 return vlanInstructionPush.isPresent() && vlanInstructionPop.isPresent();
988 }
989
Andrea Campanella37f07e42021-02-16 11:24:39 +0100990 private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
991 Instruction output = fwd.treatment().allInstructions().stream()
992 .filter(i -> i.type() == Instruction.Type.OUTPUT)
993 .findFirst().orElse(null);
994
995 if (output == null) {
996 log.error("OLT {} rule has no output", direction);
997 fail(fwd, ObjectiveError.BADPARAMS);
998 return null;
999 }
1000 return output;
1001 }
1002
yasin saplib4b8ee12021-06-13 18:25:20 +00001003 private Instruction fetchMeterById(ForwardingObjective fwd, String meterId) {
1004 Optional<Instructions.MeterInstruction> meter = fwd.treatment().meters().stream()
1005 .filter(meterInstruction -> meterInstruction.meterId().toString().equals(meterId)).findAny();
1006 if (meter.isEmpty()) {
1007 log.debug("Meter instruction is not found for the meterId: {} ", meterId);
Andrea Campanella37f07e42021-02-16 11:24:39 +01001008 return null;
1009 }
yasin saplib4b8ee12021-06-13 18:25:20 +00001010 log.debug("Meter instruction is found for the meterId: {} ", meterId);
1011 return meter.get();
Andrea Campanella37f07e42021-02-16 11:24:39 +01001012 }
1013
1014 private Instructions.MetadataInstruction fetchWriteMetadata(ForwardingObjective fwd) {
1015 Instructions.MetadataInstruction writeMetadata = fwd.treatment().writeMetadata();
1016
1017 if (writeMetadata == null) {
1018 log.warn("Write metadata is not found for the forwarding obj");
1019 fail(fwd, ObjectiveError.BADPARAMS);
1020 return null;
1021 }
1022
1023 log.debug("Write metadata is found {}", writeMetadata);
1024 return writeMetadata;
1025 }
1026
1027 private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
1028 L2ModificationInstruction.L2SubType type) {
1029
1030 List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
1031 fwd.treatment().allInstructions(), type);
1032
1033 if (vlanOps == null || vlanOps.isEmpty()) {
1034 String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
1035 ? DOWNSTREAM : UPSTREAM;
1036 log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
1037 fail(fwd, ObjectiveError.BADPARAMS);
1038 return ImmutableList.of();
1039 }
1040 return vlanOps;
1041 }
1042
1043
1044 private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
1045 L2ModificationInstruction.L2SubType type) {
1046
1047 List<Instruction> vlanOperations = findL2Instructions(
1048 type,
1049 instructions);
1050 List<Instruction> vlanSets = findL2Instructions(
1051 L2ModificationInstruction.L2SubType.VLAN_ID,
1052 instructions);
1053
1054 if (vlanOperations.size() != vlanSets.size()) {
1055 return ImmutableList.of();
1056 }
1057
1058 List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
1059
1060 for (int i = 0; i < vlanOperations.size(); i++) {
1061 pairs.add(new ImmutablePair<>(vlanOperations.get(i), vlanSets.get(i)));
1062 }
1063 return pairs;
1064 }
1065
1066 private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
1067 List<Instruction> actions) {
1068 return actions.stream()
1069 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
1070 .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
1071 .collect(Collectors.toList());
1072 }
1073
1074 private void provisionEthTypeBasedFilter(FilteringObjective filter,
1075 EthTypeCriterion ethType,
1076 Instructions.OutputInstruction output,
1077 L2ModificationInstruction vlanId,
1078 L2ModificationInstruction vlanPush) {
1079
1080 Instruction meter = filter.meta().metered();
1081 Instruction writeMetadata = filter.meta().writeMetadata();
1082
1083 TrafficSelector selector = buildSelector(filter.key(), ethType);
1084 TrafficTreatment treatment;
1085
1086 if (vlanPush == null || vlanId == null) {
1087 treatment = buildTreatment(output, meter, writeMetadata);
1088 } else {
1089 // we need to push the vlan because it came untagged (ATT)
1090 treatment = buildTreatment(output, meter, vlanPush, vlanId, writeMetadata);
1091 }
1092
1093 buildAndApplyRule(filter, selector, treatment);
1094
1095 }
1096
1097 private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
1098 IPProtocolCriterion ipProto,
1099 Instructions.OutputInstruction output,
1100 Instruction vlan, Instruction pcp) {
1101
1102 Instruction meter = filter.meta().metered();
1103 Instruction writeMetadata = filter.meta().writeMetadata();
1104
1105 // uniTagMatch
1106 VlanIdCriterion vlanId = (VlanIdCriterion) filterForCriterion(filter.conditions(),
1107 Criterion.Type.VLAN_VID);
1108
1109 TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto, vlanId);
1110 TrafficTreatment treatment = buildTreatment(output, vlan, pcp, meter, writeMetadata);
1111 buildAndApplyRule(filter, selector, treatment);
1112 }
1113
1114 private void provisionDhcp(FilteringObjective filter, EthTypeCriterion ethType,
1115 IPProtocolCriterion ipProto,
1116 UdpPortCriterion udpSrcPort,
1117 UdpPortCriterion udpDstPort,
1118 Instruction vlanIdInstruction,
1119 Instruction vlanPcpInstruction,
1120 Instructions.OutputInstruction output) {
1121
1122 Instruction meter = filter.meta().metered();
1123 Instruction writeMetadata = filter.meta().writeMetadata();
1124
1125 VlanIdCriterion matchVlanId = (VlanIdCriterion)
1126 filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
1127
1128 TrafficSelector selector;
1129 TrafficTreatment treatment;
1130
1131 if (matchVlanId != null) {
1132 log.debug("Building selector with match VLAN, {}", matchVlanId);
1133 // in case of TT upstream the packet comes tagged and the vlan is swapped.
1134 selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort,
1135 udpDstPort, matchVlanId);
1136 treatment = buildTreatment(output, meter, writeMetadata,
1137 vlanIdInstruction, vlanPcpInstruction);
1138 } else {
1139 log.debug("Building selector with no VLAN");
1140 // in case of ATT upstream the packet comes in untagged and we need to push the vlan
1141 selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort, udpDstPort);
1142 treatment = buildTreatment(output, meter, vlanIdInstruction, writeMetadata);
1143 }
1144 //In case of downstream there will be no match on the VLAN, which is null,
1145 // so it will just be output, meter, writeMetadata
1146
1147 buildAndApplyRule(filter, selector, treatment);
1148 }
1149
1150 private void provisionPPPoED(FilteringObjective filter, EthTypeCriterion ethType,
1151 Instruction vlanIdInstruction,
1152 Instruction vlanPcpInstruction,
1153 Instructions.OutputInstruction output) {
1154 Instruction meter = filter.meta().metered();
1155 Instruction writeMetadata = filter.meta().writeMetadata();
1156
1157 VlanIdCriterion matchVlanId = (VlanIdCriterion)
1158 filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
1159
1160 TrafficSelector selector;
1161 TrafficTreatment treatment;
1162
1163 if (matchVlanId != null) {
1164 log.debug("Building pppoed selector with match VLAN {}.", matchVlanId);
1165 } else {
1166 log.debug("Building pppoed selector without match VLAN.");
1167 }
1168
1169 selector = buildSelector(filter.key(), ethType, matchVlanId);
1170 treatment = buildTreatment(output, meter, writeMetadata, vlanIdInstruction, vlanPcpInstruction);
1171 buildAndApplyRule(filter, selector, treatment);
1172 }
1173
1174 private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
1175 TrafficTreatment treatment) {
1176 FlowRule rule = DefaultFlowRule.builder()
1177 .fromApp(filter.appId())
1178 .forDevice(deviceId)
1179 .forTable(0)
1180 .makePermanent()
1181 .withSelector(selector)
1182 .withTreatment(treatment)
1183 .withPriority(filter.priority())
1184 .build();
1185
1186 if (accumulator != null) {
1187 if (log.isDebugEnabled()) {
1188 log.debug("Adding pair to batch: {}", Pair.of(filter, rule));
1189 }
1190 accumulator.add(Pair.of(filter, rule));
1191 } else {
1192 FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
1193 switch (filter.type()) {
1194 case PERMIT:
1195 opsBuilder.add(rule);
1196 break;
1197 case DENY:
1198 opsBuilder.remove(rule);
1199 break;
1200 default:
1201 log.warn("Unknown filter type : {}", filter.type());
1202 fail(filter, ObjectiveError.UNSUPPORTED);
1203 }
1204 applyFlowRules(ImmutableList.of(filter), opsBuilder);
1205 }
1206 }
1207
1208 private void applyRules(ForwardingObjective fwd, FlowRule.Builder... fwdBuilders) {
1209 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
1210 switch (fwd.op()) {
1211 case ADD:
1212 for (FlowRule.Builder fwdBuilder : fwdBuilders) {
1213 builder.add(fwdBuilder.build());
1214 }
1215 break;
1216 case REMOVE:
1217 for (FlowRule.Builder fwdBuilder : fwdBuilders) {
1218 builder.remove(fwdBuilder.build());
1219 }
1220 break;
1221 case ADD_TO_EXISTING:
1222 break;
1223 case REMOVE_FROM_EXISTING:
1224 break;
1225 default:
1226 log.warn("Unknown forwarding operation: {}", fwd.op());
1227 }
1228
1229 applyFlowRules(ImmutableList.of(fwd), builder);
1230
1231
1232 }
1233
1234 private void applyFlowRules(List<Objective> objectives, FlowRuleOperations.Builder builder) {
1235 flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
1236 @Override
1237 public void onSuccess(FlowRuleOperations ops) {
1238 objectives.forEach(obj -> {
1239 pass(obj);
1240 });
1241 }
1242
1243 @Override
1244 public void onError(FlowRuleOperations ops) {
1245 objectives.forEach(obj -> {
1246 fail(obj, ObjectiveError.FLOWINSTALLATIONFAILED);
1247 });
1248
1249 }
1250 }));
1251 }
1252
1253 // Builds the batch using the accumulated flow rules
1254 private void sendFilters(List<Pair<FilteringObjective, FlowRule>> pairs) {
1255 FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07001256 if (log.isDebugEnabled()) {
1257 log.debug("Sending batch of {} filter-objs", pairs.size());
1258 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001259 List<Objective> filterObjs = Lists.newArrayList();
1260 // Iterates over all accumulated flow rules and then build an unique batch
1261 pairs.forEach(pair -> {
1262 FilteringObjective filter = pair.getLeft();
1263 FlowRule rule = pair.getRight();
1264 switch (filter.type()) {
1265 case PERMIT:
1266 flowOpsBuilder.add(rule);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07001267 if (log.isTraceEnabled()) {
1268 log.trace("Applying add filter-obj {} to device: {}", filter.id(), deviceId);
1269 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001270 filterObjs.add(filter);
1271 break;
1272 case DENY:
1273 flowOpsBuilder.remove(rule);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07001274 if (log.isTraceEnabled()) {
1275 log.trace("Deleting flow rule {} from device: {}", rule, deviceId);
1276 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001277 filterObjs.add(filter);
1278 break;
1279 default:
1280 fail(filter, ObjectiveError.UNKNOWN);
1281 log.warn("Unknown forwarding type {}", filter.type());
1282 }
1283 });
1284 if (log.isDebugEnabled()) {
1285 log.debug("Applying batch {}", flowOpsBuilder.build());
1286 }
1287 // Finally applies the operations
1288 applyFlowRules(filterObjs, flowOpsBuilder);
1289 }
1290
1291 private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
1292 return criteria.stream()
1293 .filter(c -> c.type().equals(type))
1294 .limit(1)
1295 .findFirst().orElse(null);
1296 }
1297
1298 private TrafficSelector buildSelector(Criterion... criteria) {
1299
1300 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
1301
1302 Arrays.stream(criteria).filter(Objects::nonNull).forEach(sBuilder::add);
1303
1304 return sBuilder.build();
1305 }
1306
1307 private TrafficTreatment buildTreatment(Instruction... instructions) {
1308
1309 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
1310
1311 Arrays.stream(instructions).filter(Objects::nonNull).forEach(tBuilder::add);
1312
1313 return tBuilder.build();
1314 }
1315
1316 private Instruction writeMetadataIncludingOnlyTp(ForwardingObjective fwd) {
1317
1318 return Instructions.writeMetadata(
1319 fetchWriteMetadata(fwd).metadata() & 0xFFFF00000000L, 0L);
1320 }
1321
1322 private void fail(Objective obj, ObjectiveError error) {
1323 obj.context().ifPresent(context -> context.onError(obj, error));
1324 }
1325
1326 private void pass(Objective obj) {
1327 obj.context().ifPresent(context -> context.onSuccess(obj));
1328 }
1329
1330
1331 private class InnerGroupListener implements GroupListener {
1332 @Override
1333 public void event(GroupEvent event) {
1334 GroupKey key = event.subject().appCookie();
1335 NextObjective obj = pendingGroups.getIfPresent(key);
1336 if (obj == null) {
Andrea Campanella438e1ad2021-03-26 11:41:16 +01001337 if (log.isTraceEnabled()) {
1338 log.trace("No pending group for {}, moving on", key);
1339 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001340 return;
1341 }
Andrea Campanella438e1ad2021-03-26 11:41:16 +01001342 log.debug("Event {} for group {}, handling pending" +
Andrea Campanella37f07e42021-02-16 11:24:39 +01001343 "NextGroup {}", event.type(), key, obj.id());
1344 if (event.type() == GroupEvent.Type.GROUP_ADDED ||
1345 event.type() == GroupEvent.Type.GROUP_UPDATED) {
1346 flowObjectiveStore.putNextGroup(obj.id(), new OltPipelineGroup(key));
Andrea Campanella37f07e42021-02-16 11:24:39 +01001347 pendingGroups.invalidate(key);
Esin Karamand106e522021-03-15 14:08:48 +00001348 pass(obj);
Andrea Campanella37f07e42021-02-16 11:24:39 +01001349 } else if (event.type() == GroupEvent.Type.GROUP_REMOVED) {
1350 flowObjectiveStore.removeNextGroup(obj.id());
Andrea Campanella37f07e42021-02-16 11:24:39 +01001351 pendingGroups.invalidate(key);
Esin Karamand106e522021-03-15 14:08:48 +00001352 pass(obj);
Andrea Campanella37f07e42021-02-16 11:24:39 +01001353 }
1354 }
1355 }
1356
1357 private static class OltPipelineGroup implements NextGroup {
1358
1359 private final GroupKey key;
1360
1361 public OltPipelineGroup(GroupKey key) {
1362 this.key = key;
1363 }
1364
1365 public GroupKey key() {
1366 return key;
1367 }
1368
1369 @Override
1370 public byte[] data() {
1371 return appKryo.serialize(key);
1372 }
1373
1374 }
1375
1376 @Override
1377 public List<String> getNextMappings(NextGroup nextGroup) {
1378 // TODO Implementation deferred to vendor
1379 return null;
1380 }
1381
1382 // Flow rules accumulator for reducing the number of transactions required to the devices.
1383 private final class ObjectiveAccumulator
1384 extends AbstractAccumulator<Pair<FilteringObjective, FlowRule>> {
1385
1386 ObjectiveAccumulator(int maxFilter, int maxBatchMS, int maxIdleMS) {
1387 super(TIMER, maxFilter, maxBatchMS, maxIdleMS);
1388 }
1389
1390 @Override
1391 public void processItems(List<Pair<FilteringObjective, FlowRule>> pairs) {
1392 // Triggers creation of a batch using the list of flowrules generated from objs.
1393 accumulatorExecutorService.execute(new FlowRulesBuilderTask(pairs));
1394 }
1395 }
1396
1397 // Task for building batch of flow rules in a separate thread.
1398 private final class FlowRulesBuilderTask implements Runnable {
1399 private final List<Pair<FilteringObjective, FlowRule>> pairs;
1400
1401 FlowRulesBuilderTask(List<Pair<FilteringObjective, FlowRule>> pairs) {
1402 this.pairs = pairs;
1403 }
1404
1405 @Override
1406 public void run() {
1407 try {
1408 sendFilters(pairs);
1409 } catch (Exception e) {
1410 log.warn("Unable to send objectives", e);
1411 }
1412 }
1413 }
Maria Carmela Cascino067ee4d2021-11-02 13:14:43 +01001414
1415 private void installUpstreamRulesForAnyOuterVlan(ForwardingObjective fwd, Instruction output,
1416 Pair<Instruction, Instruction> innerPair,
1417 Pair<Instruction, Instruction> outerPair, Boolean noneValueVlanStatus) {
1418
1419 Instruction onuUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_ONU));
1420 Instruction oltUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_OLT));
1421
1422 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
1423 fwd.treatment().allInstructions());
1424
1425 Instruction innerPbitSet = null;
1426 Instruction outerPbitSet = null;
1427
1428 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
1429 innerPbitSet = setVlanPcps.get(0);
1430 outerPbitSet = setVlanPcps.get(1);
1431 }
1432
1433 TrafficTreatment innerTreatment;
1434 if (noneValueVlanStatus) {
1435 innerTreatment = buildTreatment(innerPair.getLeft(), innerPair.getRight(), onuUsMeter,
1436 fetchWriteMetadata(fwd), innerPbitSet,
1437 Instructions.transition(QQ_TABLE));
1438 } else {
1439 innerTreatment = buildTreatment(innerPair.getRight(), onuUsMeter, fetchWriteMetadata(fwd),
1440 innerPbitSet, Instructions.transition(QQ_TABLE));
1441 }
1442
1443 //match: in port, vlanId (0 or None)
1444 //action:
1445 //if vlanId None, push & set c-tag go to table 1
1446 //if vlanId 0 or any specific vlan, set c-tag, write metadata, meter and go to table 1
1447 FlowRule.Builder inner = DefaultFlowRule.builder()
1448 .fromApp(fwd.appId())
1449 .forDevice(deviceId)
1450 .makePermanent()
1451 .withPriority(fwd.priority())
1452 .withSelector(fwd.selector())
1453 .withTreatment(innerTreatment);
1454
1455 PortCriterion inPort = (PortCriterion)
1456 fwd.selector().getCriterion(Criterion.Type.IN_PORT);
1457
1458 VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
1459 innerPair.getRight()).vlanId();
1460
1461 //match: in port, c-tag
1462 //action: immediate: push s-tag, write metadata, meter and output
1463 FlowRule.Builder outer = DefaultFlowRule.builder()
1464 .fromApp(fwd.appId())
1465 .forDevice(deviceId)
1466 .forTable(QQ_TABLE)
1467 .makePermanent()
1468 .withPriority(fwd.priority())
1469 .withTreatment(buildTreatment(oltUsMeter, writeMetadataIncludingOnlyTp(fwd),
1470 outerPbitSet, output));
1471
1472 if (innerPbitSet != null) {
1473 byte innerPbit = ((L2ModificationInstruction.ModVlanPcpInstruction)
1474 innerPbitSet).vlanPcp();
1475 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId), Criteria.matchVlanPcp(innerPbit)));
1476 } else {
1477 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId)));
1478 }
1479
1480 applyRules(fwd, inner, outer);
1481 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001482}