blob: 84a56cad581750126d00df3636bf9a61167775a6 [file] [log] [blame]
Andrea Campanella37f07e42021-02-16 11:24:39 +01001/*
2 * Copyright 2016-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.opencord.olt.driver;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.RemovalCause;
21import com.google.common.cache.RemovalNotification;
22import com.google.common.collect.ImmutableList;
23import com.google.common.collect.Lists;
24import org.apache.commons.lang3.tuple.ImmutablePair;
25import org.apache.commons.lang3.tuple.Pair;
26import org.onlab.osgi.ServiceDirectory;
27import org.onlab.packet.EthType;
28import org.onlab.packet.IPv4;
29import org.onlab.packet.IPv6;
30import org.onlab.packet.IpPrefix;
31import org.onlab.packet.VlanId;
32import org.onlab.util.AbstractAccumulator;
33import org.onlab.util.Accumulator;
34import org.onlab.util.KryoNamespace;
35import org.onosproject.core.ApplicationId;
36import org.onosproject.core.CoreService;
37import org.onosproject.net.DeviceId;
38import org.onosproject.net.PortNumber;
39import org.onosproject.net.behaviour.NextGroup;
40import org.onosproject.net.behaviour.Pipeliner;
41import org.onosproject.net.behaviour.PipelinerContext;
42import org.onosproject.net.driver.AbstractHandlerBehaviour;
43import org.onosproject.net.driver.Driver;
44import org.onosproject.net.flow.DefaultFlowRule;
45import org.onosproject.net.flow.DefaultTrafficSelector;
46import org.onosproject.net.flow.DefaultTrafficTreatment;
47import org.onosproject.net.flow.FlowRule;
48import org.onosproject.net.flow.FlowRuleOperations;
49import org.onosproject.net.flow.FlowRuleOperationsContext;
50import org.onosproject.net.flow.FlowRuleService;
51import org.onosproject.net.flow.TrafficSelector;
52import org.onosproject.net.flow.TrafficTreatment;
53import org.onosproject.net.flow.criteria.Criteria;
54import org.onosproject.net.flow.criteria.Criterion;
55import org.onosproject.net.flow.criteria.EthTypeCriterion;
56import org.onosproject.net.flow.criteria.IPCriterion;
57import org.onosproject.net.flow.criteria.IPProtocolCriterion;
58import org.onosproject.net.flow.criteria.PortCriterion;
59import org.onosproject.net.flow.criteria.UdpPortCriterion;
60import org.onosproject.net.flow.criteria.VlanIdCriterion;
61import org.onosproject.net.flow.instructions.Instruction;
62import org.onosproject.net.flow.instructions.Instructions;
63import org.onosproject.net.flow.instructions.L2ModificationInstruction;
64import org.onosproject.net.flowobjective.FilteringObjective;
65import org.onosproject.net.flowobjective.FlowObjectiveStore;
66import org.onosproject.net.flowobjective.ForwardingObjective;
67import org.onosproject.net.flowobjective.NextObjective;
68import org.onosproject.net.flowobjective.Objective;
69import org.onosproject.net.flowobjective.ObjectiveError;
70import org.onosproject.net.group.DefaultGroupBucket;
71import org.onosproject.net.group.DefaultGroupDescription;
72import org.onosproject.net.group.DefaultGroupKey;
73import org.onosproject.net.group.Group;
74import org.onosproject.net.group.GroupBucket;
75import org.onosproject.net.group.GroupBuckets;
76import org.onosproject.net.group.GroupDescription;
77import org.onosproject.net.group.GroupEvent;
78import org.onosproject.net.group.GroupKey;
79import org.onosproject.net.group.GroupListener;
80import org.onosproject.net.group.GroupService;
81import org.onosproject.store.serializers.KryoNamespaces;
82import org.onosproject.store.service.StorageService;
83import org.slf4j.Logger;
84
85import java.util.Arrays;
86import java.util.Collection;
87import java.util.Collections;
88import java.util.List;
89import java.util.Objects;
90import java.util.Optional;
91import java.util.concurrent.ScheduledExecutorService;
92import java.util.Timer;
93import java.util.concurrent.TimeUnit;
94import java.util.stream.Collectors;
95
96import static org.onosproject.core.CoreService.CORE_APP_NAME;
97import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
98import static org.onlab.util.Tools.groupedThreads;
99import static org.slf4j.LoggerFactory.getLogger;
100
101/**
102 * Pipeliner for OLT device.
103 */
104
105public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
106
// Table that first-stage rules transition to and in which second-stage rules are installed.
private static final Integer QQ_TABLE = 1;
// NOTE(review): not referenced in the visible part of this class - confirm it is still needed.
private static final int NO_ACTION_PRIORITY = 500;
// Direction labels handed to fetchOutput(), which uses them only in its error log.
private static final String DOWNSTREAM = "downstream";
private static final String UPSTREAM = "upstream";
private final Logger log = getLogger(getClass());

// Services looked up through the service directory in init().
private ServiceDirectory serviceDirectory;
private FlowRuleService flowRuleService;
private GroupService groupService;
private CoreService coreService;
private StorageService storageService;

// Device this pipeliner instance was initialized for, and the application id registered in init().
private DeviceId deviceId;
private ApplicationId appId;


// Store taken from the pipeliner context; queried for the NextGroup behind a next id.
protected FlowObjectiveStore flowObjectiveStore;

// Next objectives submitted to the group service, keyed by group key.
// Built in init() with a 20 second expire-after-write; expired entries are failed.
private Cache<GroupKey, NextObjective> pendingGroups;

// Serializer used to turn next-objective ids into group keys and to read stored NextGroup data back.
protected static KryoNamespace appKryo = new KryoNamespace.Builder()
        .register(KryoNamespaces.API)
        .register(GroupKey.class)
        .register(DefaultGroupKey.class)
        .register(OltPipelineGroup.class)
        .build("OltPipeline");

// NOTE(review): presumably drives the accumulator's batching timers - confirm against ObjectiveAccumulator.
private static final Timer TIMER = new Timer("filterobj-batching");
// Only created in init() when the driver enables the accumulator; otherwise stays null.
private Accumulator<Pair<FilteringObjective, FlowRule>> accumulator;

// accumulator executor service
private ScheduledExecutorService accumulatorExecutorService
        = newSingleThreadScheduledExecutor(groupedThreads("OltPipeliner", "acc-%d", log));
140
141 @Override
142 public void init(DeviceId deviceId, PipelinerContext context) {
143 log.debug("Initiate OLT pipeline");
144 this.serviceDirectory = context.directory();
145 this.deviceId = deviceId;
146
147 flowRuleService = serviceDirectory.get(FlowRuleService.class);
148 coreService = serviceDirectory.get(CoreService.class);
149 groupService = serviceDirectory.get(GroupService.class);
150 flowObjectiveStore = context.store();
151 storageService = serviceDirectory.get(StorageService.class);
152
153 appId = coreService.registerApplication(
154 "org.onosproject.driver.OLTPipeline");
155
156 // Init the accumulator, if enabled
157 if (isAccumulatorEnabled()) {
158 log.debug("Building accumulator with maxObjs {}, batchMs {}, idleMs {}",
159 context.accumulatorMaxObjectives(), context.accumulatorMaxBatchMillis(),
160 context.accumulatorMaxIdleMillis());
161 accumulator = new ObjectiveAccumulator(context.accumulatorMaxObjectives(),
162 context.accumulatorMaxBatchMillis(),
163 context.accumulatorMaxIdleMillis());
164 }
165
166
167 pendingGroups = CacheBuilder.newBuilder()
168 .expireAfterWrite(20, TimeUnit.SECONDS)
169 .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
170 if (notification.getCause() == RemovalCause.EXPIRED) {
171 fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
172 }
173 }).build();
174
175 groupService.addListener(new InnerGroupListener());
176
177 }
178
179 public boolean isAccumulatorEnabled() {
180 Driver driver = super.data().driver();
181 // we cannot determine the property
182 if (driver == null) {
183 return false;
184 }
185 return Boolean.parseBoolean(driver.getProperty(ACCUMULATOR_ENABLED));
186 }
187
188 @Override
189 public void filter(FilteringObjective filter) {
190 Instructions.OutputInstruction output;
191
192 if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
193 output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
194 .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
195 .limit(1)
196 .findFirst().get();
197
198 if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
199 log.warn("OLT can only filter packet to controller");
200 fail(filter, ObjectiveError.UNSUPPORTED);
201 return;
202 }
203 } else {
204 fail(filter, ObjectiveError.BADPARAMS);
205 return;
206 }
207
208 if (filter.key().type() != Criterion.Type.IN_PORT) {
209 fail(filter, ObjectiveError.BADPARAMS);
210 return;
211 }
212
213 EthTypeCriterion ethType = (EthTypeCriterion)
214 filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
215
216 if (ethType == null) {
217 fail(filter, ObjectiveError.BADPARAMS);
218 return;
219 }
220 Optional<Instruction> vlanId = filter.meta().immediate().stream()
221 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
222 && ((L2ModificationInstruction) t).subtype()
223 .equals(L2ModificationInstruction.L2SubType.VLAN_ID))
224 .limit(1)
225 .findFirst();
226
227 Optional<Instruction> vlanPcp = filter.meta().immediate().stream()
228 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
229 && ((L2ModificationInstruction) t).subtype()
230 .equals(L2ModificationInstruction.L2SubType.VLAN_PCP))
231 .limit(1)
232 .findFirst();
233
234 Optional<Instruction> vlanPush = filter.meta().immediate().stream()
235 .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
236 && ((L2ModificationInstruction) t).subtype()
237 .equals(L2ModificationInstruction.L2SubType.VLAN_PUSH))
238 .limit(1)
239 .findFirst();
240
241 if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
242
243 if (vlanId.isEmpty() || vlanPush.isEmpty()) {
244 log.warn("Missing EAPOL vlan or vlanPush");
245 fail(filter, ObjectiveError.BADPARAMS);
246 return;
247 }
248 provisionEthTypeBasedFilter(filter, ethType, output,
249 (L2ModificationInstruction) vlanId.get(),
250 (L2ModificationInstruction) vlanPush.get());
251 } else if (ethType.ethType().equals(EthType.EtherType.PPPoED.ethType())) {
252 provisionPPPoED(filter, ethType, vlanId.orElse(null), vlanPcp.orElse(null), output);
253 } else if (ethType.ethType().equals(EthType.EtherType.LLDP.ethType())) {
254 provisionEthTypeBasedFilter(filter, ethType, output, null, null);
255 } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
256 IPProtocolCriterion ipProto = (IPProtocolCriterion)
257 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
258 if (ipProto == null) {
259 log.warn("OLT can only filter IGMP and DHCP");
260 fail(filter, ObjectiveError.UNSUPPORTED);
261 return;
262 }
263 if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
264 provisionIgmp(filter, ethType, ipProto, output,
265 vlanId.orElse(null),
266 vlanPcp.orElse(null));
267 } else if (ipProto.protocol() == IPv4.PROTOCOL_UDP) {
268 UdpPortCriterion udpSrcPort = (UdpPortCriterion)
269 filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
270
271 UdpPortCriterion udpDstPort = (UdpPortCriterion)
272 filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
273
274 if ((udpSrcPort.udpPort().toInt() == 67 && udpDstPort.udpPort().toInt() == 68) ||
275 (udpSrcPort.udpPort().toInt() == 68 && udpDstPort.udpPort().toInt() == 67)) {
276 provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
277 vlanPcp.orElse(null), output);
278 } else {
279 log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
280 fail(filter, ObjectiveError.UNSUPPORTED);
281 }
282 } else {
283 log.warn("Currently supporting only IGMP and DHCP filters for IPv4 packets");
284 fail(filter, ObjectiveError.UNSUPPORTED);
285 }
286 } else if (ethType.ethType().equals(EthType.EtherType.IPV6.ethType())) {
287 IPProtocolCriterion ipProto = (IPProtocolCriterion)
288 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
289 if (ipProto == null) {
290 log.warn("OLT can only filter DHCP");
291 fail(filter, ObjectiveError.UNSUPPORTED);
292 return;
293 }
294 if (ipProto.protocol() == IPv6.PROTOCOL_UDP) {
295 UdpPortCriterion udpSrcPort = (UdpPortCriterion)
296 filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
297
298 UdpPortCriterion udpDstPort = (UdpPortCriterion)
299 filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
300
301 if ((udpSrcPort.udpPort().toInt() == 546 && udpDstPort.udpPort().toInt() == 547) ||
302 (udpSrcPort.udpPort().toInt() == 547 && udpDstPort.udpPort().toInt() == 546)) {
303 provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
304 vlanPcp.orElse(null), output);
305 } else {
306 log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
307 fail(filter, ObjectiveError.UNSUPPORTED);
308 }
309 } else {
310 log.warn("Currently supporting only DHCP filters for IPv6 packets");
311 fail(filter, ObjectiveError.UNSUPPORTED);
312 }
313 } else {
314 log.warn("\nOnly the following are Supported in OLT for filter ->\n"
315 + "ETH TYPE : EAPOL, LLDP and IPV4\n"
316 + "IPV4 TYPE: IGMP and UDP (for DHCP)"
317 + "IPV6 TYPE: UDP (for DHCP)");
318 fail(filter, ObjectiveError.UNSUPPORTED);
319 }
320
321 }
322
323
324 @Override
325 public void forward(ForwardingObjective fwd) {
326 log.debug("Installing forwarding objective {}", fwd);
327 if (checkForMulticast(fwd)) {
328 processMulticastRule(fwd);
329 return;
330 }
331
332 TrafficTreatment treatment = fwd.treatment();
333
334 List<Instruction> instructions = treatment.allInstructions();
335
336 Optional<Instruction> vlanInstruction = instructions.stream()
337 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
338 .filter(i -> ((L2ModificationInstruction) i).subtype() ==
339 L2ModificationInstruction.L2SubType.VLAN_PUSH ||
340 ((L2ModificationInstruction) i).subtype() ==
341 L2ModificationInstruction.L2SubType.VLAN_POP)
342 .findAny();
343
344
345 if (!vlanInstruction.isPresent()) {
346 installNoModificationRules(fwd);
347 } else {
348 L2ModificationInstruction vlanIns =
349 (L2ModificationInstruction) vlanInstruction.get();
350 if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
351 installUpstreamRules(fwd);
352 } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
353 installDownstreamRules(fwd);
354 } else {
355 log.error("Unknown OLT operation: {}", fwd);
356 fail(fwd, ObjectiveError.UNSUPPORTED);
357 return;
358 }
359 }
360
361 pass(fwd);
362
363 }
364
365
366 @Override
367 public void next(NextObjective nextObjective) {
368 if (nextObjective.type() != NextObjective.Type.BROADCAST) {
369 log.error("OLT only supports broadcast groups.");
370 fail(nextObjective, ObjectiveError.BADPARAMS);
371 return;
372 }
373
374 if (nextObjective.next().size() != 1 && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
375 log.error("OLT only supports singleton broadcast groups.");
376 fail(nextObjective, ObjectiveError.BADPARAMS);
377 return;
378 }
379
380 Optional<TrafficTreatment> treatmentOpt = nextObjective.next().stream().findFirst();
381 if (treatmentOpt.isEmpty() && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
382 log.error("Next objective {} does not have a treatment", nextObjective);
383 fail(nextObjective, ObjectiveError.BADPARAMS);
384 return;
385 }
386
387 GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
388
389 pendingGroups.put(key, nextObjective);
390 log.trace("NextObjective Operation {}", nextObjective.op());
391 switch (nextObjective.op()) {
392 case ADD:
393 GroupDescription groupDesc =
394 new DefaultGroupDescription(deviceId,
395 GroupDescription.Type.ALL,
396 new GroupBuckets(
397 Collections.singletonList(
398 buildBucket(treatmentOpt.get()))),
399 key,
400 null,
401 nextObjective.appId());
402 groupService.addGroup(groupDesc);
403 break;
404 case REMOVE:
405 groupService.removeGroup(deviceId, key, nextObjective.appId());
406 break;
407 case ADD_TO_EXISTING:
408 groupService.addBucketsToGroup(deviceId, key,
409 new GroupBuckets(
410 Collections.singletonList(
411 buildBucket(treatmentOpt.get()))),
412 key, nextObjective.appId());
413 break;
414 case REMOVE_FROM_EXISTING:
415 groupService.removeBucketsFromGroup(deviceId, key,
416 new GroupBuckets(
417 Collections.singletonList(
418 buildBucket(treatmentOpt.get()))),
419 key, nextObjective.appId());
420 break;
421 default:
422 log.warn("Unknown next objective operation: {}", nextObjective.op());
423 }
424
425
426 }
427
428 private GroupBucket buildBucket(TrafficTreatment treatment) {
429 return DefaultGroupBucket.createAllGroupBucket(treatment);
430 }
431
432 private void processMulticastRule(ForwardingObjective fwd) {
433 if (fwd.nextId() == null) {
434 log.error("Multicast objective does not have a next id");
435 fail(fwd, ObjectiveError.BADPARAMS);
436 }
437
438 GroupKey key = getGroupForNextObjective(fwd.nextId());
439
440 if (key == null) {
441 log.error("Group for forwarding objective missing: {}", fwd);
442 fail(fwd, ObjectiveError.GROUPMISSING);
443 }
444
445 Group group = groupService.getGroup(deviceId, key);
446 TrafficTreatment treatment =
447 buildTreatment(Instructions.createGroup(group.id()));
448
449 TrafficSelector.Builder selectorBuilder = buildIpv4SelectorForMulticast(fwd);
450
451 FlowRule rule = DefaultFlowRule.builder()
452 .fromApp(fwd.appId())
453 .forDevice(deviceId)
454 .forTable(0)
455 .makePermanent()
456 .withPriority(fwd.priority())
457 .withSelector(selectorBuilder.build())
458 .withTreatment(treatment)
459 .build();
460
461 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
462 switch (fwd.op()) {
463
464 case ADD:
465 builder.add(rule);
466 break;
467 case REMOVE:
468 builder.remove(rule);
469 break;
470 case ADD_TO_EXISTING:
471 case REMOVE_FROM_EXISTING:
472 break;
473 default:
474 log.warn("Unknown forwarding operation: {}", fwd.op());
475 }
476
477 applyFlowRules(ImmutableList.of(fwd), builder);
478
479
480 }
481
482 private TrafficSelector.Builder buildIpv4SelectorForMulticast(ForwardingObjective fwd) {
483 TrafficSelector.Builder builderToUpdate = DefaultTrafficSelector.builder();
484
485 Optional<Criterion> vlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.VLAN_VID);
486 if (vlanIdCriterion.isPresent()) {
487 VlanId assignedVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
488 builderToUpdate.matchVlanId(assignedVlan);
489 }
490
491 Optional<Criterion> innerVlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.INNER_VLAN_VID);
492 if (innerVlanIdCriterion.isPresent()) {
493 VlanId assignedInnerVlan = ((VlanIdCriterion) innerVlanIdCriterion.get()).vlanId();
494 builderToUpdate.matchMetadata(assignedInnerVlan.toShort());
495 }
496
497 Optional<Criterion> ethTypeCriterion = readFromSelector(fwd.selector(), Criterion.Type.ETH_TYPE);
498 if (ethTypeCriterion.isPresent()) {
499 EthType ethType = ((EthTypeCriterion) ethTypeCriterion.get()).ethType();
500 builderToUpdate.matchEthType(ethType.toShort());
501 }
502
503 Optional<Criterion> ipv4DstCriterion = readFromSelector(fwd.selector(), Criterion.Type.IPV4_DST);
504 if (ipv4DstCriterion.isPresent()) {
505 IpPrefix ipv4Dst = ((IPCriterion) ipv4DstCriterion.get()).ip();
506 builderToUpdate.matchIPDst(ipv4Dst);
507 }
508
509 return builderToUpdate;
510 }
511
512 static Optional<Criterion> readFromSelector(TrafficSelector selector, Criterion.Type type) {
513 if (selector == null) {
514 return Optional.empty();
515 }
516 Criterion criterion = selector.getCriterion(type);
517 return (criterion == null)
518 ? Optional.empty() : Optional.of(criterion);
519 }
520
521 private boolean checkForMulticast(ForwardingObjective fwd) {
522
523 IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
524 Criterion.Type.IPV4_DST);
525
526 if (ip == null) {
527 return false;
528 }
529
530 return ip.ip().isMulticast();
531
532 }
533
534 private GroupKey getGroupForNextObjective(Integer nextId) {
535 NextGroup next = flowObjectiveStore.getNextGroup(nextId);
536 return appKryo.deserialize(next.data());
537
538 }
539
540 private void installNoModificationRules(ForwardingObjective fwd) {
541 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
542 Instructions.MetadataInstruction writeMetadata = fetchWriteMetadata(fwd);
543 Instructions.MeterInstruction meter = (Instructions.MeterInstruction) fetchMeter(fwd);
544
545 TrafficSelector selector = fwd.selector();
546
547 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
548 Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
549 Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
550
551 if (inport == null || output == null || innerVlan == null || outerVlan == null) {
552 // Avoid logging a non-error from lldp, bbdp and eapol core flows.
553 if (!fwd.appId().name().equals(CORE_APP_NAME)) {
554 log.error("Forwarding objective is underspecified: {}", fwd);
555 } else {
556 log.debug("Not installing unsupported core generated flow {}", fwd);
557 }
558 fail(fwd, ObjectiveError.BADPARAMS);
559 return;
560 }
561
562
563 FlowRule.Builder outer = DefaultFlowRule.builder()
564 .fromApp(fwd.appId())
565 .forDevice(deviceId)
566 .makePermanent()
567 .withPriority(fwd.priority())
568 .withSelector(buildSelector(inport, outerVlan))
569 .withTreatment(buildTreatment(output, writeMetadata, meter));
570
571 applyRules(fwd, outer);
572 }
573
574 private void installDownstreamRules(ForwardingObjective fwd) {
575 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
576
577 if (output == null) {
578 return;
579 }
580
581 TrafficSelector selector = fwd.selector();
582
583 Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
584 Criterion outerPbit = selector.getCriterion(Criterion.Type.VLAN_PCP);
585 Criterion innerVlanCriterion = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
586 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
587 Criterion dstMac = selector.getCriterion(Criterion.Type.ETH_DST);
588
589 if (outerVlan == null || innerVlanCriterion == null || inport == null) {
590 // Avoid logging a non-error from lldp, bbdp and eapol core flows.
591 if (!fwd.appId().name().equals(CORE_APP_NAME)) {
592 log.error("Forwarding objective is underspecified: {}", fwd);
593 } else {
594 log.debug("Not installing unsupported core generated flow {}", fwd);
595 }
596 fail(fwd, ObjectiveError.BADPARAMS);
597 return;
598 }
599
600 VlanId innerVlan = ((VlanIdCriterion) innerVlanCriterion).vlanId();
601 Criterion innerVid = Criteria.matchVlanId(innerVlan);
602
603 // In the case where the C-tag is the same for all the subscribers,
604 // we add a metadata with the outport in the selector to make the flow unique
605 Criterion innerSelectorMeta = Criteria.matchMetadata(output.port().toLong());
606
607 if (innerVlan.toShort() == VlanId.ANY_VALUE) {
608 TrafficSelector outerSelector = buildSelector(inport, outerVlan, outerPbit, dstMac);
609 installDownstreamRulesForAnyVlan(fwd, output, outerSelector,
610 buildSelector(inport,
611 Criteria.matchVlanId(VlanId.ANY),
612 innerSelectorMeta));
613 } else {
614 // Required to differentiate the same match flows
615 // Please note that S tag and S p bit values will be same for the same service - so conflict flows!
616 // Metadata match criteria solves the conflict issue - but not used by the voltha
617 // Maybe - find a better way to solve the above problem
618 Criterion metadata = Criteria.matchMetadata(innerVlan.toShort());
619 TrafficSelector outerSelector = buildSelector(inport, metadata, outerVlan, outerPbit, dstMac);
620 installDownstreamRulesForVlans(fwd, output, outerSelector, buildSelector(inport, innerVid,
621 innerSelectorMeta));
622 }
623 }
624
625 private void installDownstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
626 TrafficSelector outerSelector, TrafficSelector innerSelector) {
627
628 List<Pair<Instruction, Instruction>> vlanOps =
629 vlanOps(fwd,
630 L2ModificationInstruction.L2SubType.VLAN_POP);
631
632 if (vlanOps == null || vlanOps.isEmpty()) {
633 return;
634 }
635
636 Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
637
638 TrafficTreatment innerTreatment;
639 VlanId setVlanId = ((L2ModificationInstruction.ModVlanIdInstruction) popAndRewrite.getRight()).vlanId();
640 if (VlanId.NONE.equals(setVlanId)) {
641 innerTreatment = (buildTreatment(popAndRewrite.getLeft(), fetchMeter(fwd),
642 writeMetadataIncludingOnlyTp(fwd), output));
643 } else {
644 innerTreatment = (buildTreatment(popAndRewrite.getRight(),
645 fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd), output));
646 }
647
648 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
649 fwd.treatment().allInstructions());
650
651 Instruction innerPbitSet = null;
652
653 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
654 innerPbitSet = setVlanPcps.get(0);
655 }
656
657 VlanId remarkInnerVlan = null;
658 Optional<Criterion> vlanIdCriterion = readFromSelector(innerSelector, Criterion.Type.VLAN_VID);
659 if (vlanIdCriterion.isPresent()) {
660 remarkInnerVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
661 }
662
663 Instruction modVlanId = null;
664 if (innerPbitSet != null) {
665 modVlanId = Instructions.modVlanId(remarkInnerVlan);
666 }
667
668 //match: in port (nni), s-tag
669 //action: pop vlan (s-tag), write metadata, go to table 1, meter
670 FlowRule.Builder outer = DefaultFlowRule.builder()
671 .fromApp(fwd.appId())
672 .forDevice(deviceId)
673 .makePermanent()
674 .withPriority(fwd.priority())
675 .withSelector(outerSelector)
676 .withTreatment(buildTreatment(popAndRewrite.getLeft(), modVlanId,
677 innerPbitSet, fetchMeter(fwd),
678 fetchWriteMetadata(fwd),
679 Instructions.transition(QQ_TABLE)));
680
681 //match: in port (nni), c-tag
682 //action: immediate: write metadata and pop, meter, output
683 FlowRule.Builder inner = DefaultFlowRule.builder()
684 .fromApp(fwd.appId())
685 .forDevice(deviceId)
686 .forTable(QQ_TABLE)
687 .makePermanent()
688 .withPriority(fwd.priority())
689 .withSelector(innerSelector)
690 .withTreatment(innerTreatment);
691 applyRules(fwd, inner, outer);
692 }
693
694 private void installDownstreamRulesForAnyVlan(ForwardingObjective fwd, Instruction output,
695 TrafficSelector outerSelector, TrafficSelector innerSelector) {
696
697 //match: in port (nni), s-tag
698 //action: immediate: write metadata, pop vlan, meter and go to table 1
699 FlowRule.Builder outer = DefaultFlowRule.builder()
700 .fromApp(fwd.appId())
701 .forDevice(deviceId)
702 .makePermanent()
703 .withPriority(fwd.priority())
704 .withSelector(outerSelector)
705 .withTreatment(buildTreatment(Instructions.popVlan(), fetchMeter(fwd),
706 fetchWriteMetadata(fwd), Instructions.transition(QQ_TABLE)));
707
708 //match: in port (nni) and s-tag
709 //action: immediate : write metadata, meter and output
710 FlowRule.Builder inner = DefaultFlowRule.builder()
711 .fromApp(fwd.appId())
712 .forDevice(deviceId)
713 .forTable(QQ_TABLE)
714 .makePermanent()
715 .withPriority(fwd.priority())
716 .withSelector(innerSelector)
717 .withTreatment(buildTreatment(fetchMeter(fwd),
718 writeMetadataIncludingOnlyTp(fwd), output));
719
720 applyRules(fwd, inner, outer);
721 }
722
723 private void installUpstreamRules(ForwardingObjective fwd) {
724 List<Pair<Instruction, Instruction>> vlanOps =
725 vlanOps(fwd,
726 L2ModificationInstruction.L2SubType.VLAN_PUSH);
727
728 if (vlanOps == null || vlanOps.isEmpty()) {
729 return;
730 }
731
732 Instruction output = fetchOutput(fwd, UPSTREAM);
733
734 if (output == null) {
735 return;
736 }
737
738 Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
739
740 boolean noneValueVlanStatus = checkNoneVlanCriteria(fwd);
741 boolean anyValueVlanStatus = checkAnyVlanMatchCriteria(fwd);
742
743 if (anyValueVlanStatus) {
744 installUpstreamRulesForAnyVlan(fwd, output, outerPair);
745 } else {
746 Pair<Instruction, Instruction> innerPair = outerPair;
747 outerPair = vlanOps.remove(0);
748 installUpstreamRulesForVlans(fwd, output, innerPair, outerPair, noneValueVlanStatus);
749 }
750 }
751
752 private void installUpstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
753 Pair<Instruction, Instruction> innerPair,
754 Pair<Instruction, Instruction> outerPair, Boolean noneValueVlanStatus) {
755
756 List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
757 fwd.treatment().allInstructions());
758
759 Instruction innerPbitSet = null;
760 Instruction outerPbitSet = null;
761
762 if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
763 innerPbitSet = setVlanPcps.get(0);
764 outerPbitSet = setVlanPcps.get(1);
765 }
766
767 TrafficTreatment innerTreatment;
768 if (noneValueVlanStatus) {
769 innerTreatment = buildTreatment(innerPair.getLeft(), innerPair.getRight(), fetchMeter(fwd),
770 fetchWriteMetadata(fwd), innerPbitSet,
771 Instructions.transition(QQ_TABLE));
772 } else {
773 innerTreatment = buildTreatment(innerPair.getRight(), fetchMeter(fwd), fetchWriteMetadata(fwd),
774 innerPbitSet, Instructions.transition(QQ_TABLE));
775 }
776
777 //match: in port, vlanId (0 or None)
778 //action:
779 //if vlanId None, push & set c-tag go to table 1
780 //if vlanId 0 or any specific vlan, set c-tag, write metadata, meter and go to table 1
781 FlowRule.Builder inner = DefaultFlowRule.builder()
782 .fromApp(fwd.appId())
783 .forDevice(deviceId)
784 .makePermanent()
785 .withPriority(fwd.priority())
786 .withSelector(fwd.selector())
787 .withTreatment(innerTreatment);
788
789 PortCriterion inPort = (PortCriterion)
790 fwd.selector().getCriterion(Criterion.Type.IN_PORT);
791
792 VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
793 innerPair.getRight()).vlanId();
794
795 //match: in port, c-tag
796 //action: immediate: push s-tag, write metadata, meter and output
797 FlowRule.Builder outer = DefaultFlowRule.builder()
798 .fromApp(fwd.appId())
799 .forDevice(deviceId)
800 .forTable(QQ_TABLE)
801 .makePermanent()
802 .withPriority(fwd.priority())
803 .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
804 fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd),
805 outerPbitSet, output));
806
807 if (innerPbitSet != null) {
808 byte innerPbit = ((L2ModificationInstruction.ModVlanPcpInstruction)
809 innerPbitSet).vlanPcp();
810 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId), Criteria.matchVlanPcp(innerPbit)));
811 } else {
812 outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId)));
813 }
814
815 applyRules(fwd, inner, outer);
816 }
817
818 private void installUpstreamRulesForAnyVlan(ForwardingObjective fwd, Instruction output,
819 Pair<Instruction, Instruction> outerPair) {
820
821 log.debug("Installing upstream rules for any value vlan");
822
823 //match: in port and any-vlan (coming from OLT app.)
824 //action: write metadata, go to table 1 and meter
825 FlowRule.Builder inner = DefaultFlowRule.builder()
826 .fromApp(fwd.appId())
827 .forDevice(deviceId)
828 .makePermanent()
829 .withPriority(fwd.priority())
830 .withSelector(fwd.selector())
831 .withTreatment(buildTreatment(Instructions.transition(QQ_TABLE), fetchMeter(fwd),
832 fetchWriteMetadata(fwd)));
833
834 //match: in port and any-vlan (coming from OLT app.)
835 //action: immediate: push:QinQ, vlanId (s-tag), write metadata, meter and output
836 FlowRule.Builder outer = DefaultFlowRule.builder()
837 .fromApp(fwd.appId())
838 .forDevice(deviceId)
839 .forTable(QQ_TABLE)
840 .makePermanent()
841 .withPriority(fwd.priority())
842 .withSelector(fwd.selector())
843 .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
844 fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd), output));
845
846 applyRules(fwd, inner, outer);
847 }
848
849 private boolean checkNoneVlanCriteria(ForwardingObjective fwd) {
850 // Add the VLAN_PUSH treatment if we're matching on VlanId.NONE
851 Criterion vlanMatchCriterion = filterForCriterion(fwd.selector().criteria(), Criterion.Type.VLAN_VID);
852 boolean noneValueVlanStatus = false;
853 if (vlanMatchCriterion != null) {
854 noneValueVlanStatus = ((VlanIdCriterion) vlanMatchCriterion).vlanId().equals(VlanId.NONE);
855 }
856 return noneValueVlanStatus;
857 }
858
859 private boolean checkAnyVlanMatchCriteria(ForwardingObjective fwd) {
860 Criterion anyValueVlanCriterion = fwd.selector().criteria().stream()
861 .filter(c -> c.type().equals(Criterion.Type.VLAN_VID))
862 .filter(vc -> ((VlanIdCriterion) vc).vlanId().toShort() == VlanId.ANY_VALUE)
863 .findAny().orElse(null);
864
865 if (anyValueVlanCriterion == null) {
866 log.debug("Any value vlan match criteria is not found, criteria {}",
867 fwd.selector().criteria());
868 return false;
869 }
870
871 return true;
872 }
873
874 private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
875 Instruction output = fwd.treatment().allInstructions().stream()
876 .filter(i -> i.type() == Instruction.Type.OUTPUT)
877 .findFirst().orElse(null);
878
879 if (output == null) {
880 log.error("OLT {} rule has no output", direction);
881 fail(fwd, ObjectiveError.BADPARAMS);
882 return null;
883 }
884 return output;
885 }
886
887 private Instruction fetchMeter(ForwardingObjective fwd) {
888 Instruction meter = fwd.treatment().metered();
889
890 if (meter == null) {
891 log.debug("Meter instruction is not found for the forwarding objective {}", fwd);
892 return null;
893 }
894
895 log.debug("Meter instruction is found.");
896 return meter;
897 }
898
899 private Instructions.MetadataInstruction fetchWriteMetadata(ForwardingObjective fwd) {
900 Instructions.MetadataInstruction writeMetadata = fwd.treatment().writeMetadata();
901
902 if (writeMetadata == null) {
903 log.warn("Write metadata is not found for the forwarding obj");
904 fail(fwd, ObjectiveError.BADPARAMS);
905 return null;
906 }
907
908 log.debug("Write metadata is found {}", writeMetadata);
909 return writeMetadata;
910 }
911
912 private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
913 L2ModificationInstruction.L2SubType type) {
914
915 List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
916 fwd.treatment().allInstructions(), type);
917
918 if (vlanOps == null || vlanOps.isEmpty()) {
919 String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
920 ? DOWNSTREAM : UPSTREAM;
921 log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
922 fail(fwd, ObjectiveError.BADPARAMS);
923 return ImmutableList.of();
924 }
925 return vlanOps;
926 }
927
928
929 private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
930 L2ModificationInstruction.L2SubType type) {
931
932 List<Instruction> vlanOperations = findL2Instructions(
933 type,
934 instructions);
935 List<Instruction> vlanSets = findL2Instructions(
936 L2ModificationInstruction.L2SubType.VLAN_ID,
937 instructions);
938
939 if (vlanOperations.size() != vlanSets.size()) {
940 return ImmutableList.of();
941 }
942
943 List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
944
945 for (int i = 0; i < vlanOperations.size(); i++) {
946 pairs.add(new ImmutablePair<>(vlanOperations.get(i), vlanSets.get(i)));
947 }
948 return pairs;
949 }
950
951 private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
952 List<Instruction> actions) {
953 return actions.stream()
954 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
955 .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
956 .collect(Collectors.toList());
957 }
958
959 private void provisionEthTypeBasedFilter(FilteringObjective filter,
960 EthTypeCriterion ethType,
961 Instructions.OutputInstruction output,
962 L2ModificationInstruction vlanId,
963 L2ModificationInstruction vlanPush) {
964
965 Instruction meter = filter.meta().metered();
966 Instruction writeMetadata = filter.meta().writeMetadata();
967
968 TrafficSelector selector = buildSelector(filter.key(), ethType);
969 TrafficTreatment treatment;
970
971 if (vlanPush == null || vlanId == null) {
972 treatment = buildTreatment(output, meter, writeMetadata);
973 } else {
974 // we need to push the vlan because it came untagged (ATT)
975 treatment = buildTreatment(output, meter, vlanPush, vlanId, writeMetadata);
976 }
977
978 buildAndApplyRule(filter, selector, treatment);
979
980 }
981
982 private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
983 IPProtocolCriterion ipProto,
984 Instructions.OutputInstruction output,
985 Instruction vlan, Instruction pcp) {
986
987 Instruction meter = filter.meta().metered();
988 Instruction writeMetadata = filter.meta().writeMetadata();
989
990 // uniTagMatch
991 VlanIdCriterion vlanId = (VlanIdCriterion) filterForCriterion(filter.conditions(),
992 Criterion.Type.VLAN_VID);
993
994 TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto, vlanId);
995 TrafficTreatment treatment = buildTreatment(output, vlan, pcp, meter, writeMetadata);
996 buildAndApplyRule(filter, selector, treatment);
997 }
998
999 private void provisionDhcp(FilteringObjective filter, EthTypeCriterion ethType,
1000 IPProtocolCriterion ipProto,
1001 UdpPortCriterion udpSrcPort,
1002 UdpPortCriterion udpDstPort,
1003 Instruction vlanIdInstruction,
1004 Instruction vlanPcpInstruction,
1005 Instructions.OutputInstruction output) {
1006
1007 Instruction meter = filter.meta().metered();
1008 Instruction writeMetadata = filter.meta().writeMetadata();
1009
1010 VlanIdCriterion matchVlanId = (VlanIdCriterion)
1011 filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
1012
1013 TrafficSelector selector;
1014 TrafficTreatment treatment;
1015
1016 if (matchVlanId != null) {
1017 log.debug("Building selector with match VLAN, {}", matchVlanId);
1018 // in case of TT upstream the packet comes tagged and the vlan is swapped.
1019 selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort,
1020 udpDstPort, matchVlanId);
1021 treatment = buildTreatment(output, meter, writeMetadata,
1022 vlanIdInstruction, vlanPcpInstruction);
1023 } else {
1024 log.debug("Building selector with no VLAN");
1025 // in case of ATT upstream the packet comes in untagged and we need to push the vlan
1026 selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort, udpDstPort);
1027 treatment = buildTreatment(output, meter, vlanIdInstruction, writeMetadata);
1028 }
1029 //In case of downstream there will be no match on the VLAN, which is null,
1030 // so it will just be output, meter, writeMetadata
1031
1032 buildAndApplyRule(filter, selector, treatment);
1033 }
1034
1035 private void provisionPPPoED(FilteringObjective filter, EthTypeCriterion ethType,
1036 Instruction vlanIdInstruction,
1037 Instruction vlanPcpInstruction,
1038 Instructions.OutputInstruction output) {
1039 Instruction meter = filter.meta().metered();
1040 Instruction writeMetadata = filter.meta().writeMetadata();
1041
1042 VlanIdCriterion matchVlanId = (VlanIdCriterion)
1043 filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
1044
1045 TrafficSelector selector;
1046 TrafficTreatment treatment;
1047
1048 if (matchVlanId != null) {
1049 log.debug("Building pppoed selector with match VLAN {}.", matchVlanId);
1050 } else {
1051 log.debug("Building pppoed selector without match VLAN.");
1052 }
1053
1054 selector = buildSelector(filter.key(), ethType, matchVlanId);
1055 treatment = buildTreatment(output, meter, writeMetadata, vlanIdInstruction, vlanPcpInstruction);
1056 buildAndApplyRule(filter, selector, treatment);
1057 }
1058
1059 private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
1060 TrafficTreatment treatment) {
1061 FlowRule rule = DefaultFlowRule.builder()
1062 .fromApp(filter.appId())
1063 .forDevice(deviceId)
1064 .forTable(0)
1065 .makePermanent()
1066 .withSelector(selector)
1067 .withTreatment(treatment)
1068 .withPriority(filter.priority())
1069 .build();
1070
1071 if (accumulator != null) {
1072 if (log.isDebugEnabled()) {
1073 log.debug("Adding pair to batch: {}", Pair.of(filter, rule));
1074 }
1075 accumulator.add(Pair.of(filter, rule));
1076 } else {
1077 FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
1078 switch (filter.type()) {
1079 case PERMIT:
1080 opsBuilder.add(rule);
1081 break;
1082 case DENY:
1083 opsBuilder.remove(rule);
1084 break;
1085 default:
1086 log.warn("Unknown filter type : {}", filter.type());
1087 fail(filter, ObjectiveError.UNSUPPORTED);
1088 }
1089 applyFlowRules(ImmutableList.of(filter), opsBuilder);
1090 }
1091 }
1092
1093 private void applyRules(ForwardingObjective fwd, FlowRule.Builder... fwdBuilders) {
1094 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
1095 switch (fwd.op()) {
1096 case ADD:
1097 for (FlowRule.Builder fwdBuilder : fwdBuilders) {
1098 builder.add(fwdBuilder.build());
1099 }
1100 break;
1101 case REMOVE:
1102 for (FlowRule.Builder fwdBuilder : fwdBuilders) {
1103 builder.remove(fwdBuilder.build());
1104 }
1105 break;
1106 case ADD_TO_EXISTING:
1107 break;
1108 case REMOVE_FROM_EXISTING:
1109 break;
1110 default:
1111 log.warn("Unknown forwarding operation: {}", fwd.op());
1112 }
1113
1114 applyFlowRules(ImmutableList.of(fwd), builder);
1115
1116
1117 }
1118
1119 private void applyFlowRules(List<Objective> objectives, FlowRuleOperations.Builder builder) {
1120 flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
1121 @Override
1122 public void onSuccess(FlowRuleOperations ops) {
1123 objectives.forEach(obj -> {
1124 pass(obj);
1125 });
1126 }
1127
1128 @Override
1129 public void onError(FlowRuleOperations ops) {
1130 objectives.forEach(obj -> {
1131 fail(obj, ObjectiveError.FLOWINSTALLATIONFAILED);
1132 });
1133
1134 }
1135 }));
1136 }
1137
1138 // Builds the batch using the accumulated flow rules
1139 private void sendFilters(List<Pair<FilteringObjective, FlowRule>> pairs) {
1140 FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
1141 log.debug("Sending batch of {} filter-objs", pairs.size());
1142 List<Objective> filterObjs = Lists.newArrayList();
1143 // Iterates over all accumulated flow rules and then build an unique batch
1144 pairs.forEach(pair -> {
1145 FilteringObjective filter = pair.getLeft();
1146 FlowRule rule = pair.getRight();
1147 switch (filter.type()) {
1148 case PERMIT:
1149 flowOpsBuilder.add(rule);
1150 log.debug("Applying add filter-obj {} to device: {}", filter.id(), deviceId);
1151 filterObjs.add(filter);
1152 break;
1153 case DENY:
1154 flowOpsBuilder.remove(rule);
1155 log.debug("Deleting flow rule {} to device: {}", rule, deviceId);
1156 filterObjs.add(filter);
1157 break;
1158 default:
1159 fail(filter, ObjectiveError.UNKNOWN);
1160 log.warn("Unknown forwarding type {}", filter.type());
1161 }
1162 });
1163 if (log.isDebugEnabled()) {
1164 log.debug("Applying batch {}", flowOpsBuilder.build());
1165 }
1166 // Finally applies the operations
1167 applyFlowRules(filterObjs, flowOpsBuilder);
1168 }
1169
1170 private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
1171 return criteria.stream()
1172 .filter(c -> c.type().equals(type))
1173 .limit(1)
1174 .findFirst().orElse(null);
1175 }
1176
1177 private TrafficSelector buildSelector(Criterion... criteria) {
1178
1179 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
1180
1181 Arrays.stream(criteria).filter(Objects::nonNull).forEach(sBuilder::add);
1182
1183 return sBuilder.build();
1184 }
1185
1186 private TrafficTreatment buildTreatment(Instruction... instructions) {
1187
1188 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
1189
1190 Arrays.stream(instructions).filter(Objects::nonNull).forEach(tBuilder::add);
1191
1192 return tBuilder.build();
1193 }
1194
1195 private Instruction writeMetadataIncludingOnlyTp(ForwardingObjective fwd) {
1196
1197 return Instructions.writeMetadata(
1198 fetchWriteMetadata(fwd).metadata() & 0xFFFF00000000L, 0L);
1199 }
1200
1201 private void fail(Objective obj, ObjectiveError error) {
1202 obj.context().ifPresent(context -> context.onError(obj, error));
1203 }
1204
1205 private void pass(Objective obj) {
1206 obj.context().ifPresent(context -> context.onSuccess(obj));
1207 }
1208
1209
1210 private class InnerGroupListener implements GroupListener {
1211 @Override
1212 public void event(GroupEvent event) {
1213 GroupKey key = event.subject().appCookie();
1214 NextObjective obj = pendingGroups.getIfPresent(key);
1215 if (obj == null) {
1216 log.debug("No pending group for {}, moving on", key);
1217 return;
1218 }
1219 log.trace("Event {} for group {}, handling pending" +
1220 "NextGroup {}", event.type(), key, obj.id());
1221 if (event.type() == GroupEvent.Type.GROUP_ADDED ||
1222 event.type() == GroupEvent.Type.GROUP_UPDATED) {
1223 flowObjectiveStore.putNextGroup(obj.id(), new OltPipelineGroup(key));
Andrea Campanella37f07e42021-02-16 11:24:39 +01001224 pendingGroups.invalidate(key);
Esin Karamand106e522021-03-15 14:08:48 +00001225 pass(obj);
Andrea Campanella37f07e42021-02-16 11:24:39 +01001226 } else if (event.type() == GroupEvent.Type.GROUP_REMOVED) {
1227 flowObjectiveStore.removeNextGroup(obj.id());
Andrea Campanella37f07e42021-02-16 11:24:39 +01001228 pendingGroups.invalidate(key);
Esin Karamand106e522021-03-15 14:08:48 +00001229 pass(obj);
Andrea Campanella37f07e42021-02-16 11:24:39 +01001230 }
1231 }
1232 }
1233
1234 private static class OltPipelineGroup implements NextGroup {
1235
1236 private final GroupKey key;
1237
1238 public OltPipelineGroup(GroupKey key) {
1239 this.key = key;
1240 }
1241
1242 public GroupKey key() {
1243 return key;
1244 }
1245
1246 @Override
1247 public byte[] data() {
1248 return appKryo.serialize(key);
1249 }
1250
1251 }
1252
1253 @Override
1254 public List<String> getNextMappings(NextGroup nextGroup) {
1255 // TODO Implementation deferred to vendor
1256 return null;
1257 }
1258
1259 // Flow rules accumulator for reducing the number of transactions required to the devices.
1260 private final class ObjectiveAccumulator
1261 extends AbstractAccumulator<Pair<FilteringObjective, FlowRule>> {
1262
1263 ObjectiveAccumulator(int maxFilter, int maxBatchMS, int maxIdleMS) {
1264 super(TIMER, maxFilter, maxBatchMS, maxIdleMS);
1265 }
1266
1267 @Override
1268 public void processItems(List<Pair<FilteringObjective, FlowRule>> pairs) {
1269 // Triggers creation of a batch using the list of flowrules generated from objs.
1270 accumulatorExecutorService.execute(new FlowRulesBuilderTask(pairs));
1271 }
1272 }
1273
1274 // Task for building batch of flow rules in a separate thread.
1275 private final class FlowRulesBuilderTask implements Runnable {
1276 private final List<Pair<FilteringObjective, FlowRule>> pairs;
1277
1278 FlowRulesBuilderTask(List<Pair<FilteringObjective, FlowRule>> pairs) {
1279 this.pairs = pairs;
1280 }
1281
1282 @Override
1283 public void run() {
1284 try {
1285 sendFilters(pairs);
1286 } catch (Exception e) {
1287 log.warn("Unable to send objectives", e);
1288 }
1289 }
1290 }
1291}