blob: 29d234bc051ab76f2d5effae24bb655936517f3c [file] [log] [blame]
Andrea Campanella37f07e42021-02-16 11:24:39 +01001/*
2 * Copyright 2016-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.opencord.olt.driver;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.RemovalCause;
21import com.google.common.cache.RemovalNotification;
22import com.google.common.collect.ImmutableList;
23import com.google.common.collect.Lists;
24import org.apache.commons.lang3.tuple.ImmutablePair;
25import org.apache.commons.lang3.tuple.Pair;
26import org.onlab.osgi.ServiceDirectory;
27import org.onlab.packet.EthType;
28import org.onlab.packet.IPv4;
29import org.onlab.packet.IPv6;
30import org.onlab.packet.IpPrefix;
31import org.onlab.packet.VlanId;
32import org.onlab.util.AbstractAccumulator;
33import org.onlab.util.Accumulator;
34import org.onlab.util.KryoNamespace;
35import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
37import org.onosproject.net.DeviceId;
38import org.onosproject.net.PortNumber;
39import org.onosproject.net.behaviour.NextGroup;
40import org.onosproject.net.behaviour.Pipeliner;
41import org.onosproject.net.behaviour.PipelinerContext;
42import org.onosproject.net.driver.AbstractHandlerBehaviour;
43import org.onosproject.net.driver.Driver;
44import org.onosproject.net.flow.DefaultFlowRule;
45import org.onosproject.net.flow.DefaultTrafficSelector;
46import org.onosproject.net.flow.DefaultTrafficTreatment;
47import org.onosproject.net.flow.FlowRule;
48import org.onosproject.net.flow.FlowRuleOperations;
49import org.onosproject.net.flow.FlowRuleOperationsContext;
50import org.onosproject.net.flow.FlowRuleService;
51import org.onosproject.net.flow.TrafficSelector;
52import org.onosproject.net.flow.TrafficTreatment;
53import org.onosproject.net.flow.criteria.Criteria;
54import org.onosproject.net.flow.criteria.Criterion;
55import org.onosproject.net.flow.criteria.EthTypeCriterion;
56import org.onosproject.net.flow.criteria.IPCriterion;
57import org.onosproject.net.flow.criteria.IPProtocolCriterion;
58import org.onosproject.net.flow.criteria.PortCriterion;
59import org.onosproject.net.flow.criteria.UdpPortCriterion;
60import org.onosproject.net.flow.criteria.VlanIdCriterion;
61import org.onosproject.net.flow.instructions.Instruction;
62import org.onosproject.net.flow.instructions.Instructions;
63import org.onosproject.net.flow.instructions.L2ModificationInstruction;
64import org.onosproject.net.flowobjective.FilteringObjective;
65import org.onosproject.net.flowobjective.FlowObjectiveStore;
66import org.onosproject.net.flowobjective.ForwardingObjective;
67import org.onosproject.net.flowobjective.NextObjective;
68import org.onosproject.net.flowobjective.Objective;
69import org.onosproject.net.flowobjective.ObjectiveError;
70import org.onosproject.net.group.DefaultGroupBucket;
71import org.onosproject.net.group.DefaultGroupDescription;
72import org.onosproject.net.group.DefaultGroupKey;
73import org.onosproject.net.group.Group;
74import org.onosproject.net.group.GroupBucket;
75import org.onosproject.net.group.GroupBuckets;
76import org.onosproject.net.group.GroupDescription;
77import org.onosproject.net.group.GroupEvent;
78import org.onosproject.net.group.GroupKey;
79import org.onosproject.net.group.GroupListener;
80import org.onosproject.net.group.GroupService;
81import org.onosproject.store.serializers.KryoNamespaces;
82import org.onosproject.store.service.StorageService;
83import org.slf4j.Logger;
84
85import java.util.Arrays;
86import java.util.Collection;
87import java.util.Collections;
88import java.util.List;
89import java.util.Objects;
90import java.util.Optional;
91import java.util.concurrent.ScheduledExecutorService;
92import java.util.Timer;
93import java.util.concurrent.TimeUnit;
94import java.util.stream.Collectors;
95
96import static org.onosproject.core.CoreService.CORE_APP_NAME;
yasin saplib4b8ee12021-06-13 18:25:20 +000097import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_OLT;
98import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_ONU;
99import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
100import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_ONU;
Andrea Campanella37f07e42021-02-16 11:24:39 +0100101import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
102import static org.onlab.util.Tools.groupedThreads;
103import static org.slf4j.LoggerFactory.getLogger;
104
105/**
106 * Pipeliner for OLT device.
107 */
108
109public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
110
111 private static final Integer QQ_TABLE = 1;
112 private static final int NO_ACTION_PRIORITY = 500;
113 private static final String DOWNSTREAM = "downstream";
114 private static final String UPSTREAM = "upstream";
115 private final Logger log = getLogger(getClass());
116
117 private ServiceDirectory serviceDirectory;
118 private FlowRuleService flowRuleService;
119 private GroupService groupService;
120 private CoreService coreService;
121 private StorageService storageService;
122
123 private DeviceId deviceId;
124 private ApplicationId appId;
125
126
127 protected FlowObjectiveStore flowObjectiveStore;
128
129 private Cache<GroupKey, NextObjective> pendingGroups;
130
131 protected static KryoNamespace appKryo = new KryoNamespace.Builder()
132 .register(KryoNamespaces.API)
133 .register(GroupKey.class)
134 .register(DefaultGroupKey.class)
135 .register(OltPipelineGroup.class)
136 .build("OltPipeline");
137
138 private static final Timer TIMER = new Timer("filterobj-batching");
139 private Accumulator<Pair<FilteringObjective, FlowRule>> accumulator;
140
141 // accumulator executor service
142 private ScheduledExecutorService accumulatorExecutorService
143 = newSingleThreadScheduledExecutor(groupedThreads("OltPipeliner", "acc-%d", log));
144
145 @Override
146 public void init(DeviceId deviceId, PipelinerContext context) {
147 log.debug("Initiate OLT pipeline");
148 this.serviceDirectory = context.directory();
149 this.deviceId = deviceId;
150
151 flowRuleService = serviceDirectory.get(FlowRuleService.class);
152 coreService = serviceDirectory.get(CoreService.class);
153 groupService = serviceDirectory.get(GroupService.class);
154 flowObjectiveStore = context.store();
155 storageService = serviceDirectory.get(StorageService.class);
156
157 appId = coreService.registerApplication(
158 "org.onosproject.driver.OLTPipeline");
159
160 // Init the accumulator, if enabled
161 if (isAccumulatorEnabled()) {
162 log.debug("Building accumulator with maxObjs {}, batchMs {}, idleMs {}",
163 context.accumulatorMaxObjectives(), context.accumulatorMaxBatchMillis(),
164 context.accumulatorMaxIdleMillis());
165 accumulator = new ObjectiveAccumulator(context.accumulatorMaxObjectives(),
166 context.accumulatorMaxBatchMillis(),
167 context.accumulatorMaxIdleMillis());
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100168 } else {
169 log.debug("Olt Pipeliner accumulator is disabled, processing immediately");
Andrea Campanella37f07e42021-02-16 11:24:39 +0100170 }
171
172
173 pendingGroups = CacheBuilder.newBuilder()
174 .expireAfterWrite(20, TimeUnit.SECONDS)
175 .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
176 if (notification.getCause() == RemovalCause.EXPIRED) {
177 fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
178 }
179 }).build();
180
181 groupService.addListener(new InnerGroupListener());
182
183 }
184
185 public boolean isAccumulatorEnabled() {
186 Driver driver = super.data().driver();
187 // we cannot determine the property
188 if (driver == null) {
189 return false;
190 }
191 return Boolean.parseBoolean(driver.getProperty(ACCUMULATOR_ENABLED));
192 }
193
194 @Override
195 public void filter(FilteringObjective filter) {
196 Instructions.OutputInstruction output;
197
198 if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
199 output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
200 .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
201 .limit(1)
202 .findFirst().get();
203
204 if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
205 log.warn("OLT can only filter packet to controller");
206 fail(filter, ObjectiveError.UNSUPPORTED);
207 return;
208 }
209 } else {
210 fail(filter, ObjectiveError.BADPARAMS);
211 return;
212 }
213
214 if (filter.key().type() != Criterion.Type.IN_PORT) {
215 fail(filter, ObjectiveError.BADPARAMS);
216 return;
217 }
218
219 EthTypeCriterion ethType = (EthTypeCriterion)
220 filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
221
222 if (ethType == null) {
223 fail(filter, ObjectiveError.BADPARAMS);
224 return;
225 }
226 Optional<Instruction> vlanId = filter.meta().immediate().stream()
227 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
228 && ((L2ModificationInstruction) t).subtype()
229 .equals(L2ModificationInstruction.L2SubType.VLAN_ID))
230 .limit(1)
231 .findFirst();
232
233 Optional<Instruction> vlanPcp = filter.meta().immediate().stream()
234 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
235 && ((L2ModificationInstruction) t).subtype()
236 .equals(L2ModificationInstruction.L2SubType.VLAN_PCP))
237 .limit(1)
238 .findFirst();
239
240 Optional<Instruction> vlanPush = filter.meta().immediate().stream()
241 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
242 && ((L2ModificationInstruction) t).subtype()
243 .equals(L2ModificationInstruction.L2SubType.VLAN_PUSH))
244 .limit(1)
245 .findFirst();
246
247 if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
248
249 if (vlanId.isEmpty() || vlanPush.isEmpty()) {
250 log.warn("Missing EAPOL vlan or vlanPush");
251 fail(filter, ObjectiveError.BADPARAMS);
252 return;
253 }
254 provisionEthTypeBasedFilter(filter, ethType, output,
255 (L2ModificationInstruction) vlanId.get(),
256 (L2ModificationInstruction) vlanPush.get());
257 } else if (ethType.ethType().equals(EthType.EtherType.PPPoED.ethType())) {
258 provisionPPPoED(filter, ethType, vlanId.orElse(null), vlanPcp.orElse(null), output);
259 } else if (ethType.ethType().equals(EthType.EtherType.LLDP.ethType())) {
260 provisionEthTypeBasedFilter(filter, ethType, output, null, null);
261 } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
262 IPProtocolCriterion ipProto = (IPProtocolCriterion)
263 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
264 if (ipProto == null) {
265 log.warn("OLT can only filter IGMP and DHCP");
266 fail(filter, ObjectiveError.UNSUPPORTED);
267 return;
268 }
269 if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
270 provisionIgmp(filter, ethType, ipProto, output,
271 vlanId.orElse(null),
272 vlanPcp.orElse(null));
273 } else if (ipProto.protocol() == IPv4.PROTOCOL_UDP) {
274 UdpPortCriterion udpSrcPort = (UdpPortCriterion)
275 filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
276
277 UdpPortCriterion udpDstPort = (UdpPortCriterion)
278 filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
279
280 if ((udpSrcPort.udpPort().toInt() == 67 && udpDstPort.udpPort().toInt() == 68) ||
281 (udpSrcPort.udpPort().toInt() == 68 && udpDstPort.udpPort().toInt() == 67)) {
282 provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
283 vlanPcp.orElse(null), output);
284 } else {
285 log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
286 fail(filter, ObjectiveError.UNSUPPORTED);
287 }
288 } else {
289 log.warn("Currently supporting only IGMP and DHCP filters for IPv4 packets");
290 fail(filter, ObjectiveError.UNSUPPORTED);
291 }
292 } else if (ethType.ethType().equals(EthType.EtherType.IPV6.ethType())) {
293 IPProtocolCriterion ipProto = (IPProtocolCriterion)
294 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
295 if (ipProto == null) {
296 log.warn("OLT can only filter DHCP");
297 fail(filter, ObjectiveError.UNSUPPORTED);
298 return;
299 }
300 if (ipProto.protocol() == IPv6.PROTOCOL_UDP) {
301 UdpPortCriterion udpSrcPort = (UdpPortCriterion)
302 filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
303
304 UdpPortCriterion udpDstPort = (UdpPortCriterion)
305 filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
306
307 if ((udpSrcPort.udpPort().toInt() == 546 && udpDstPort.udpPort().toInt() == 547) ||
308 (udpSrcPort.udpPort().toInt() == 547 && udpDstPort.udpPort().toInt() == 546)) {
309 provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
310 vlanPcp.orElse(null), output);
311 } else {
312 log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
313 fail(filter, ObjectiveError.UNSUPPORTED);
314 }
315 } else {
316 log.warn("Currently supporting only DHCP filters for IPv6 packets");
317 fail(filter, ObjectiveError.UNSUPPORTED);
318 }
319 } else {
320 log.warn("\nOnly the following are Supported in OLT for filter ->\n"
321 + "ETH TYPE : EAPOL, LLDP and IPV4\n"
322 + "IPV4 TYPE: IGMP and UDP (for DHCP)"
323 + "IPV6 TYPE: UDP (for DHCP)");
324 fail(filter, ObjectiveError.UNSUPPORTED);
325 }
326
327 }
328
329
330 @Override
331 public void forward(ForwardingObjective fwd) {
332 log.debug("Installing forwarding objective {}", fwd);
333 if (checkForMulticast(fwd)) {
334 processMulticastRule(fwd);
335 return;
336 }
337
338 TrafficTreatment treatment = fwd.treatment();
339
340 List<Instruction> instructions = treatment.allInstructions();
341
342 Optional<Instruction> vlanInstruction = instructions.stream()
343 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
344 .filter(i -> ((L2ModificationInstruction) i).subtype() ==
345 L2ModificationInstruction.L2SubType.VLAN_PUSH ||
346 ((L2ModificationInstruction) i).subtype() ==
347 L2ModificationInstruction.L2SubType.VLAN_POP)
348 .findAny();
349
350
351 if (!vlanInstruction.isPresent()) {
352 installNoModificationRules(fwd);
353 } else {
354 L2ModificationInstruction vlanIns =
355 (L2ModificationInstruction) vlanInstruction.get();
356 if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
357 installUpstreamRules(fwd);
358 } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
359 installDownstreamRules(fwd);
360 } else {
361 log.error("Unknown OLT operation: {}", fwd);
362 fail(fwd, ObjectiveError.UNSUPPORTED);
363 return;
364 }
365 }
366
367 pass(fwd);
368
369 }
370
371
372 @Override
373 public void next(NextObjective nextObjective) {
374 if (nextObjective.type() != NextObjective.Type.BROADCAST) {
375 log.error("OLT only supports broadcast groups.");
376 fail(nextObjective, ObjectiveError.BADPARAMS);
377 return;
378 }
379
380 if (nextObjective.next().size() != 1 && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
381 log.error("OLT only supports singleton broadcast groups.");
382 fail(nextObjective, ObjectiveError.BADPARAMS);
383 return;
384 }
385
386 Optional<TrafficTreatment> treatmentOpt = nextObjective.next().stream().findFirst();
387 if (treatmentOpt.isEmpty() && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
388 log.error("Next objective {} does not have a treatment", nextObjective);
389 fail(nextObjective, ObjectiveError.BADPARAMS);
390 return;
391 }
392
393 GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
394
395 pendingGroups.put(key, nextObjective);
396 log.trace("NextObjective Operation {}", nextObjective.op());
397 switch (nextObjective.op()) {
398 case ADD:
399 GroupDescription groupDesc =
400 new DefaultGroupDescription(deviceId,
401 GroupDescription.Type.ALL,
402 new GroupBuckets(
403 Collections.singletonList(
404 buildBucket(treatmentOpt.get()))),
405 key,
406 null,
407 nextObjective.appId());
408 groupService.addGroup(groupDesc);
409 break;
410 case REMOVE:
411 groupService.removeGroup(deviceId, key, nextObjective.appId());
412 break;
413 case ADD_TO_EXISTING:
414 groupService.addBucketsToGroup(deviceId, key,
415 new GroupBuckets(
416 Collections.singletonList(
417 buildBucket(treatmentOpt.get()))),
418 key, nextObjective.appId());
419 break;
420 case REMOVE_FROM_EXISTING:
421 groupService.removeBucketsFromGroup(deviceId, key,
422 new GroupBuckets(
423 Collections.singletonList(
424 buildBucket(treatmentOpt.get()))),
425 key, nextObjective.appId());
426 break;
427 default:
428 log.warn("Unknown next objective operation: {}", nextObjective.op());
429 }
430
431
432 }
433
434 private GroupBucket buildBucket(TrafficTreatment treatment) {
435 return DefaultGroupBucket.createAllGroupBucket(treatment);
436 }
437
438 private void processMulticastRule(ForwardingObjective fwd) {
439 if (fwd.nextId() == null) {
440 log.error("Multicast objective does not have a next id");
441 fail(fwd, ObjectiveError.BADPARAMS);
442 }
443
444 GroupKey key = getGroupForNextObjective(fwd.nextId());
445
446 if (key == null) {
447 log.error("Group for forwarding objective missing: {}", fwd);
448 fail(fwd, ObjectiveError.GROUPMISSING);
449 }
450
451 Group group = groupService.getGroup(deviceId, key);
452 TrafficTreatment treatment =
453 buildTreatment(Instructions.createGroup(group.id()));
454
455 TrafficSelector.Builder selectorBuilder = buildIpv4SelectorForMulticast(fwd);
456
457 FlowRule rule = DefaultFlowRule.builder()
458 .fromApp(fwd.appId())
459 .forDevice(deviceId)
460 .forTable(0)
461 .makePermanent()
462 .withPriority(fwd.priority())
463 .withSelector(selectorBuilder.build())
464 .withTreatment(treatment)
465 .build();
466
467 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
468 switch (fwd.op()) {
469
470 case ADD:
471 builder.add(rule);
472 break;
473 case REMOVE:
474 builder.remove(rule);
475 break;
476 case ADD_TO_EXISTING:
477 case REMOVE_FROM_EXISTING:
478 break;
479 default:
480 log.warn("Unknown forwarding operation: {}", fwd.op());
481 }
482
483 applyFlowRules(ImmutableList.of(fwd), builder);
484
485
486 }
487
488 private TrafficSelector.Builder buildIpv4SelectorForMulticast(ForwardingObjective fwd) {
489 TrafficSelector.Builder builderToUpdate = DefaultTrafficSelector.builder();
490
491 Optional<Criterion> vlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.VLAN_VID);
492 if (vlanIdCriterion.isPresent()) {
493 VlanId assignedVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
494 builderToUpdate.matchVlanId(assignedVlan);
495 }
496
497 Optional<Criterion> innerVlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.INNER_VLAN_VID);
498 if (innerVlanIdCriterion.isPresent()) {
499 VlanId assignedInnerVlan = ((VlanIdCriterion) innerVlanIdCriterion.get()).vlanId();
500 builderToUpdate.matchMetadata(assignedInnerVlan.toShort());
501 }
502
503 Optional<Criterion> ethTypeCriterion = readFromSelector(fwd.selector(), Criterion.Type.ETH_TYPE);
504 if (ethTypeCriterion.isPresent()) {
505 EthType ethType = ((EthTypeCriterion) ethTypeCriterion.get()).ethType();
506 builderToUpdate.matchEthType(ethType.toShort());
507 }
508
509 Optional<Criterion> ipv4DstCriterion = readFromSelector(fwd.selector(), Criterion.Type.IPV4_DST);
510 if (ipv4DstCriterion.isPresent()) {
511 IpPrefix ipv4Dst = ((IPCriterion) ipv4DstCriterion.get()).ip();
512 builderToUpdate.matchIPDst(ipv4Dst);
513 }
514
515 return builderToUpdate;
516 }
517
518 static Optional<Criterion> readFromSelector(TrafficSelector selector, Criterion.Type type) {
519 if (selector == null) {
520 return Optional.empty();
521 }
522 Criterion criterion = selector.getCriterion(type);
523 return (criterion == null)
524 ? Optional.empty() : Optional.of(criterion);
525 }
526
527 private boolean checkForMulticast(ForwardingObjective fwd) {
528
529 IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
530 Criterion.Type.IPV4_DST);
531
532 if (ip == null) {
533 return false;
534 }
535
536 return ip.ip().isMulticast();
537
538 }
539
540 private GroupKey getGroupForNextObjective(Integer nextId) {
541 NextGroup next = flowObjectiveStore.getNextGroup(nextId);
542 return appKryo.deserialize(next.data());
543
544 }
545
546 private void installNoModificationRules(ForwardingObjective fwd) {
547 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
548 Instructions.MetadataInstruction writeMetadata = fetchWriteMetadata(fwd);
yasin saplib4b8ee12021-06-13 18:25:20 +0000549 Instructions.MeterInstruction meter = fwd.treatment().metered();
Andrea Campanella37f07e42021-02-16 11:24:39 +0100550
551 TrafficSelector selector = fwd.selector();
552
553 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
554 Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
555 Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
556
557 if (inport == null || output == null || innerVlan == null || outerVlan == null) {
558 // Avoid logging a non-error from lldp, bbdp and eapol core flows.
559 if (!fwd.appId().name().equals(CORE_APP_NAME)) {
560 log.error("Forwarding objective is underspecified: {}", fwd);
561 } else {
562 log.debug("Not installing unsupported core generated flow {}", fwd);
563 }
564 fail(fwd, ObjectiveError.BADPARAMS);
565 return;
566 }
567
568
569 FlowRule.Builder outer = DefaultFlowRule.builder()
570 .fromApp(fwd.appId())
571 .forDevice(deviceId)
572 .makePermanent()
573 .withPriority(fwd.priority())
574 .withSelector(buildSelector(inport, outerVlan))
575 .withTreatment(buildTreatment(output, writeMetadata, meter));
576
577 applyRules(fwd, outer);
578 }
579
580 private void installDownstreamRules(ForwardingObjective fwd) {
581 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
582
583 if (output == null) {
584 return;
585 }
586
587 TrafficSelector selector = fwd.selector();
588
589 Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
590 Criterion outerPbit = selector.getCriterion(Criterion.Type.VLAN_PCP);
591 Criterion innerVlanCriterion = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
592 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
593 Criterion dstMac = selector.getCriterion(Criterion.Type.ETH_DST);
594
595 if (outerVlan == null || innerVlanCriterion == null || inport == null) {
596 // Avoid logging a non-error from lldp, bbdp and eapol core flows.
597 if (!fwd.appId().name().equals(CORE_APP_NAME)) {
598 log.error("Forwarding objective is underspecified: {}", fwd);
599 } else {
600 log.debug("Not installing unsupported core generated flow {}", fwd);
601 }
602 fail(fwd, ObjectiveError.BADPARAMS);
603 return;
604 }
605
606 VlanId innerVlan = ((VlanIdCriterion) innerVlanCriterion).vlanId();
607 Criterion innerVid = Criteria.matchVlanId(innerVlan);
608
609 // In the case where the C-tag is the same for all the subscribers,
610 // we add a metadata with the outport in the selector to make the flow unique
611 Criterion innerSelectorMeta = Criteria.matchMetadata(output.port().toLong());
612
613 if (innerVlan.toShort() == VlanId.ANY_VALUE) {
614 TrafficSelector outerSelector = buildSelector(inport, outerVlan, outerPbit, dstMac);
615 installDownstreamRulesForAnyVlan(fwd, output, outerSelector,
616 buildSelector(inport,
617 Criteria.matchVlanId(VlanId.ANY),
618 innerSelectorMeta));
619 } else {
620 // Required to differentiate the same match flows
621 // Please note that S tag and S p bit values will be same for the same service - so conflict flows!
622 // Metadata match criteria solves the conflict issue - but not used by the voltha
623 // Maybe - find a better way to solve the above problem
624 Criterion metadata = Criteria.matchMetadata(innerVlan.toShort());
625 TrafficSelector outerSelector = buildSelector(inport, metadata, outerVlan, outerPbit, dstMac);
626 installDownstreamRulesForVlans(fwd, output, outerSelector, buildSelector(inport, innerVid,
627 innerSelectorMeta));
628 }
629 }
630
631 private void installDownstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
632 TrafficSelector outerSelector, TrafficSelector innerSelector) {
633
yasin saplib4b8ee12021-06-13 18:25:20 +0000634 Instruction onuDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_ONU));
635 Instruction oltDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_OLT));
636
Andrea Campanella37f07e42021-02-16 11:24:39 +0100637 List<Pair<Instruction, Instruction>> vlanOps =
638 vlanOps(fwd,
639 L2ModificationInstruction.L2SubType.VLAN_POP);
640
641 if (vlanOps == null || vlanOps.isEmpty()) {
642 return;
643 }
644
645 Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
646
647 TrafficTreatment innerTreatment;
648 VlanId setVlanId = ((L2ModificationInstruction.ModVlanIdInstruction) popAndRewrite.getRight()).vlanId();
649 if (VlanId.NONE.equals(setVlanId)) {
yasin saplib4b8ee12021-06-13 18:25:20 +0000650 innerTreatment = (buildTreatment(popAndRewrite.getLeft(), onuDsMeter,
651 writeMetadataIncludingOnlyTp(fwd), output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100652 } else {
653 innerTreatment = (buildTreatment(popAndRewrite.getRight(),
yasin saplib4b8ee12021-06-13 18:25:20 +0000654 onuDsMeter, writeMetadataIncludingOnlyTp(fwd), output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100655 }
656
657 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
658 fwd.treatment().allInstructions());
659
660 Instruction innerPbitSet = null;
661
662 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
663 innerPbitSet = setVlanPcps.get(0);
664 }
665
666 VlanId remarkInnerVlan = null;
667 Optional<Criterion> vlanIdCriterion = readFromSelector(innerSelector, Criterion.Type.VLAN_VID);
668 if (vlanIdCriterion.isPresent()) {
669 remarkInnerVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
670 }
671
672 Instruction modVlanId = null;
673 if (innerPbitSet != null) {
674 modVlanId = Instructions.modVlanId(remarkInnerVlan);
675 }
676
677 //match: in port (nni), s-tag
678 //action: pop vlan (s-tag), write metadata, go to table 1, meter
679 FlowRule.Builder outer = DefaultFlowRule.builder()
680 .fromApp(fwd.appId())
681 .forDevice(deviceId)
682 .makePermanent()
683 .withPriority(fwd.priority())
684 .withSelector(outerSelector)
685 .withTreatment(buildTreatment(popAndRewrite.getLeft(), modVlanId,
yasin saplib4b8ee12021-06-13 18:25:20 +0000686 innerPbitSet, oltDsMeter,
Andrea Campanella37f07e42021-02-16 11:24:39 +0100687 fetchWriteMetadata(fwd),
688 Instructions.transition(QQ_TABLE)));
689
690 //match: in port (nni), c-tag
691 //action: immediate: write metadata and pop, meter, output
692 FlowRule.Builder inner = DefaultFlowRule.builder()
693 .fromApp(fwd.appId())
694 .forDevice(deviceId)
695 .forTable(QQ_TABLE)
696 .makePermanent()
697 .withPriority(fwd.priority())
698 .withSelector(innerSelector)
699 .withTreatment(innerTreatment);
700 applyRules(fwd, inner, outer);
701 }
702
703 private void installDownstreamRulesForAnyVlan(ForwardingObjective fwd, Instruction output,
704 TrafficSelector outerSelector, TrafficSelector innerSelector) {
705
yasin saplib4b8ee12021-06-13 18:25:20 +0000706 Instruction onuDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_ONU));
707 Instruction oltDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_OLT));
708
Andrea Campanella37f07e42021-02-16 11:24:39 +0100709 //match: in port (nni), s-tag
710 //action: immediate: write metadata, pop vlan, meter and go to table 1
711 FlowRule.Builder outer = DefaultFlowRule.builder()
712 .fromApp(fwd.appId())
713 .forDevice(deviceId)
714 .makePermanent()
715 .withPriority(fwd.priority())
716 .withSelector(outerSelector)
yasin saplib4b8ee12021-06-13 18:25:20 +0000717 .withTreatment(buildTreatment(Instructions.popVlan(), oltDsMeter,
Andrea Campanella37f07e42021-02-16 11:24:39 +0100718 fetchWriteMetadata(fwd), Instructions.transition(QQ_TABLE)));
719
720 //match: in port (nni) and s-tag
721 //action: immediate : write metadata, meter and output
722 FlowRule.Builder inner = DefaultFlowRule.builder()
723 .fromApp(fwd.appId())
724 .forDevice(deviceId)
725 .forTable(QQ_TABLE)
726 .makePermanent()
727 .withPriority(fwd.priority())
728 .withSelector(innerSelector)
yasin saplib4b8ee12021-06-13 18:25:20 +0000729 .withTreatment(buildTreatment(onuDsMeter, writeMetadataIncludingOnlyTp(fwd), output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100730
731 applyRules(fwd, inner, outer);
732 }
733
734 private void installUpstreamRules(ForwardingObjective fwd) {
735 List<Pair<Instruction, Instruction>> vlanOps =
736 vlanOps(fwd,
737 L2ModificationInstruction.L2SubType.VLAN_PUSH);
738
739 if (vlanOps == null || vlanOps.isEmpty()) {
740 return;
741 }
742
743 Instruction output = fetchOutput(fwd, UPSTREAM);
744
745 if (output == null) {
746 return;
747 }
748
749 Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
750
751 boolean noneValueVlanStatus = checkNoneVlanCriteria(fwd);
752 boolean anyValueVlanStatus = checkAnyVlanMatchCriteria(fwd);
753
754 if (anyValueVlanStatus) {
755 installUpstreamRulesForAnyVlan(fwd, output, outerPair);
756 } else {
757 Pair<Instruction, Instruction> innerPair = outerPair;
758 outerPair = vlanOps.remove(0);
759 installUpstreamRulesForVlans(fwd, output, innerPair, outerPair, noneValueVlanStatus);
760 }
761 }
762
763 private void installUpstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
764 Pair<Instruction, Instruction> innerPair,
765 Pair<Instruction, Instruction> outerPair, Boolean noneValueVlanStatus) {
766
yasin saplib4b8ee12021-06-13 18:25:20 +0000767 Instruction onuUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_ONU));
768 Instruction oltUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_OLT));
769
Andrea Campanella37f07e42021-02-16 11:24:39 +0100770 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
771 fwd.treatment().allInstructions());
772
773 Instruction innerPbitSet = null;
774 Instruction outerPbitSet = null;
775
776 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
777 innerPbitSet = setVlanPcps.get(0);
778 outerPbitSet = setVlanPcps.get(1);
779 }
780
781 TrafficTreatment innerTreatment;
782 if (noneValueVlanStatus) {
yasin saplib4b8ee12021-06-13 18:25:20 +0000783 innerTreatment = buildTreatment(innerPair.getLeft(), innerPair.getRight(), onuUsMeter,
Andrea Campanella37f07e42021-02-16 11:24:39 +0100784 fetchWriteMetadata(fwd), innerPbitSet,
785 Instructions.transition(QQ_TABLE));
786 } else {
yasin saplib4b8ee12021-06-13 18:25:20 +0000787 innerTreatment = buildTreatment(innerPair.getRight(), onuUsMeter, fetchWriteMetadata(fwd),
Andrea Campanella37f07e42021-02-16 11:24:39 +0100788 innerPbitSet, Instructions.transition(QQ_TABLE));
789 }
790
791 //match: in port, vlanId (0 or None)
792 //action:
793 //if vlanId None, push & set c-tag go to table 1
794 //if vlanId 0 or any specific vlan, set c-tag, write metadata, meter and go to table 1
795 FlowRule.Builder inner = DefaultFlowRule.builder()
796 .fromApp(fwd.appId())
797 .forDevice(deviceId)
798 .makePermanent()
799 .withPriority(fwd.priority())
800 .withSelector(fwd.selector())
801 .withTreatment(innerTreatment);
802
803 PortCriterion inPort = (PortCriterion)
804 fwd.selector().getCriterion(Criterion.Type.IN_PORT);
805
806 VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
807 innerPair.getRight()).vlanId();
808
809 //match: in port, c-tag
810 //action: immediate: push s-tag, write metadata, meter and output
811 FlowRule.Builder outer = DefaultFlowRule.builder()
812 .fromApp(fwd.appId())
813 .forDevice(deviceId)
814 .forTable(QQ_TABLE)
815 .makePermanent()
816 .withPriority(fwd.priority())
817 .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
yasin saplib4b8ee12021-06-13 18:25:20 +0000818 oltUsMeter, writeMetadataIncludingOnlyTp(fwd),
819 outerPbitSet, output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100820
821 if (innerPbitSet != null) {
822 byte innerPbit = ((L2ModificationInstruction.ModVlanPcpInstruction)
823 innerPbitSet).vlanPcp();
824 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId), Criteria.matchVlanPcp(innerPbit)));
825 } else {
826 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId)));
827 }
828
829 applyRules(fwd, inner, outer);
830 }
831
832 private void installUpstreamRulesForAnyVlan(ForwardingObjective fwd, Instruction output,
833 Pair<Instruction, Instruction> outerPair) {
834
835 log.debug("Installing upstream rules for any value vlan");
yasin saplib4b8ee12021-06-13 18:25:20 +0000836 Instruction onuUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_ONU));
837 Instruction oltUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_OLT));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100838
839 //match: in port and any-vlan (coming from OLT app.)
840 //action: write metadata, go to table 1 and meter
841 FlowRule.Builder inner = DefaultFlowRule.builder()
842 .fromApp(fwd.appId())
843 .forDevice(deviceId)
844 .makePermanent()
845 .withPriority(fwd.priority())
846 .withSelector(fwd.selector())
yasin saplib4b8ee12021-06-13 18:25:20 +0000847 .withTreatment(buildTreatment(Instructions.transition(QQ_TABLE), onuUsMeter, fetchWriteMetadata(fwd)));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100848
849 //match: in port and any-vlan (coming from OLT app.)
850 //action: immediate: push:QinQ, vlanId (s-tag), write metadata, meter and output
851 FlowRule.Builder outer = DefaultFlowRule.builder()
852 .fromApp(fwd.appId())
853 .forDevice(deviceId)
854 .forTable(QQ_TABLE)
855 .makePermanent()
856 .withPriority(fwd.priority())
857 .withSelector(fwd.selector())
858 .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
yasin saplib4b8ee12021-06-13 18:25:20 +0000859 oltUsMeter, writeMetadataIncludingOnlyTp(fwd), output));
Andrea Campanella37f07e42021-02-16 11:24:39 +0100860
861 applyRules(fwd, inner, outer);
862 }
863
864 private boolean checkNoneVlanCriteria(ForwardingObjective fwd) {
865 // Add the VLAN_PUSH treatment if we're matching on VlanId.NONE
866 Criterion vlanMatchCriterion = filterForCriterion(fwd.selector().criteria(), Criterion.Type.VLAN_VID);
867 boolean noneValueVlanStatus = false;
868 if (vlanMatchCriterion != null) {
869 noneValueVlanStatus = ((VlanIdCriterion) vlanMatchCriterion).vlanId().equals(VlanId.NONE);
870 }
871 return noneValueVlanStatus;
872 }
873
874 private boolean checkAnyVlanMatchCriteria(ForwardingObjective fwd) {
875 Criterion anyValueVlanCriterion = fwd.selector().criteria().stream()
876 .filter(c -> c.type().equals(Criterion.Type.VLAN_VID))
877 .filter(vc -> ((VlanIdCriterion) vc).vlanId().toShort() == VlanId.ANY_VALUE)
878 .findAny().orElse(null);
879
880 if (anyValueVlanCriterion == null) {
881 log.debug("Any value vlan match criteria is not found, criteria {}",
882 fwd.selector().criteria());
883 return false;
884 }
885
886 return true;
887 }
888
889 private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
890 Instruction output = fwd.treatment().allInstructions().stream()
891 .filter(i -> i.type() == Instruction.Type.OUTPUT)
892 .findFirst().orElse(null);
893
894 if (output == null) {
895 log.error("OLT {} rule has no output", direction);
896 fail(fwd, ObjectiveError.BADPARAMS);
897 return null;
898 }
899 return output;
900 }
901
yasin saplib4b8ee12021-06-13 18:25:20 +0000902 private Instruction fetchMeterById(ForwardingObjective fwd, String meterId) {
903 Optional<Instructions.MeterInstruction> meter = fwd.treatment().meters().stream()
904 .filter(meterInstruction -> meterInstruction.meterId().toString().equals(meterId)).findAny();
905 if (meter.isEmpty()) {
906 log.debug("Meter instruction is not found for the meterId: {} ", meterId);
Andrea Campanella37f07e42021-02-16 11:24:39 +0100907 return null;
908 }
yasin saplib4b8ee12021-06-13 18:25:20 +0000909 log.debug("Meter instruction is found for the meterId: {} ", meterId);
910 return meter.get();
Andrea Campanella37f07e42021-02-16 11:24:39 +0100911 }
912
913 private Instructions.MetadataInstruction fetchWriteMetadata(ForwardingObjective fwd) {
914 Instructions.MetadataInstruction writeMetadata = fwd.treatment().writeMetadata();
915
916 if (writeMetadata == null) {
917 log.warn("Write metadata is not found for the forwarding obj");
918 fail(fwd, ObjectiveError.BADPARAMS);
919 return null;
920 }
921
922 log.debug("Write metadata is found {}", writeMetadata);
923 return writeMetadata;
924 }
925
926 private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
927 L2ModificationInstruction.L2SubType type) {
928
929 List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
930 fwd.treatment().allInstructions(), type);
931
932 if (vlanOps == null || vlanOps.isEmpty()) {
933 String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
934 ? DOWNSTREAM : UPSTREAM;
935 log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
936 fail(fwd, ObjectiveError.BADPARAMS);
937 return ImmutableList.of();
938 }
939 return vlanOps;
940 }
941
942
943 private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
944 L2ModificationInstruction.L2SubType type) {
945
946 List<Instruction> vlanOperations = findL2Instructions(
947 type,
948 instructions);
949 List<Instruction> vlanSets = findL2Instructions(
950 L2ModificationInstruction.L2SubType.VLAN_ID,
951 instructions);
952
953 if (vlanOperations.size() != vlanSets.size()) {
954 return ImmutableList.of();
955 }
956
957 List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
958
959 for (int i = 0; i < vlanOperations.size(); i++) {
960 pairs.add(new ImmutablePair<>(vlanOperations.get(i), vlanSets.get(i)));
961 }
962 return pairs;
963 }
964
965 private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
966 List<Instruction> actions) {
967 return actions.stream()
968 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
969 .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
970 .collect(Collectors.toList());
971 }
972
973 private void provisionEthTypeBasedFilter(FilteringObjective filter,
974 EthTypeCriterion ethType,
975 Instructions.OutputInstruction output,
976 L2ModificationInstruction vlanId,
977 L2ModificationInstruction vlanPush) {
978
979 Instruction meter = filter.meta().metered();
980 Instruction writeMetadata = filter.meta().writeMetadata();
981
982 TrafficSelector selector = buildSelector(filter.key(), ethType);
983 TrafficTreatment treatment;
984
985 if (vlanPush == null || vlanId == null) {
986 treatment = buildTreatment(output, meter, writeMetadata);
987 } else {
988 // we need to push the vlan because it came untagged (ATT)
989 treatment = buildTreatment(output, meter, vlanPush, vlanId, writeMetadata);
990 }
991
992 buildAndApplyRule(filter, selector, treatment);
993
994 }
995
996 private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
997 IPProtocolCriterion ipProto,
998 Instructions.OutputInstruction output,
999 Instruction vlan, Instruction pcp) {
1000
1001 Instruction meter = filter.meta().metered();
1002 Instruction writeMetadata = filter.meta().writeMetadata();
1003
1004 // uniTagMatch
1005 VlanIdCriterion vlanId = (VlanIdCriterion) filterForCriterion(filter.conditions(),
1006 Criterion.Type.VLAN_VID);
1007
1008 TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto, vlanId);
1009 TrafficTreatment treatment = buildTreatment(output, vlan, pcp, meter, writeMetadata);
1010 buildAndApplyRule(filter, selector, treatment);
1011 }
1012
1013 private void provisionDhcp(FilteringObjective filter, EthTypeCriterion ethType,
1014 IPProtocolCriterion ipProto,
1015 UdpPortCriterion udpSrcPort,
1016 UdpPortCriterion udpDstPort,
1017 Instruction vlanIdInstruction,
1018 Instruction vlanPcpInstruction,
1019 Instructions.OutputInstruction output) {
1020
1021 Instruction meter = filter.meta().metered();
1022 Instruction writeMetadata = filter.meta().writeMetadata();
1023
1024 VlanIdCriterion matchVlanId = (VlanIdCriterion)
1025 filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
1026
1027 TrafficSelector selector;
1028 TrafficTreatment treatment;
1029
1030 if (matchVlanId != null) {
1031 log.debug("Building selector with match VLAN, {}", matchVlanId);
1032 // in case of TT upstream the packet comes tagged and the vlan is swapped.
1033 selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort,
1034 udpDstPort, matchVlanId);
1035 treatment = buildTreatment(output, meter, writeMetadata,
1036 vlanIdInstruction, vlanPcpInstruction);
1037 } else {
1038 log.debug("Building selector with no VLAN");
1039 // in case of ATT upstream the packet comes in untagged and we need to push the vlan
1040 selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort, udpDstPort);
1041 treatment = buildTreatment(output, meter, vlanIdInstruction, writeMetadata);
1042 }
1043 //In case of downstream there will be no match on the VLAN, which is null,
1044 // so it will just be output, meter, writeMetadata
1045
1046 buildAndApplyRule(filter, selector, treatment);
1047 }
1048
1049 private void provisionPPPoED(FilteringObjective filter, EthTypeCriterion ethType,
1050 Instruction vlanIdInstruction,
1051 Instruction vlanPcpInstruction,
1052 Instructions.OutputInstruction output) {
1053 Instruction meter = filter.meta().metered();
1054 Instruction writeMetadata = filter.meta().writeMetadata();
1055
1056 VlanIdCriterion matchVlanId = (VlanIdCriterion)
1057 filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
1058
1059 TrafficSelector selector;
1060 TrafficTreatment treatment;
1061
1062 if (matchVlanId != null) {
1063 log.debug("Building pppoed selector with match VLAN {}.", matchVlanId);
1064 } else {
1065 log.debug("Building pppoed selector without match VLAN.");
1066 }
1067
1068 selector = buildSelector(filter.key(), ethType, matchVlanId);
1069 treatment = buildTreatment(output, meter, writeMetadata, vlanIdInstruction, vlanPcpInstruction);
1070 buildAndApplyRule(filter, selector, treatment);
1071 }
1072
1073 private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
1074 TrafficTreatment treatment) {
1075 FlowRule rule = DefaultFlowRule.builder()
1076 .fromApp(filter.appId())
1077 .forDevice(deviceId)
1078 .forTable(0)
1079 .makePermanent()
1080 .withSelector(selector)
1081 .withTreatment(treatment)
1082 .withPriority(filter.priority())
1083 .build();
1084
1085 if (accumulator != null) {
1086 if (log.isDebugEnabled()) {
1087 log.debug("Adding pair to batch: {}", Pair.of(filter, rule));
1088 }
1089 accumulator.add(Pair.of(filter, rule));
1090 } else {
1091 FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
1092 switch (filter.type()) {
1093 case PERMIT:
1094 opsBuilder.add(rule);
1095 break;
1096 case DENY:
1097 opsBuilder.remove(rule);
1098 break;
1099 default:
1100 log.warn("Unknown filter type : {}", filter.type());
1101 fail(filter, ObjectiveError.UNSUPPORTED);
1102 }
1103 applyFlowRules(ImmutableList.of(filter), opsBuilder);
1104 }
1105 }
1106
1107 private void applyRules(ForwardingObjective fwd, FlowRule.Builder... fwdBuilders) {
1108 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
1109 switch (fwd.op()) {
1110 case ADD:
1111 for (FlowRule.Builder fwdBuilder : fwdBuilders) {
1112 builder.add(fwdBuilder.build());
1113 }
1114 break;
1115 case REMOVE:
1116 for (FlowRule.Builder fwdBuilder : fwdBuilders) {
1117 builder.remove(fwdBuilder.build());
1118 }
1119 break;
1120 case ADD_TO_EXISTING:
1121 break;
1122 case REMOVE_FROM_EXISTING:
1123 break;
1124 default:
1125 log.warn("Unknown forwarding operation: {}", fwd.op());
1126 }
1127
1128 applyFlowRules(ImmutableList.of(fwd), builder);
1129
1130
1131 }
1132
1133 private void applyFlowRules(List<Objective> objectives, FlowRuleOperations.Builder builder) {
1134 flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
1135 @Override
1136 public void onSuccess(FlowRuleOperations ops) {
1137 objectives.forEach(obj -> {
1138 pass(obj);
1139 });
1140 }
1141
1142 @Override
1143 public void onError(FlowRuleOperations ops) {
1144 objectives.forEach(obj -> {
1145 fail(obj, ObjectiveError.FLOWINSTALLATIONFAILED);
1146 });
1147
1148 }
1149 }));
1150 }
1151
1152 // Builds the batch using the accumulated flow rules
1153 private void sendFilters(List<Pair<FilteringObjective, FlowRule>> pairs) {
1154 FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07001155 if (log.isDebugEnabled()) {
1156 log.debug("Sending batch of {} filter-objs", pairs.size());
1157 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001158 List<Objective> filterObjs = Lists.newArrayList();
1159 // Iterates over all accumulated flow rules and then build an unique batch
1160 pairs.forEach(pair -> {
1161 FilteringObjective filter = pair.getLeft();
1162 FlowRule rule = pair.getRight();
1163 switch (filter.type()) {
1164 case PERMIT:
1165 flowOpsBuilder.add(rule);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07001166 if (log.isTraceEnabled()) {
1167 log.trace("Applying add filter-obj {} to device: {}", filter.id(), deviceId);
1168 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001169 filterObjs.add(filter);
1170 break;
1171 case DENY:
1172 flowOpsBuilder.remove(rule);
Matteo Scandoloaa2adde2021-09-13 12:45:32 -07001173 if (log.isTraceEnabled()) {
1174 log.trace("Deleting flow rule {} from device: {}", rule, deviceId);
1175 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001176 filterObjs.add(filter);
1177 break;
1178 default:
1179 fail(filter, ObjectiveError.UNKNOWN);
1180 log.warn("Unknown forwarding type {}", filter.type());
1181 }
1182 });
1183 if (log.isDebugEnabled()) {
1184 log.debug("Applying batch {}", flowOpsBuilder.build());
1185 }
1186 // Finally applies the operations
1187 applyFlowRules(filterObjs, flowOpsBuilder);
1188 }
1189
1190 private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
1191 return criteria.stream()
1192 .filter(c -> c.type().equals(type))
1193 .limit(1)
1194 .findFirst().orElse(null);
1195 }
1196
1197 private TrafficSelector buildSelector(Criterion... criteria) {
1198
1199 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
1200
1201 Arrays.stream(criteria).filter(Objects::nonNull).forEach(sBuilder::add);
1202
1203 return sBuilder.build();
1204 }
1205
1206 private TrafficTreatment buildTreatment(Instruction... instructions) {
1207
1208 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
1209
1210 Arrays.stream(instructions).filter(Objects::nonNull).forEach(tBuilder::add);
1211
1212 return tBuilder.build();
1213 }
1214
1215 private Instruction writeMetadataIncludingOnlyTp(ForwardingObjective fwd) {
1216
1217 return Instructions.writeMetadata(
1218 fetchWriteMetadata(fwd).metadata() & 0xFFFF00000000L, 0L);
1219 }
1220
1221 private void fail(Objective obj, ObjectiveError error) {
1222 obj.context().ifPresent(context -> context.onError(obj, error));
1223 }
1224
1225 private void pass(Objective obj) {
1226 obj.context().ifPresent(context -> context.onSuccess(obj));
1227 }
1228
1229
1230 private class InnerGroupListener implements GroupListener {
1231 @Override
1232 public void event(GroupEvent event) {
1233 GroupKey key = event.subject().appCookie();
1234 NextObjective obj = pendingGroups.getIfPresent(key);
1235 if (obj == null) {
Andrea Campanella438e1ad2021-03-26 11:41:16 +01001236 if (log.isTraceEnabled()) {
1237 log.trace("No pending group for {}, moving on", key);
1238 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001239 return;
1240 }
Andrea Campanella438e1ad2021-03-26 11:41:16 +01001241 log.debug("Event {} for group {}, handling pending" +
Andrea Campanella37f07e42021-02-16 11:24:39 +01001242 "NextGroup {}", event.type(), key, obj.id());
1243 if (event.type() == GroupEvent.Type.GROUP_ADDED ||
1244 event.type() == GroupEvent.Type.GROUP_UPDATED) {
1245 flowObjectiveStore.putNextGroup(obj.id(), new OltPipelineGroup(key));
Andrea Campanella37f07e42021-02-16 11:24:39 +01001246 pendingGroups.invalidate(key);
Esin Karamand106e522021-03-15 14:08:48 +00001247 pass(obj);
Andrea Campanella37f07e42021-02-16 11:24:39 +01001248 } else if (event.type() == GroupEvent.Type.GROUP_REMOVED) {
1249 flowObjectiveStore.removeNextGroup(obj.id());
Andrea Campanella37f07e42021-02-16 11:24:39 +01001250 pendingGroups.invalidate(key);
Esin Karamand106e522021-03-15 14:08:48 +00001251 pass(obj);
Andrea Campanella37f07e42021-02-16 11:24:39 +01001252 }
1253 }
1254 }
1255
1256 private static class OltPipelineGroup implements NextGroup {
1257
1258 private final GroupKey key;
1259
1260 public OltPipelineGroup(GroupKey key) {
1261 this.key = key;
1262 }
1263
1264 public GroupKey key() {
1265 return key;
1266 }
1267
1268 @Override
1269 public byte[] data() {
1270 return appKryo.serialize(key);
1271 }
1272
1273 }
1274
1275 @Override
1276 public List<String> getNextMappings(NextGroup nextGroup) {
1277 // TODO Implementation deferred to vendor
1278 return null;
1279 }
1280
1281 // Flow rules accumulator for reducing the number of transactions required to the devices.
1282 private final class ObjectiveAccumulator
1283 extends AbstractAccumulator<Pair<FilteringObjective, FlowRule>> {
1284
1285 ObjectiveAccumulator(int maxFilter, int maxBatchMS, int maxIdleMS) {
1286 super(TIMER, maxFilter, maxBatchMS, maxIdleMS);
1287 }
1288
1289 @Override
1290 public void processItems(List<Pair<FilteringObjective, FlowRule>> pairs) {
1291 // Triggers creation of a batch using the list of flowrules generated from objs.
1292 accumulatorExecutorService.execute(new FlowRulesBuilderTask(pairs));
1293 }
1294 }
1295
1296 // Task for building batch of flow rules in a separate thread.
1297 private final class FlowRulesBuilderTask implements Runnable {
1298 private final List<Pair<FilteringObjective, FlowRule>> pairs;
1299
1300 FlowRulesBuilderTask(List<Pair<FilteringObjective, FlowRule>> pairs) {
1301 this.pairs = pairs;
1302 }
1303
1304 @Override
1305 public void run() {
1306 try {
1307 sendFilters(pairs);
1308 } catch (Exception e) {
1309 log.warn("Unable to send objectives", e);
1310 }
1311 }
1312 }
1313}