blob: 5232eb896b06483b5391ee002159dc26fbcfa4cf [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.opencord.olt.driver;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.RemovalCause;
21import com.google.common.cache.RemovalNotification;
22import com.google.common.collect.ImmutableList;
23import com.google.common.collect.Lists;
24import org.apache.commons.lang3.tuple.ImmutablePair;
25import org.apache.commons.lang3.tuple.Pair;
26import org.onlab.osgi.ServiceDirectory;
27import org.onlab.packet.EthType;
28import org.onlab.packet.IPv4;
29import org.onlab.packet.IPv6;
30import org.onlab.packet.IpPrefix;
31import org.onlab.packet.VlanId;
32import org.onlab.util.AbstractAccumulator;
33import org.onlab.util.Accumulator;
34import org.onlab.util.KryoNamespace;
35import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
37import org.onosproject.net.DeviceId;
38import org.onosproject.net.PortNumber;
39import org.onosproject.net.behaviour.NextGroup;
40import org.onosproject.net.behaviour.Pipeliner;
41import org.onosproject.net.behaviour.PipelinerContext;
42import org.onosproject.net.driver.AbstractHandlerBehaviour;
43import org.onosproject.net.driver.Driver;
44import org.onosproject.net.flow.DefaultFlowRule;
45import org.onosproject.net.flow.DefaultTrafficSelector;
46import org.onosproject.net.flow.DefaultTrafficTreatment;
47import org.onosproject.net.flow.FlowRule;
48import org.onosproject.net.flow.FlowRuleOperations;
49import org.onosproject.net.flow.FlowRuleOperationsContext;
50import org.onosproject.net.flow.FlowRuleService;
51import org.onosproject.net.flow.TrafficSelector;
52import org.onosproject.net.flow.TrafficTreatment;
53import org.onosproject.net.flow.criteria.Criteria;
54import org.onosproject.net.flow.criteria.Criterion;
55import org.onosproject.net.flow.criteria.EthTypeCriterion;
56import org.onosproject.net.flow.criteria.IPCriterion;
57import org.onosproject.net.flow.criteria.IPProtocolCriterion;
58import org.onosproject.net.flow.criteria.PortCriterion;
59import org.onosproject.net.flow.criteria.UdpPortCriterion;
60import org.onosproject.net.flow.criteria.VlanIdCriterion;
61import org.onosproject.net.flow.instructions.Instruction;
62import org.onosproject.net.flow.instructions.Instructions;
63import org.onosproject.net.flow.instructions.L2ModificationInstruction;
64import org.onosproject.net.flowobjective.FilteringObjective;
65import org.onosproject.net.flowobjective.FlowObjectiveStore;
66import org.onosproject.net.flowobjective.ForwardingObjective;
67import org.onosproject.net.flowobjective.NextObjective;
68import org.onosproject.net.flowobjective.Objective;
69import org.onosproject.net.flowobjective.ObjectiveError;
70import org.onosproject.net.group.DefaultGroupBucket;
71import org.onosproject.net.group.DefaultGroupDescription;
72import org.onosproject.net.group.DefaultGroupKey;
73import org.onosproject.net.group.Group;
74import org.onosproject.net.group.GroupBucket;
75import org.onosproject.net.group.GroupBuckets;
76import org.onosproject.net.group.GroupDescription;
77import org.onosproject.net.group.GroupEvent;
78import org.onosproject.net.group.GroupKey;
79import org.onosproject.net.group.GroupListener;
80import org.onosproject.net.group.GroupService;
81import org.onosproject.store.serializers.KryoNamespaces;
82import org.onosproject.store.service.StorageService;
83import org.slf4j.Logger;
84
85import java.util.Arrays;
86import java.util.Collection;
87import java.util.Collections;
88import java.util.List;
89import java.util.Objects;
90import java.util.Optional;
91import java.util.concurrent.ScheduledExecutorService;
92import java.util.Timer;
93import java.util.concurrent.TimeUnit;
94import java.util.stream.Collectors;
95
96import static org.onosproject.core.CoreService.CORE_APP_NAME;
97import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
98import static org.onlab.util.Tools.groupedThreads;
99import static org.slf4j.LoggerFactory.getLogger;
100
101/**
102 * Pipeliner for OLT device.
103 */
104
105public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
106
107 private static final Integer QQ_TABLE = 1;
108 private static final int NO_ACTION_PRIORITY = 500;
109 private static final String DOWNSTREAM = "downstream";
110 private static final String UPSTREAM = "upstream";
111 private final Logger log = getLogger(getClass());
112
113 private ServiceDirectory serviceDirectory;
114 private FlowRuleService flowRuleService;
115 private GroupService groupService;
116 private CoreService coreService;
117 private StorageService storageService;
118
119 private DeviceId deviceId;
120 private ApplicationId appId;
121
122
123 protected FlowObjectiveStore flowObjectiveStore;
124
125 private Cache<GroupKey, NextObjective> pendingGroups;
126
127 protected static KryoNamespace appKryo = new KryoNamespace.Builder()
128 .register(KryoNamespaces.API)
129 .register(GroupKey.class)
130 .register(DefaultGroupKey.class)
131 .register(OltPipelineGroup.class)
132 .build("OltPipeline");
133
134 private static final Timer TIMER = new Timer("filterobj-batching");
135 private Accumulator<Pair<FilteringObjective, FlowRule>> accumulator;
136
137 // accumulator executor service
138 private ScheduledExecutorService accumulatorExecutorService
139 = newSingleThreadScheduledExecutor(groupedThreads("OltPipeliner", "acc-%d", log));
140
141 @Override
142 public void init(DeviceId deviceId, PipelinerContext context) {
143 log.debug("Initiate OLT pipeline");
144 this.serviceDirectory = context.directory();
145 this.deviceId = deviceId;
146
147 flowRuleService = serviceDirectory.get(FlowRuleService.class);
148 coreService = serviceDirectory.get(CoreService.class);
149 groupService = serviceDirectory.get(GroupService.class);
150 flowObjectiveStore = context.store();
151 storageService = serviceDirectory.get(StorageService.class);
152
153 appId = coreService.registerApplication(
154 "org.onosproject.driver.OLTPipeline");
155
156 // Init the accumulator, if enabled
157 if (isAccumulatorEnabled()) {
158 log.debug("Building accumulator with maxObjs {}, batchMs {}, idleMs {}",
159 context.accumulatorMaxObjectives(), context.accumulatorMaxBatchMillis(),
160 context.accumulatorMaxIdleMillis());
161 accumulator = new ObjectiveAccumulator(context.accumulatorMaxObjectives(),
162 context.accumulatorMaxBatchMillis(),
163 context.accumulatorMaxIdleMillis());
Andrea Campanella438e1ad2021-03-26 11:41:16 +0100164 } else {
165 log.debug("Olt Pipeliner accumulator is disabled, processing immediately");
Andrea Campanella37f07e42021-02-16 11:24:39 +0100166 }
167
168
169 pendingGroups = CacheBuilder.newBuilder()
170 .expireAfterWrite(20, TimeUnit.SECONDS)
171 .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
172 if (notification.getCause() == RemovalCause.EXPIRED) {
173 fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
174 }
175 }).build();
176
177 groupService.addListener(new InnerGroupListener());
178
179 }
180
181 public boolean isAccumulatorEnabled() {
182 Driver driver = super.data().driver();
183 // we cannot determine the property
184 if (driver == null) {
185 return false;
186 }
187 return Boolean.parseBoolean(driver.getProperty(ACCUMULATOR_ENABLED));
188 }
189
190 @Override
191 public void filter(FilteringObjective filter) {
192 Instructions.OutputInstruction output;
193
194 if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
195 output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
196 .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
197 .limit(1)
198 .findFirst().get();
199
200 if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
201 log.warn("OLT can only filter packet to controller");
202 fail(filter, ObjectiveError.UNSUPPORTED);
203 return;
204 }
205 } else {
206 fail(filter, ObjectiveError.BADPARAMS);
207 return;
208 }
209
210 if (filter.key().type() != Criterion.Type.IN_PORT) {
211 fail(filter, ObjectiveError.BADPARAMS);
212 return;
213 }
214
215 EthTypeCriterion ethType = (EthTypeCriterion)
216 filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
217
218 if (ethType == null) {
219 fail(filter, ObjectiveError.BADPARAMS);
220 return;
221 }
222 Optional<Instruction> vlanId = filter.meta().immediate().stream()
223 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
224 && ((L2ModificationInstruction) t).subtype()
225 .equals(L2ModificationInstruction.L2SubType.VLAN_ID))
226 .limit(1)
227 .findFirst();
228
229 Optional<Instruction> vlanPcp = filter.meta().immediate().stream()
230 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
231 && ((L2ModificationInstruction) t).subtype()
232 .equals(L2ModificationInstruction.L2SubType.VLAN_PCP))
233 .limit(1)
234 .findFirst();
235
236 Optional<Instruction> vlanPush = filter.meta().immediate().stream()
237 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
238 && ((L2ModificationInstruction) t).subtype()
239 .equals(L2ModificationInstruction.L2SubType.VLAN_PUSH))
240 .limit(1)
241 .findFirst();
242
243 if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
244
245 if (vlanId.isEmpty() || vlanPush.isEmpty()) {
246 log.warn("Missing EAPOL vlan or vlanPush");
247 fail(filter, ObjectiveError.BADPARAMS);
248 return;
249 }
250 provisionEthTypeBasedFilter(filter, ethType, output,
251 (L2ModificationInstruction) vlanId.get(),
252 (L2ModificationInstruction) vlanPush.get());
253 } else if (ethType.ethType().equals(EthType.EtherType.PPPoED.ethType())) {
254 provisionPPPoED(filter, ethType, vlanId.orElse(null), vlanPcp.orElse(null), output);
255 } else if (ethType.ethType().equals(EthType.EtherType.LLDP.ethType())) {
256 provisionEthTypeBasedFilter(filter, ethType, output, null, null);
257 } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
258 IPProtocolCriterion ipProto = (IPProtocolCriterion)
259 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
260 if (ipProto == null) {
261 log.warn("OLT can only filter IGMP and DHCP");
262 fail(filter, ObjectiveError.UNSUPPORTED);
263 return;
264 }
265 if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
266 provisionIgmp(filter, ethType, ipProto, output,
267 vlanId.orElse(null),
268 vlanPcp.orElse(null));
269 } else if (ipProto.protocol() == IPv4.PROTOCOL_UDP) {
270 UdpPortCriterion udpSrcPort = (UdpPortCriterion)
271 filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
272
273 UdpPortCriterion udpDstPort = (UdpPortCriterion)
274 filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
275
276 if ((udpSrcPort.udpPort().toInt() == 67 && udpDstPort.udpPort().toInt() == 68) ||
277 (udpSrcPort.udpPort().toInt() == 68 && udpDstPort.udpPort().toInt() == 67)) {
278 provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
279 vlanPcp.orElse(null), output);
280 } else {
281 log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
282 fail(filter, ObjectiveError.UNSUPPORTED);
283 }
284 } else {
285 log.warn("Currently supporting only IGMP and DHCP filters for IPv4 packets");
286 fail(filter, ObjectiveError.UNSUPPORTED);
287 }
288 } else if (ethType.ethType().equals(EthType.EtherType.IPV6.ethType())) {
289 IPProtocolCriterion ipProto = (IPProtocolCriterion)
290 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
291 if (ipProto == null) {
292 log.warn("OLT can only filter DHCP");
293 fail(filter, ObjectiveError.UNSUPPORTED);
294 return;
295 }
296 if (ipProto.protocol() == IPv6.PROTOCOL_UDP) {
297 UdpPortCriterion udpSrcPort = (UdpPortCriterion)
298 filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
299
300 UdpPortCriterion udpDstPort = (UdpPortCriterion)
301 filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
302
303 if ((udpSrcPort.udpPort().toInt() == 546 && udpDstPort.udpPort().toInt() == 547) ||
304 (udpSrcPort.udpPort().toInt() == 547 && udpDstPort.udpPort().toInt() == 546)) {
305 provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
306 vlanPcp.orElse(null), output);
307 } else {
308 log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
309 fail(filter, ObjectiveError.UNSUPPORTED);
310 }
311 } else {
312 log.warn("Currently supporting only DHCP filters for IPv6 packets");
313 fail(filter, ObjectiveError.UNSUPPORTED);
314 }
315 } else {
316 log.warn("\nOnly the following are Supported in OLT for filter ->\n"
317 + "ETH TYPE : EAPOL, LLDP and IPV4\n"
318 + "IPV4 TYPE: IGMP and UDP (for DHCP)"
319 + "IPV6 TYPE: UDP (for DHCP)");
320 fail(filter, ObjectiveError.UNSUPPORTED);
321 }
322
323 }
324
325
326 @Override
327 public void forward(ForwardingObjective fwd) {
328 log.debug("Installing forwarding objective {}", fwd);
329 if (checkForMulticast(fwd)) {
330 processMulticastRule(fwd);
331 return;
332 }
333
334 TrafficTreatment treatment = fwd.treatment();
335
336 List<Instruction> instructions = treatment.allInstructions();
337
338 Optional<Instruction> vlanInstruction = instructions.stream()
339 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
340 .filter(i -> ((L2ModificationInstruction) i).subtype() ==
341 L2ModificationInstruction.L2SubType.VLAN_PUSH ||
342 ((L2ModificationInstruction) i).subtype() ==
343 L2ModificationInstruction.L2SubType.VLAN_POP)
344 .findAny();
345
346
347 if (!vlanInstruction.isPresent()) {
348 installNoModificationRules(fwd);
349 } else {
350 L2ModificationInstruction vlanIns =
351 (L2ModificationInstruction) vlanInstruction.get();
352 if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
353 installUpstreamRules(fwd);
354 } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
355 installDownstreamRules(fwd);
356 } else {
357 log.error("Unknown OLT operation: {}", fwd);
358 fail(fwd, ObjectiveError.UNSUPPORTED);
359 return;
360 }
361 }
362
363 pass(fwd);
364
365 }
366
367
368 @Override
369 public void next(NextObjective nextObjective) {
370 if (nextObjective.type() != NextObjective.Type.BROADCAST) {
371 log.error("OLT only supports broadcast groups.");
372 fail(nextObjective, ObjectiveError.BADPARAMS);
373 return;
374 }
375
376 if (nextObjective.next().size() != 1 && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
377 log.error("OLT only supports singleton broadcast groups.");
378 fail(nextObjective, ObjectiveError.BADPARAMS);
379 return;
380 }
381
382 Optional<TrafficTreatment> treatmentOpt = nextObjective.next().stream().findFirst();
383 if (treatmentOpt.isEmpty() && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
384 log.error("Next objective {} does not have a treatment", nextObjective);
385 fail(nextObjective, ObjectiveError.BADPARAMS);
386 return;
387 }
388
389 GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
390
391 pendingGroups.put(key, nextObjective);
392 log.trace("NextObjective Operation {}", nextObjective.op());
393 switch (nextObjective.op()) {
394 case ADD:
395 GroupDescription groupDesc =
396 new DefaultGroupDescription(deviceId,
397 GroupDescription.Type.ALL,
398 new GroupBuckets(
399 Collections.singletonList(
400 buildBucket(treatmentOpt.get()))),
401 key,
402 null,
403 nextObjective.appId());
404 groupService.addGroup(groupDesc);
405 break;
406 case REMOVE:
407 groupService.removeGroup(deviceId, key, nextObjective.appId());
408 break;
409 case ADD_TO_EXISTING:
410 groupService.addBucketsToGroup(deviceId, key,
411 new GroupBuckets(
412 Collections.singletonList(
413 buildBucket(treatmentOpt.get()))),
414 key, nextObjective.appId());
415 break;
416 case REMOVE_FROM_EXISTING:
417 groupService.removeBucketsFromGroup(deviceId, key,
418 new GroupBuckets(
419 Collections.singletonList(
420 buildBucket(treatmentOpt.get()))),
421 key, nextObjective.appId());
422 break;
423 default:
424 log.warn("Unknown next objective operation: {}", nextObjective.op());
425 }
426
427
428 }
429
430 private GroupBucket buildBucket(TrafficTreatment treatment) {
431 return DefaultGroupBucket.createAllGroupBucket(treatment);
432 }
433
434 private void processMulticastRule(ForwardingObjective fwd) {
435 if (fwd.nextId() == null) {
436 log.error("Multicast objective does not have a next id");
437 fail(fwd, ObjectiveError.BADPARAMS);
438 }
439
440 GroupKey key = getGroupForNextObjective(fwd.nextId());
441
442 if (key == null) {
443 log.error("Group for forwarding objective missing: {}", fwd);
444 fail(fwd, ObjectiveError.GROUPMISSING);
445 }
446
447 Group group = groupService.getGroup(deviceId, key);
448 TrafficTreatment treatment =
449 buildTreatment(Instructions.createGroup(group.id()));
450
451 TrafficSelector.Builder selectorBuilder = buildIpv4SelectorForMulticast(fwd);
452
453 FlowRule rule = DefaultFlowRule.builder()
454 .fromApp(fwd.appId())
455 .forDevice(deviceId)
456 .forTable(0)
457 .makePermanent()
458 .withPriority(fwd.priority())
459 .withSelector(selectorBuilder.build())
460 .withTreatment(treatment)
461 .build();
462
463 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
464 switch (fwd.op()) {
465
466 case ADD:
467 builder.add(rule);
468 break;
469 case REMOVE:
470 builder.remove(rule);
471 break;
472 case ADD_TO_EXISTING:
473 case REMOVE_FROM_EXISTING:
474 break;
475 default:
476 log.warn("Unknown forwarding operation: {}", fwd.op());
477 }
478
479 applyFlowRules(ImmutableList.of(fwd), builder);
480
481
482 }
483
484 private TrafficSelector.Builder buildIpv4SelectorForMulticast(ForwardingObjective fwd) {
485 TrafficSelector.Builder builderToUpdate = DefaultTrafficSelector.builder();
486
487 Optional<Criterion> vlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.VLAN_VID);
488 if (vlanIdCriterion.isPresent()) {
489 VlanId assignedVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
490 builderToUpdate.matchVlanId(assignedVlan);
491 }
492
493 Optional<Criterion> innerVlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.INNER_VLAN_VID);
494 if (innerVlanIdCriterion.isPresent()) {
495 VlanId assignedInnerVlan = ((VlanIdCriterion) innerVlanIdCriterion.get()).vlanId();
496 builderToUpdate.matchMetadata(assignedInnerVlan.toShort());
497 }
498
499 Optional<Criterion> ethTypeCriterion = readFromSelector(fwd.selector(), Criterion.Type.ETH_TYPE);
500 if (ethTypeCriterion.isPresent()) {
501 EthType ethType = ((EthTypeCriterion) ethTypeCriterion.get()).ethType();
502 builderToUpdate.matchEthType(ethType.toShort());
503 }
504
505 Optional<Criterion> ipv4DstCriterion = readFromSelector(fwd.selector(), Criterion.Type.IPV4_DST);
506 if (ipv4DstCriterion.isPresent()) {
507 IpPrefix ipv4Dst = ((IPCriterion) ipv4DstCriterion.get()).ip();
508 builderToUpdate.matchIPDst(ipv4Dst);
509 }
510
511 return builderToUpdate;
512 }
513
514 static Optional<Criterion> readFromSelector(TrafficSelector selector, Criterion.Type type) {
515 if (selector == null) {
516 return Optional.empty();
517 }
518 Criterion criterion = selector.getCriterion(type);
519 return (criterion == null)
520 ? Optional.empty() : Optional.of(criterion);
521 }
522
523 private boolean checkForMulticast(ForwardingObjective fwd) {
524
525 IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
526 Criterion.Type.IPV4_DST);
527
528 if (ip == null) {
529 return false;
530 }
531
532 return ip.ip().isMulticast();
533
534 }
535
536 private GroupKey getGroupForNextObjective(Integer nextId) {
537 NextGroup next = flowObjectiveStore.getNextGroup(nextId);
538 return appKryo.deserialize(next.data());
539
540 }
541
542 private void installNoModificationRules(ForwardingObjective fwd) {
543 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
544 Instructions.MetadataInstruction writeMetadata = fetchWriteMetadata(fwd);
545 Instructions.MeterInstruction meter = (Instructions.MeterInstruction) fetchMeter(fwd);
546
547 TrafficSelector selector = fwd.selector();
548
549 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
550 Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
551 Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
552
553 if (inport == null || output == null || innerVlan == null || outerVlan == null) {
554 // Avoid logging a non-error from lldp, bbdp and eapol core flows.
555 if (!fwd.appId().name().equals(CORE_APP_NAME)) {
556 log.error("Forwarding objective is underspecified: {}", fwd);
557 } else {
558 log.debug("Not installing unsupported core generated flow {}", fwd);
559 }
560 fail(fwd, ObjectiveError.BADPARAMS);
561 return;
562 }
563
564
565 FlowRule.Builder outer = DefaultFlowRule.builder()
566 .fromApp(fwd.appId())
567 .forDevice(deviceId)
568 .makePermanent()
569 .withPriority(fwd.priority())
570 .withSelector(buildSelector(inport, outerVlan))
571 .withTreatment(buildTreatment(output, writeMetadata, meter));
572
573 applyRules(fwd, outer);
574 }
575
576 private void installDownstreamRules(ForwardingObjective fwd) {
577 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
578
579 if (output == null) {
580 return;
581 }
582
583 TrafficSelector selector = fwd.selector();
584
585 Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
586 Criterion outerPbit = selector.getCriterion(Criterion.Type.VLAN_PCP);
587 Criterion innerVlanCriterion = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
588 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
589 Criterion dstMac = selector.getCriterion(Criterion.Type.ETH_DST);
590
591 if (outerVlan == null || innerVlanCriterion == null || inport == null) {
592 // Avoid logging a non-error from lldp, bbdp and eapol core flows.
593 if (!fwd.appId().name().equals(CORE_APP_NAME)) {
594 log.error("Forwarding objective is underspecified: {}", fwd);
595 } else {
596 log.debug("Not installing unsupported core generated flow {}", fwd);
597 }
598 fail(fwd, ObjectiveError.BADPARAMS);
599 return;
600 }
601
602 VlanId innerVlan = ((VlanIdCriterion) innerVlanCriterion).vlanId();
603 Criterion innerVid = Criteria.matchVlanId(innerVlan);
604
605 // In the case where the C-tag is the same for all the subscribers,
606 // we add a metadata with the outport in the selector to make the flow unique
607 Criterion innerSelectorMeta = Criteria.matchMetadata(output.port().toLong());
608
609 if (innerVlan.toShort() == VlanId.ANY_VALUE) {
610 TrafficSelector outerSelector = buildSelector(inport, outerVlan, outerPbit, dstMac);
611 installDownstreamRulesForAnyVlan(fwd, output, outerSelector,
612 buildSelector(inport,
613 Criteria.matchVlanId(VlanId.ANY),
614 innerSelectorMeta));
615 } else {
616 // Required to differentiate the same match flows
617 // Please note that S tag and S p bit values will be same for the same service - so conflict flows!
618 // Metadata match criteria solves the conflict issue - but not used by the voltha
619 // Maybe - find a better way to solve the above problem
620 Criterion metadata = Criteria.matchMetadata(innerVlan.toShort());
621 TrafficSelector outerSelector = buildSelector(inport, metadata, outerVlan, outerPbit, dstMac);
622 installDownstreamRulesForVlans(fwd, output, outerSelector, buildSelector(inport, innerVid,
623 innerSelectorMeta));
624 }
625 }
626
627 private void installDownstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
628 TrafficSelector outerSelector, TrafficSelector innerSelector) {
629
630 List<Pair<Instruction, Instruction>> vlanOps =
631 vlanOps(fwd,
632 L2ModificationInstruction.L2SubType.VLAN_POP);
633
634 if (vlanOps == null || vlanOps.isEmpty()) {
635 return;
636 }
637
638 Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
639
640 TrafficTreatment innerTreatment;
641 VlanId setVlanId = ((L2ModificationInstruction.ModVlanIdInstruction) popAndRewrite.getRight()).vlanId();
642 if (VlanId.NONE.equals(setVlanId)) {
643 innerTreatment = (buildTreatment(popAndRewrite.getLeft(), fetchMeter(fwd),
644 writeMetadataIncludingOnlyTp(fwd), output));
645 } else {
646 innerTreatment = (buildTreatment(popAndRewrite.getRight(),
647 fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd), output));
648 }
649
650 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
651 fwd.treatment().allInstructions());
652
653 Instruction innerPbitSet = null;
654
655 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
656 innerPbitSet = setVlanPcps.get(0);
657 }
658
659 VlanId remarkInnerVlan = null;
660 Optional<Criterion> vlanIdCriterion = readFromSelector(innerSelector, Criterion.Type.VLAN_VID);
661 if (vlanIdCriterion.isPresent()) {
662 remarkInnerVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
663 }
664
665 Instruction modVlanId = null;
666 if (innerPbitSet != null) {
667 modVlanId = Instructions.modVlanId(remarkInnerVlan);
668 }
669
670 //match: in port (nni), s-tag
671 //action: pop vlan (s-tag), write metadata, go to table 1, meter
672 FlowRule.Builder outer = DefaultFlowRule.builder()
673 .fromApp(fwd.appId())
674 .forDevice(deviceId)
675 .makePermanent()
676 .withPriority(fwd.priority())
677 .withSelector(outerSelector)
678 .withTreatment(buildTreatment(popAndRewrite.getLeft(), modVlanId,
679 innerPbitSet, fetchMeter(fwd),
680 fetchWriteMetadata(fwd),
681 Instructions.transition(QQ_TABLE)));
682
683 //match: in port (nni), c-tag
684 //action: immediate: write metadata and pop, meter, output
685 FlowRule.Builder inner = DefaultFlowRule.builder()
686 .fromApp(fwd.appId())
687 .forDevice(deviceId)
688 .forTable(QQ_TABLE)
689 .makePermanent()
690 .withPriority(fwd.priority())
691 .withSelector(innerSelector)
692 .withTreatment(innerTreatment);
693 applyRules(fwd, inner, outer);
694 }
695
696 private void installDownstreamRulesForAnyVlan(ForwardingObjective fwd, Instruction output,
697 TrafficSelector outerSelector, TrafficSelector innerSelector) {
698
699 //match: in port (nni), s-tag
700 //action: immediate: write metadata, pop vlan, meter and go to table 1
701 FlowRule.Builder outer = DefaultFlowRule.builder()
702 .fromApp(fwd.appId())
703 .forDevice(deviceId)
704 .makePermanent()
705 .withPriority(fwd.priority())
706 .withSelector(outerSelector)
707 .withTreatment(buildTreatment(Instructions.popVlan(), fetchMeter(fwd),
708 fetchWriteMetadata(fwd), Instructions.transition(QQ_TABLE)));
709
710 //match: in port (nni) and s-tag
711 //action: immediate : write metadata, meter and output
712 FlowRule.Builder inner = DefaultFlowRule.builder()
713 .fromApp(fwd.appId())
714 .forDevice(deviceId)
715 .forTable(QQ_TABLE)
716 .makePermanent()
717 .withPriority(fwd.priority())
718 .withSelector(innerSelector)
719 .withTreatment(buildTreatment(fetchMeter(fwd),
720 writeMetadataIncludingOnlyTp(fwd), output));
721
722 applyRules(fwd, inner, outer);
723 }
724
725 private void installUpstreamRules(ForwardingObjective fwd) {
726 List<Pair<Instruction, Instruction>> vlanOps =
727 vlanOps(fwd,
728 L2ModificationInstruction.L2SubType.VLAN_PUSH);
729
730 if (vlanOps == null || vlanOps.isEmpty()) {
731 return;
732 }
733
734 Instruction output = fetchOutput(fwd, UPSTREAM);
735
736 if (output == null) {
737 return;
738 }
739
740 Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
741
742 boolean noneValueVlanStatus = checkNoneVlanCriteria(fwd);
743 boolean anyValueVlanStatus = checkAnyVlanMatchCriteria(fwd);
744
745 if (anyValueVlanStatus) {
746 installUpstreamRulesForAnyVlan(fwd, output, outerPair);
747 } else {
748 Pair<Instruction, Instruction> innerPair = outerPair;
749 outerPair = vlanOps.remove(0);
750 installUpstreamRulesForVlans(fwd, output, innerPair, outerPair, noneValueVlanStatus);
751 }
752 }
753
754 private void installUpstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
755 Pair<Instruction, Instruction> innerPair,
756 Pair<Instruction, Instruction> outerPair, Boolean noneValueVlanStatus) {
757
758 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
759 fwd.treatment().allInstructions());
760
761 Instruction innerPbitSet = null;
762 Instruction outerPbitSet = null;
763
764 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
765 innerPbitSet = setVlanPcps.get(0);
766 outerPbitSet = setVlanPcps.get(1);
767 }
768
769 TrafficTreatment innerTreatment;
770 if (noneValueVlanStatus) {
771 innerTreatment = buildTreatment(innerPair.getLeft(), innerPair.getRight(), fetchMeter(fwd),
772 fetchWriteMetadata(fwd), innerPbitSet,
773 Instructions.transition(QQ_TABLE));
774 } else {
775 innerTreatment = buildTreatment(innerPair.getRight(), fetchMeter(fwd), fetchWriteMetadata(fwd),
776 innerPbitSet, Instructions.transition(QQ_TABLE));
777 }
778
779 //match: in port, vlanId (0 or None)
780 //action:
781 //if vlanId None, push & set c-tag go to table 1
782 //if vlanId 0 or any specific vlan, set c-tag, write metadata, meter and go to table 1
783 FlowRule.Builder inner = DefaultFlowRule.builder()
784 .fromApp(fwd.appId())
785 .forDevice(deviceId)
786 .makePermanent()
787 .withPriority(fwd.priority())
788 .withSelector(fwd.selector())
789 .withTreatment(innerTreatment);
790
791 PortCriterion inPort = (PortCriterion)
792 fwd.selector().getCriterion(Criterion.Type.IN_PORT);
793
794 VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
795 innerPair.getRight()).vlanId();
796
797 //match: in port, c-tag
798 //action: immediate: push s-tag, write metadata, meter and output
799 FlowRule.Builder outer = DefaultFlowRule.builder()
800 .fromApp(fwd.appId())
801 .forDevice(deviceId)
802 .forTable(QQ_TABLE)
803 .makePermanent()
804 .withPriority(fwd.priority())
805 .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
806 fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd),
807 outerPbitSet, output));
808
809 if (innerPbitSet != null) {
810 byte innerPbit = ((L2ModificationInstruction.ModVlanPcpInstruction)
811 innerPbitSet).vlanPcp();
812 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId), Criteria.matchVlanPcp(innerPbit)));
813 } else {
814 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId)));
815 }
816
817 applyRules(fwd, inner, outer);
818 }
819
820 private void installUpstreamRulesForAnyVlan(ForwardingObjective fwd, Instruction output,
821 Pair<Instruction, Instruction> outerPair) {
822
823 log.debug("Installing upstream rules for any value vlan");
824
825 //match: in port and any-vlan (coming from OLT app.)
826 //action: write metadata, go to table 1 and meter
827 FlowRule.Builder inner = DefaultFlowRule.builder()
828 .fromApp(fwd.appId())
829 .forDevice(deviceId)
830 .makePermanent()
831 .withPriority(fwd.priority())
832 .withSelector(fwd.selector())
833 .withTreatment(buildTreatment(Instructions.transition(QQ_TABLE), fetchMeter(fwd),
834 fetchWriteMetadata(fwd)));
835
836 //match: in port and any-vlan (coming from OLT app.)
837 //action: immediate: push:QinQ, vlanId (s-tag), write metadata, meter and output
838 FlowRule.Builder outer = DefaultFlowRule.builder()
839 .fromApp(fwd.appId())
840 .forDevice(deviceId)
841 .forTable(QQ_TABLE)
842 .makePermanent()
843 .withPriority(fwd.priority())
844 .withSelector(fwd.selector())
845 .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
846 fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd), output));
847
848 applyRules(fwd, inner, outer);
849 }
850
851 private boolean checkNoneVlanCriteria(ForwardingObjective fwd) {
852 // Add the VLAN_PUSH treatment if we're matching on VlanId.NONE
853 Criterion vlanMatchCriterion = filterForCriterion(fwd.selector().criteria(), Criterion.Type.VLAN_VID);
854 boolean noneValueVlanStatus = false;
855 if (vlanMatchCriterion != null) {
856 noneValueVlanStatus = ((VlanIdCriterion) vlanMatchCriterion).vlanId().equals(VlanId.NONE);
857 }
858 return noneValueVlanStatus;
859 }
860
861 private boolean checkAnyVlanMatchCriteria(ForwardingObjective fwd) {
862 Criterion anyValueVlanCriterion = fwd.selector().criteria().stream()
863 .filter(c -> c.type().equals(Criterion.Type.VLAN_VID))
864 .filter(vc -> ((VlanIdCriterion) vc).vlanId().toShort() == VlanId.ANY_VALUE)
865 .findAny().orElse(null);
866
867 if (anyValueVlanCriterion == null) {
868 log.debug("Any value vlan match criteria is not found, criteria {}",
869 fwd.selector().criteria());
870 return false;
871 }
872
873 return true;
874 }
875
876 private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
877 Instruction output = fwd.treatment().allInstructions().stream()
878 .filter(i -> i.type() == Instruction.Type.OUTPUT)
879 .findFirst().orElse(null);
880
881 if (output == null) {
882 log.error("OLT {} rule has no output", direction);
883 fail(fwd, ObjectiveError.BADPARAMS);
884 return null;
885 }
886 return output;
887 }
888
889 private Instruction fetchMeter(ForwardingObjective fwd) {
890 Instruction meter = fwd.treatment().metered();
891
892 if (meter == null) {
893 log.debug("Meter instruction is not found for the forwarding objective {}", fwd);
894 return null;
895 }
896
897 log.debug("Meter instruction is found.");
898 return meter;
899 }
900
901 private Instructions.MetadataInstruction fetchWriteMetadata(ForwardingObjective fwd) {
902 Instructions.MetadataInstruction writeMetadata = fwd.treatment().writeMetadata();
903
904 if (writeMetadata == null) {
905 log.warn("Write metadata is not found for the forwarding obj");
906 fail(fwd, ObjectiveError.BADPARAMS);
907 return null;
908 }
909
910 log.debug("Write metadata is found {}", writeMetadata);
911 return writeMetadata;
912 }
913
914 private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
915 L2ModificationInstruction.L2SubType type) {
916
917 List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
918 fwd.treatment().allInstructions(), type);
919
920 if (vlanOps == null || vlanOps.isEmpty()) {
921 String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
922 ? DOWNSTREAM : UPSTREAM;
923 log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
924 fail(fwd, ObjectiveError.BADPARAMS);
925 return ImmutableList.of();
926 }
927 return vlanOps;
928 }
929
930
931 private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
932 L2ModificationInstruction.L2SubType type) {
933
934 List<Instruction> vlanOperations = findL2Instructions(
935 type,
936 instructions);
937 List<Instruction> vlanSets = findL2Instructions(
938 L2ModificationInstruction.L2SubType.VLAN_ID,
939 instructions);
940
941 if (vlanOperations.size() != vlanSets.size()) {
942 return ImmutableList.of();
943 }
944
945 List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
946
947 for (int i = 0; i < vlanOperations.size(); i++) {
948 pairs.add(new ImmutablePair<>(vlanOperations.get(i), vlanSets.get(i)));
949 }
950 return pairs;
951 }
952
953 private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
954 List<Instruction> actions) {
955 return actions.stream()
956 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
957 .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
958 .collect(Collectors.toList());
959 }
960
961 private void provisionEthTypeBasedFilter(FilteringObjective filter,
962 EthTypeCriterion ethType,
963 Instructions.OutputInstruction output,
964 L2ModificationInstruction vlanId,
965 L2ModificationInstruction vlanPush) {
966
967 Instruction meter = filter.meta().metered();
968 Instruction writeMetadata = filter.meta().writeMetadata();
969
970 TrafficSelector selector = buildSelector(filter.key(), ethType);
971 TrafficTreatment treatment;
972
973 if (vlanPush == null || vlanId == null) {
974 treatment = buildTreatment(output, meter, writeMetadata);
975 } else {
976 // we need to push the vlan because it came untagged (ATT)
977 treatment = buildTreatment(output, meter, vlanPush, vlanId, writeMetadata);
978 }
979
980 buildAndApplyRule(filter, selector, treatment);
981
982 }
983
984 private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
985 IPProtocolCriterion ipProto,
986 Instructions.OutputInstruction output,
987 Instruction vlan, Instruction pcp) {
988
989 Instruction meter = filter.meta().metered();
990 Instruction writeMetadata = filter.meta().writeMetadata();
991
992 // uniTagMatch
993 VlanIdCriterion vlanId = (VlanIdCriterion) filterForCriterion(filter.conditions(),
994 Criterion.Type.VLAN_VID);
995
996 TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto, vlanId);
997 TrafficTreatment treatment = buildTreatment(output, vlan, pcp, meter, writeMetadata);
998 buildAndApplyRule(filter, selector, treatment);
999 }
1000
1001 private void provisionDhcp(FilteringObjective filter, EthTypeCriterion ethType,
1002 IPProtocolCriterion ipProto,
1003 UdpPortCriterion udpSrcPort,
1004 UdpPortCriterion udpDstPort,
1005 Instruction vlanIdInstruction,
1006 Instruction vlanPcpInstruction,
1007 Instructions.OutputInstruction output) {
1008
1009 Instruction meter = filter.meta().metered();
1010 Instruction writeMetadata = filter.meta().writeMetadata();
1011
1012 VlanIdCriterion matchVlanId = (VlanIdCriterion)
1013 filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
1014
1015 TrafficSelector selector;
1016 TrafficTreatment treatment;
1017
1018 if (matchVlanId != null) {
1019 log.debug("Building selector with match VLAN, {}", matchVlanId);
1020 // in case of TT upstream the packet comes tagged and the vlan is swapped.
1021 selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort,
1022 udpDstPort, matchVlanId);
1023 treatment = buildTreatment(output, meter, writeMetadata,
1024 vlanIdInstruction, vlanPcpInstruction);
1025 } else {
1026 log.debug("Building selector with no VLAN");
1027 // in case of ATT upstream the packet comes in untagged and we need to push the vlan
1028 selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort, udpDstPort);
1029 treatment = buildTreatment(output, meter, vlanIdInstruction, writeMetadata);
1030 }
1031 //In case of downstream there will be no match on the VLAN, which is null,
1032 // so it will just be output, meter, writeMetadata
1033
1034 buildAndApplyRule(filter, selector, treatment);
1035 }
1036
1037 private void provisionPPPoED(FilteringObjective filter, EthTypeCriterion ethType,
1038 Instruction vlanIdInstruction,
1039 Instruction vlanPcpInstruction,
1040 Instructions.OutputInstruction output) {
1041 Instruction meter = filter.meta().metered();
1042 Instruction writeMetadata = filter.meta().writeMetadata();
1043
1044 VlanIdCriterion matchVlanId = (VlanIdCriterion)
1045 filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
1046
1047 TrafficSelector selector;
1048 TrafficTreatment treatment;
1049
1050 if (matchVlanId != null) {
1051 log.debug("Building pppoed selector with match VLAN {}.", matchVlanId);
1052 } else {
1053 log.debug("Building pppoed selector without match VLAN.");
1054 }
1055
1056 selector = buildSelector(filter.key(), ethType, matchVlanId);
1057 treatment = buildTreatment(output, meter, writeMetadata, vlanIdInstruction, vlanPcpInstruction);
1058 buildAndApplyRule(filter, selector, treatment);
1059 }
1060
1061 private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
1062 TrafficTreatment treatment) {
1063 FlowRule rule = DefaultFlowRule.builder()
1064 .fromApp(filter.appId())
1065 .forDevice(deviceId)
1066 .forTable(0)
1067 .makePermanent()
1068 .withSelector(selector)
1069 .withTreatment(treatment)
1070 .withPriority(filter.priority())
1071 .build();
1072
1073 if (accumulator != null) {
1074 if (log.isDebugEnabled()) {
1075 log.debug("Adding pair to batch: {}", Pair.of(filter, rule));
1076 }
1077 accumulator.add(Pair.of(filter, rule));
1078 } else {
1079 FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
1080 switch (filter.type()) {
1081 case PERMIT:
1082 opsBuilder.add(rule);
1083 break;
1084 case DENY:
1085 opsBuilder.remove(rule);
1086 break;
1087 default:
1088 log.warn("Unknown filter type : {}", filter.type());
1089 fail(filter, ObjectiveError.UNSUPPORTED);
1090 }
1091 applyFlowRules(ImmutableList.of(filter), opsBuilder);
1092 }
1093 }
1094
1095 private void applyRules(ForwardingObjective fwd, FlowRule.Builder... fwdBuilders) {
1096 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
1097 switch (fwd.op()) {
1098 case ADD:
1099 for (FlowRule.Builder fwdBuilder : fwdBuilders) {
1100 builder.add(fwdBuilder.build());
1101 }
1102 break;
1103 case REMOVE:
1104 for (FlowRule.Builder fwdBuilder : fwdBuilders) {
1105 builder.remove(fwdBuilder.build());
1106 }
1107 break;
1108 case ADD_TO_EXISTING:
1109 break;
1110 case REMOVE_FROM_EXISTING:
1111 break;
1112 default:
1113 log.warn("Unknown forwarding operation: {}", fwd.op());
1114 }
1115
1116 applyFlowRules(ImmutableList.of(fwd), builder);
1117
1118
1119 }
1120
1121 private void applyFlowRules(List<Objective> objectives, FlowRuleOperations.Builder builder) {
1122 flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
1123 @Override
1124 public void onSuccess(FlowRuleOperations ops) {
1125 objectives.forEach(obj -> {
1126 pass(obj);
1127 });
1128 }
1129
1130 @Override
1131 public void onError(FlowRuleOperations ops) {
1132 objectives.forEach(obj -> {
1133 fail(obj, ObjectiveError.FLOWINSTALLATIONFAILED);
1134 });
1135
1136 }
1137 }));
1138 }
1139
1140 // Builds the batch using the accumulated flow rules
1141 private void sendFilters(List<Pair<FilteringObjective, FlowRule>> pairs) {
1142 FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
1143 log.debug("Sending batch of {} filter-objs", pairs.size());
1144 List<Objective> filterObjs = Lists.newArrayList();
1145 // Iterates over all accumulated flow rules and then build an unique batch
1146 pairs.forEach(pair -> {
1147 FilteringObjective filter = pair.getLeft();
1148 FlowRule rule = pair.getRight();
1149 switch (filter.type()) {
1150 case PERMIT:
1151 flowOpsBuilder.add(rule);
1152 log.debug("Applying add filter-obj {} to device: {}", filter.id(), deviceId);
1153 filterObjs.add(filter);
1154 break;
1155 case DENY:
1156 flowOpsBuilder.remove(rule);
1157 log.debug("Deleting flow rule {} to device: {}", rule, deviceId);
1158 filterObjs.add(filter);
1159 break;
1160 default:
1161 fail(filter, ObjectiveError.UNKNOWN);
1162 log.warn("Unknown forwarding type {}", filter.type());
1163 }
1164 });
1165 if (log.isDebugEnabled()) {
1166 log.debug("Applying batch {}", flowOpsBuilder.build());
1167 }
1168 // Finally applies the operations
1169 applyFlowRules(filterObjs, flowOpsBuilder);
1170 }
1171
1172 private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
1173 return criteria.stream()
1174 .filter(c -> c.type().equals(type))
1175 .limit(1)
1176 .findFirst().orElse(null);
1177 }
1178
1179 private TrafficSelector buildSelector(Criterion... criteria) {
1180
1181 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
1182
1183 Arrays.stream(criteria).filter(Objects::nonNull).forEach(sBuilder::add);
1184
1185 return sBuilder.build();
1186 }
1187
1188 private TrafficTreatment buildTreatment(Instruction... instructions) {
1189
1190 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
1191
1192 Arrays.stream(instructions).filter(Objects::nonNull).forEach(tBuilder::add);
1193
1194 return tBuilder.build();
1195 }
1196
1197 private Instruction writeMetadataIncludingOnlyTp(ForwardingObjective fwd) {
1198
1199 return Instructions.writeMetadata(
1200 fetchWriteMetadata(fwd).metadata() & 0xFFFF00000000L, 0L);
1201 }
1202
1203 private void fail(Objective obj, ObjectiveError error) {
1204 obj.context().ifPresent(context -> context.onError(obj, error));
1205 }
1206
1207 private void pass(Objective obj) {
1208 obj.context().ifPresent(context -> context.onSuccess(obj));
1209 }
1210
1211
1212 private class InnerGroupListener implements GroupListener {
1213 @Override
1214 public void event(GroupEvent event) {
1215 GroupKey key = event.subject().appCookie();
1216 NextObjective obj = pendingGroups.getIfPresent(key);
1217 if (obj == null) {
Andrea Campanella438e1ad2021-03-26 11:41:16 +01001218 if (log.isTraceEnabled()) {
1219 log.trace("No pending group for {}, moving on", key);
1220 }
Andrea Campanella37f07e42021-02-16 11:24:39 +01001221 return;
1222 }
Andrea Campanella438e1ad2021-03-26 11:41:16 +01001223 log.debug("Event {} for group {}, handling pending" +
Andrea Campanella37f07e42021-02-16 11:24:39 +01001224 "NextGroup {}", event.type(), key, obj.id());
1225 if (event.type() == GroupEvent.Type.GROUP_ADDED ||
1226 event.type() == GroupEvent.Type.GROUP_UPDATED) {
1227 flowObjectiveStore.putNextGroup(obj.id(), new OltPipelineGroup(key));
Andrea Campanella37f07e42021-02-16 11:24:39 +01001228 pendingGroups.invalidate(key);
Esin Karamand106e522021-03-15 14:08:48 +00001229 pass(obj);
Andrea Campanella37f07e42021-02-16 11:24:39 +01001230 } else if (event.type() == GroupEvent.Type.GROUP_REMOVED) {
1231 flowObjectiveStore.removeNextGroup(obj.id());
Andrea Campanella37f07e42021-02-16 11:24:39 +01001232 pendingGroups.invalidate(key);
Esin Karamand106e522021-03-15 14:08:48 +00001233 pass(obj);
Andrea Campanella37f07e42021-02-16 11:24:39 +01001234 }
1235 }
1236 }
1237
1238 private static class OltPipelineGroup implements NextGroup {
1239
1240 private final GroupKey key;
1241
1242 public OltPipelineGroup(GroupKey key) {
1243 this.key = key;
1244 }
1245
1246 public GroupKey key() {
1247 return key;
1248 }
1249
1250 @Override
1251 public byte[] data() {
1252 return appKryo.serialize(key);
1253 }
1254
1255 }
1256
1257 @Override
1258 public List<String> getNextMappings(NextGroup nextGroup) {
1259 // TODO Implementation deferred to vendor
1260 return null;
1261 }
1262
1263 // Flow rules accumulator for reducing the number of transactions required to the devices.
1264 private final class ObjectiveAccumulator
1265 extends AbstractAccumulator<Pair<FilteringObjective, FlowRule>> {
1266
1267 ObjectiveAccumulator(int maxFilter, int maxBatchMS, int maxIdleMS) {
1268 super(TIMER, maxFilter, maxBatchMS, maxIdleMS);
1269 }
1270
1271 @Override
1272 public void processItems(List<Pair<FilteringObjective, FlowRule>> pairs) {
1273 // Triggers creation of a batch using the list of flowrules generated from objs.
1274 accumulatorExecutorService.execute(new FlowRulesBuilderTask(pairs));
1275 }
1276 }
1277
1278 // Task for building batch of flow rules in a separate thread.
1279 private final class FlowRulesBuilderTask implements Runnable {
1280 private final List<Pair<FilteringObjective, FlowRule>> pairs;
1281
1282 FlowRulesBuilderTask(List<Pair<FilteringObjective, FlowRule>> pairs) {
1283 this.pairs = pairs;
1284 }
1285
1286 @Override
1287 public void run() {
1288 try {
1289 sendFilters(pairs);
1290 } catch (Exception e) {
1291 log.warn("Unable to send objectives", e);
1292 }
1293 }
1294 }
1295}