/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.opencord.olt.driver;
17
18import com.google.common.cache.Cache;
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.RemovalCause;
21import com.google.common.cache.RemovalNotification;
22import com.google.common.collect.Lists;
23import org.apache.commons.lang3.tuple.ImmutablePair;
24import org.apache.commons.lang3.tuple.Pair;
25import org.onlab.osgi.ServiceDirectory;
26import org.onlab.packet.EthType;
27import org.onlab.packet.IPv4;
28import org.onlab.packet.VlanId;
29import org.onlab.util.KryoNamespace;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.PortNumber;
34import org.onosproject.net.behaviour.NextGroup;
35import org.onosproject.net.behaviour.Pipeliner;
36import org.onosproject.net.behaviour.PipelinerContext;
37import org.onosproject.net.driver.AbstractHandlerBehaviour;
38import org.onosproject.net.flow.DefaultFlowRule;
39import org.onosproject.net.flow.DefaultTrafficSelector;
40import org.onosproject.net.flow.DefaultTrafficTreatment;
41import org.onosproject.net.flow.FlowRule;
42import org.onosproject.net.flow.FlowRuleOperations;
43import org.onosproject.net.flow.FlowRuleOperationsContext;
44import org.onosproject.net.flow.FlowRuleService;
45import org.onosproject.net.flow.TrafficSelector;
46import org.onosproject.net.flow.TrafficTreatment;
47import org.onosproject.net.flow.criteria.Criteria;
48import org.onosproject.net.flow.criteria.Criterion;
49import org.onosproject.net.flow.criteria.EthTypeCriterion;
50import org.onosproject.net.flow.criteria.IPCriterion;
51import org.onosproject.net.flow.criteria.IPProtocolCriterion;
52import org.onosproject.net.flow.criteria.PortCriterion;
53import org.onosproject.net.flow.criteria.VlanIdCriterion;
54import org.onosproject.net.flow.instructions.Instruction;
55import org.onosproject.net.flow.instructions.Instructions;
56import org.onosproject.net.flow.instructions.L2ModificationInstruction;
57import org.onosproject.net.flowobjective.FilteringObjective;
58import org.onosproject.net.flowobjective.FlowObjectiveStore;
59import org.onosproject.net.flowobjective.ForwardingObjective;
60import org.onosproject.net.flowobjective.NextObjective;
61import org.onosproject.net.flowobjective.Objective;
62import org.onosproject.net.flowobjective.ObjectiveError;
63import org.onosproject.net.group.DefaultGroupBucket;
64import org.onosproject.net.group.DefaultGroupDescription;
65import org.onosproject.net.group.DefaultGroupKey;
66import org.onosproject.net.group.Group;
67import org.onosproject.net.group.GroupBucket;
68import org.onosproject.net.group.GroupBuckets;
69import org.onosproject.net.group.GroupDescription;
70import org.onosproject.net.group.GroupEvent;
71import org.onosproject.net.group.GroupKey;
72import org.onosproject.net.group.GroupListener;
73import org.onosproject.net.group.GroupService;
74import org.onosproject.store.serializers.KryoNamespaces;
75import org.onosproject.store.service.StorageService;
76import org.slf4j.Logger;
77
78import java.util.Collection;
79import java.util.Collections;
80import java.util.Iterator;
81import java.util.List;
82import java.util.Optional;
83import java.util.concurrent.TimeUnit;
84import java.util.stream.Collectors;
85
86import static org.slf4j.LoggerFactory.getLogger;
87
/**
 * Pipeliner for Nokia OLT devices.
 */
91
92public class NokiaOltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
93
94 private static final Integer QQ_TABLE = 1;
95 private static final short MCAST_VLAN = 4000;
96 private static final String OLTCOOKIES = "olt-cookies-must-be-unique";
97 private static final int EAPOL_FLOW_PRIORITY = 1200;
98 private final Logger log = getLogger(getClass());
99
100 private ServiceDirectory serviceDirectory;
101 private FlowRuleService flowRuleService;
102 private GroupService groupService;
103 private CoreService coreService;
104 private StorageService storageService;
105
106 private DeviceId deviceId;
107 private ApplicationId appId;
108
109
110 protected FlowObjectiveStore flowObjectiveStore;
111
112 private Cache<GroupKey, NextObjective> pendingGroups;
113
114 protected static KryoNamespace appKryo = new KryoNamespace.Builder()
115 .register(KryoNamespaces.API)
116 .register(GroupKey.class)
117 .register(DefaultGroupKey.class)
118 .register(OltPipelineGroup.class)
119 .build("OltPipeline");
120 @Override
121 public void init(DeviceId deviceId, PipelinerContext context) {
122 log.debug("Initiate OLT pipeline");
123 this.serviceDirectory = context.directory();
124 this.deviceId = deviceId;
125
126 flowRuleService = serviceDirectory.get(FlowRuleService.class);
127 coreService = serviceDirectory.get(CoreService.class);
128 groupService = serviceDirectory.get(GroupService.class);
129 flowObjectiveStore = context.store();
130 storageService = serviceDirectory.get(StorageService.class);
131
132 appId = coreService.registerApplication(
133 "org.onosproject.driver.OLTPipeline");
134
135
136 pendingGroups = CacheBuilder.newBuilder()
137 .expireAfterWrite(20, TimeUnit.SECONDS)
138 .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
139 if (notification.getCause() == RemovalCause.EXPIRED) {
140 fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
141 }
142 }).build();
143
144 groupService.addListener(new InnerGroupListener());
145
146 }
147
148 @Override
149 public void filter(FilteringObjective filter) {
150 Instructions.OutputInstruction output;
151
152 if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
153 output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
154 .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
155 .limit(1)
156 .findFirst().get();
157
158 if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
159 log.error("OLT can only filter packet to controller");
160 fail(filter, ObjectiveError.UNSUPPORTED);
161 return;
162 }
163 } else {
164 fail(filter, ObjectiveError.BADPARAMS);
165 return;
166 }
167
168 if (filter.key().type() != Criterion.Type.IN_PORT) {
169 fail(filter, ObjectiveError.BADPARAMS);
170 return;
171 }
172
173 EthTypeCriterion ethType = (EthTypeCriterion)
174 filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
175
176 if (ethType == null) {
177 fail(filter, ObjectiveError.BADPARAMS);
178 return;
179 }
180
181 if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
182 provisionEapol(filter, ethType, output);
183 } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
184 IPProtocolCriterion ipProto = (IPProtocolCriterion)
185 filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
186 if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
187 provisionIgmp(filter, ethType, ipProto, output);
188 } else {
189 log.error("OLT can only filter igmp");
190 fail(filter, ObjectiveError.UNSUPPORTED);
191 }
192 } else {
193 log.error("OLT can only filter eapol and igmp");
194 fail(filter, ObjectiveError.UNSUPPORTED);
195 }
196
197 }
198
199 private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
200 FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
201 switch (objective.op()) {
202
203 case ADD:
204 flowBuilder.add(ruleBuilder.build());
205 break;
206 case REMOVE:
207 flowBuilder.remove(ruleBuilder.build());
208 break;
209 default:
210 log.warn("Unknown operation {}", objective.op());
211 }
212
213 flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
214 @Override
215 public void onSuccess(FlowRuleOperations ops) {
216 objective.context().ifPresent(context -> context.onSuccess(objective));
217 }
218
219 @Override
220 public void onError(FlowRuleOperations ops) {
221 objective.context()
222 .ifPresent(context -> context.onError(objective, ObjectiveError.FLOWINSTALLATIONFAILED));
223 }
224 }));
225 }
226
227 @Override
228 public void forward(ForwardingObjective fwd) {
229
230 if (checkForMulticast(fwd)) {
231 processMulticastRule(fwd);
232 return;
233 }
234
235 if (checkForEapol(fwd)) {
236 log.warn("Discarding EAPOL flow which is not supported on this pipeline");
237 return;
238 }
239
240 TrafficTreatment treatment = fwd.treatment();
241
242 List<Instruction> instructions = treatment.allInstructions();
243
244 Optional<Instruction> vlanIntruction = instructions.stream()
245 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
246 .filter(i -> ((L2ModificationInstruction) i).subtype() ==
247 L2ModificationInstruction.L2SubType.VLAN_PUSH ||
248 ((L2ModificationInstruction) i).subtype() ==
249 L2ModificationInstruction.L2SubType.VLAN_POP)
250 .findAny();
251
252 if (vlanIntruction.isPresent()) {
253 L2ModificationInstruction vlanIns =
254 (L2ModificationInstruction) vlanIntruction.get();
255
256 if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
257 installUpstreamRules(fwd);
258 } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
259 installDownstreamRules(fwd);
260 } else {
261 log.error("Unknown OLT operation: {}", fwd);
262 fail(fwd, ObjectiveError.UNSUPPORTED);
263 return;
264 }
265
266 pass(fwd);
267 } else {
268 TrafficSelector selector = fwd.selector();
269
270 if (fwd.treatment() != null) {
271 // Deal with SPECIFIC and VERSATILE in the same manner.
272 FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
273 .forDevice(deviceId)
274 .withSelector(selector)
275 .fromApp(fwd.appId())
276 .withPriority(fwd.priority())
277 .withTreatment(fwd.treatment());
278
279 if (fwd.permanent()) {
280 ruleBuilder.makePermanent();
281 } else {
282 ruleBuilder.makeTemporary(fwd.timeout());
283 }
284 installObjective(ruleBuilder, fwd);
285
286 } else {
287 log.error("No treatment error: {}", fwd);
288 fail(fwd, ObjectiveError.UNSUPPORTED);
289 }
290 }
291
292 }
293
294
295 @Override
296 public void next(NextObjective nextObjective) {
297 if (nextObjective.type() != NextObjective.Type.BROADCAST) {
298 log.error("OLT only supports broadcast groups.");
299 fail(nextObjective, ObjectiveError.BADPARAMS);
300 }
301
302 if (nextObjective.next().size() != 1) {
303 log.error("OLT only supports singleton broadcast groups.");
304 fail(nextObjective, ObjectiveError.BADPARAMS);
305 }
306
307 TrafficTreatment treatment = nextObjective.next().stream().findFirst().get();
308
309
310 GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment);
311 GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
312
313
314 pendingGroups.put(key, nextObjective);
315
316 switch (nextObjective.op()) {
317 case ADD:
318 GroupDescription groupDesc =
319 new DefaultGroupDescription(deviceId,
320 GroupDescription.Type.ALL,
321 new GroupBuckets(Collections.singletonList(bucket)),
322 key,
323 null,
324 nextObjective.appId());
325 groupService.addGroup(groupDesc);
326 break;
327 case REMOVE:
328 groupService.removeGroup(deviceId, key, nextObjective.appId());
329 break;
330 case ADD_TO_EXISTING:
331 groupService.addBucketsToGroup(deviceId, key,
332 new GroupBuckets(Collections.singletonList(bucket)),
333 key, nextObjective.appId());
334 break;
335 case REMOVE_FROM_EXISTING:
336 groupService.removeBucketsFromGroup(deviceId, key,
337 new GroupBuckets(Collections.singletonList(bucket)),
338 key, nextObjective.appId());
339 break;
340 default:
341 log.warn("Unknown next objective operation: {}", nextObjective.op());
342 }
343
344
345 }
346
347 private void processMulticastRule(ForwardingObjective fwd) {
348 if (fwd.nextId() == null) {
349 log.error("Multicast objective does not have a next id");
350 fail(fwd, ObjectiveError.BADPARAMS);
351 }
352
353 GroupKey key = getGroupForNextObjective(fwd.nextId());
354
355 if (key == null) {
356 log.error("Group for forwarding objective missing: {}", fwd);
357 fail(fwd, ObjectiveError.GROUPMISSING);
358 }
359
360 Group group = groupService.getGroup(deviceId, key);
361 TrafficTreatment treatment =
362 buildTreatment(Instructions.createGroup(group.id()));
363
364 FlowRule rule = DefaultFlowRule.builder()
365 .fromApp(fwd.appId())
366 .forDevice(deviceId)
367 .forTable(0)
368 .makePermanent()
369 .withPriority(fwd.priority())
370 .withSelector(fwd.selector())
371 .withTreatment(treatment)
372 .build();
373
374 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
375 switch (fwd.op()) {
376
377 case ADD:
378 builder.add(rule);
379 break;
380 case REMOVE:
381 builder.remove(rule);
382 break;
383 case ADD_TO_EXISTING:
384 case REMOVE_FROM_EXISTING:
385 break;
386 default:
387 log.warn("Unknown forwarding operation: {}", fwd.op());
388 }
389
390 applyFlowRules(builder, fwd);
391
392 }
393
394 private boolean checkForMulticast(ForwardingObjective fwd) {
395
396 IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
397 Criterion.Type.IPV4_DST);
398
399 if (ip == null) {
400 return false;
401 }
402
403 return ip.ip().isMulticast();
404
405 }
406
407 private boolean checkForEapol(ForwardingObjective fwd) {
408 EthTypeCriterion ethType = (EthTypeCriterion)
409 filterForCriterion(fwd.selector().criteria(), Criterion.Type.ETH_TYPE);
410
411 return ethType != null && ethType.ethType().equals(EthType.EtherType.EAPOL.ethType());
412 }
413 private GroupKey getGroupForNextObjective(Integer nextId) {
414 NextGroup next = flowObjectiveStore.getNextGroup(nextId);
415 return appKryo.deserialize(next.data());
416
417 }
418
419 private void installDownstreamRules(ForwardingObjective fwd) {
420 List<Pair<Instruction, Instruction>> vlanOps =
421 vlanOps(fwd,
422 L2ModificationInstruction.L2SubType.VLAN_POP);
423
424 if (vlanOps == null) {
425 return;
426 }
427
428 Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, "downstream");
429
430 if (output == null) {
431 return;
432 }
433
434 Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
435
436 TrafficSelector selector = fwd.selector();
437
438 Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
439 Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
440 Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
441 Criterion bullshit = Criteria.matchMetadata(output.port().toLong());
442
443 if (outerVlan == null || innerVlan == null || inport == null) {
444 log.error("Forwarding objective is underspecified: {}", fwd);
445 fail(fwd, ObjectiveError.BADPARAMS);
446 return;
447 }
448
449 Criterion innerVid = Criteria.matchVlanId(((VlanIdCriterion) innerVlan).vlanId());
450
451 FlowRule.Builder outer = DefaultFlowRule.builder()
452 .fromApp(fwd.appId())
453 .forDevice(deviceId)
454 .makePermanent()
455 .withPriority(fwd.priority())
456 .withSelector(buildSelector(inport, outerVlan, bullshit))
457 .withTreatment(buildTreatment(popAndRewrite.getLeft(),
458 Instructions.transition(QQ_TABLE)));
459
460 FlowRule.Builder inner = DefaultFlowRule.builder()
461 .fromApp(fwd.appId())
462 .forDevice(deviceId)
463 .forTable(QQ_TABLE)
464 .makePermanent()
465 .withPriority(fwd.priority())
466 .withSelector(buildSelector(inport, innerVid))
467 .withTreatment(buildTreatment(popAndRewrite.getLeft(),
468 output));
469
470 applyRules(fwd, inner, outer);
471
472 }
473
474 private boolean hasUntaggedVlanTag(TrafficSelector selector) {
475 Iterator<Criterion> iter = selector.criteria().iterator();
476
477 while (iter.hasNext()) {
478 Criterion criterion = iter.next();
479 if (criterion.type() == Criterion.Type.VLAN_VID &&
480 ((VlanIdCriterion) criterion).vlanId().toShort() == VlanId.UNTAGGED) {
481 return true;
482 }
483 }
484
485 return false;
486 }
487
488 private void installUpstreamRules(ForwardingObjective fwd) {
489 List<Pair<Instruction, Instruction>> vlanOps =
490 vlanOps(fwd,
491 L2ModificationInstruction.L2SubType.VLAN_PUSH);
492 FlowRule.Builder inner;
493
494 if (vlanOps == null) {
495 return;
496 }
497
498 Instruction output = fetchOutput(fwd, "upstream");
499
500 if (output == null) {
501 return;
502 }
503
504 Pair<Instruction, Instruction> innerPair = vlanOps.remove(0);
505
506 Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
507
508
509 if (hasUntaggedVlanTag(fwd.selector())) {
510 inner = DefaultFlowRule.builder()
511 .fromApp(fwd.appId())
512 .forDevice(deviceId)
513 .makePermanent()
514 .withPriority(fwd.priority())
515 .withSelector(fwd.selector())
516 .withTreatment(buildTreatment(innerPair.getLeft(),
517 innerPair.getRight(),
518 Instructions.transition(QQ_TABLE)));
519 } else {
520 inner = DefaultFlowRule.builder()
521 .fromApp(fwd.appId())
522 .forDevice(deviceId)
523 .makePermanent()
524 .withPriority(fwd.priority())
525 .withSelector(fwd.selector())
526 .withTreatment(buildTreatment(
527 innerPair.getRight(),
528 Instructions.transition(QQ_TABLE)));
529 }
530
531
532 PortCriterion inPort = (PortCriterion)
533 fwd.selector().getCriterion(Criterion.Type.IN_PORT);
534
535 VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
536 innerPair.getRight()).vlanId();
537
538 FlowRule.Builder outer = DefaultFlowRule.builder()
539 .fromApp(fwd.appId())
540 .forDevice(deviceId)
541 .forTable(QQ_TABLE)
542 .makePermanent()
543 .withPriority(fwd.priority())
544 .withSelector(buildSelector(inPort,
545 Criteria.matchVlanId(cVlanId)))
546 .withTreatment(buildTreatment(outerPair.getLeft(),
547 outerPair.getRight(),
548 output));
549
550 applyRules(fwd, inner, outer);
551
552 }
553
554 private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
555 Instruction output = fwd.treatment().allInstructions().stream()
556 .filter(i -> i.type() == Instruction.Type.OUTPUT)
557 .findFirst().orElse(null);
558
559 if (output == null) {
560 log.error("OLT {} rule has no output", direction);
561 fail(fwd, ObjectiveError.BADPARAMS);
562 return null;
563 }
564 return output;
565 }
566
567 private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
568 L2ModificationInstruction.L2SubType type) {
569
570 List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
571 fwd.treatment().allInstructions(), type);
572
573 if (vlanOps == null) {
574 String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
575 ? "downstream" : "upstream";
576 log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
577 fail(fwd, ObjectiveError.BADPARAMS);
578 return null;
579 }
580 return vlanOps;
581 }
582
583
584 private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
585 L2ModificationInstruction.L2SubType type) {
586
587 List<Instruction> vlanPushs = findL2Instructions(
588 type,
589 instructions);
590 List<Instruction> vlanSets = findL2Instructions(
591 L2ModificationInstruction.L2SubType.VLAN_ID,
592 instructions);
593
594 if (vlanPushs.size() != vlanSets.size()) {
595 return null;
596 }
597
598 List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
599
600 for (int i = 0; i < vlanPushs.size(); i++) {
601 pairs.add(new ImmutablePair<>(vlanPushs.get(i), vlanSets.get(i)));
602 }
603 return pairs;
604 }
605
606 private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
607 List<Instruction> actions) {
608 return actions.stream()
609 .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
610 .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
611 .collect(Collectors.toList());
612 }
613
614 private void provisionEapol(FilteringObjective filter,
615 EthTypeCriterion ethType,
616 Instructions.OutputInstruction output) {
617
618 TrafficSelector selector = buildSelector(filter.key(), ethType);
619 TrafficTreatment treatment = buildTreatment(output);
620 buildAndApplyRule(filter, selector, treatment, EAPOL_FLOW_PRIORITY);
621
622 }
623
624 private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
625 IPProtocolCriterion ipProto,
626 Instructions.OutputInstruction output) {
627 TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto);
628 TrafficTreatment treatment = buildTreatment(output);
629 buildAndApplyRule(filter, selector, treatment);
630 }
631
632 private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
633 TrafficTreatment treatment) {
634 buildAndApplyRule(filter, selector, treatment, filter.priority());
635 }
636
637 private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
638 TrafficTreatment treatment, int priority) {
639 FlowRule rule = DefaultFlowRule.builder()
640 .fromApp(filter.appId())
641 .forDevice(deviceId)
642 .forTable(0)
643 .makePermanent()
644 .withSelector(selector)
645 .withTreatment(treatment)
646 .withPriority(priority)
647 .build();
648
649 FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
650
651 switch (filter.type()) {
652 case PERMIT:
653 opsBuilder.add(rule);
654 break;
655 case DENY:
656 opsBuilder.remove(rule);
657 break;
658 default:
659 log.warn("Unknown filter type : {}", filter.type());
660 fail(filter, ObjectiveError.UNSUPPORTED);
661 }
662
663 applyFlowRules(opsBuilder, filter);
664 }
665
666 private void applyRules(ForwardingObjective fwd,
667 FlowRule.Builder inner, FlowRule.Builder outer) {
668 FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
669 switch (fwd.op()) {
670 case ADD:
671 builder.add(inner.build()).add(outer.build());
672 break;
673 case REMOVE:
674 builder.remove(inner.build()).remove(outer.build());
675 break;
676 case ADD_TO_EXISTING:
677 break;
678 case REMOVE_FROM_EXISTING:
679 break;
680 default:
681 log.warn("Unknown forwarding operation: {}", fwd.op());
682 }
683
684 applyFlowRules(builder, fwd);
685 }
686
687 private void applyFlowRules(FlowRuleOperations.Builder builder,
688 Objective objective) {
689 flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
690 @Override
691 public void onSuccess(FlowRuleOperations ops) {
692 pass(objective);
693 }
694
695 @Override
696 public void onError(FlowRuleOperations ops) {
697 fail(objective, ObjectiveError.FLOWINSTALLATIONFAILED);
698 }
699 }));
700 }
701
702 private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
703 return criteria.stream()
704 .filter(c -> c.type().equals(type))
705 .limit(1)
706 .findFirst().orElse(null);
707 }
708
709 private TrafficSelector buildSelector(Criterion... criteria) {
710
711
712 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
713
714 for (Criterion c : criteria) {
715 sBuilder.add(c);
716 }
717
718 return sBuilder.build();
719 }
720
721 private TrafficTreatment buildTreatment(Instruction... instructions) {
722
723
724 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
725
726 for (Instruction i : instructions) {
727 tBuilder.add(i);
728 }
729
730 return tBuilder.build();
731 }
732
733
734 private void fail(Objective obj, ObjectiveError error) {
735 obj.context().ifPresent(context -> context.onError(obj, error));
736 }
737
738 private void pass(Objective obj) {
739 obj.context().ifPresent(context -> context.onSuccess(obj));
740 }
741
742
743 private class InnerGroupListener implements GroupListener {
744 @Override
745 public void event(GroupEvent event) {
746 if (event.type() == GroupEvent.Type.GROUP_ADDED || event.type() == GroupEvent.Type.GROUP_UPDATED) {
747 GroupKey key = event.subject().appCookie();
748
749 NextObjective obj = pendingGroups.getIfPresent(key);
750 if (obj != null) {
751 flowObjectiveStore.putNextGroup(obj.id(), new OltPipelineGroup(key));
752 pass(obj);
753 pendingGroups.invalidate(key);
754 }
755 }
756 }
757 }
758
759 private static class OltPipelineGroup implements NextGroup {
760
761 private final GroupKey key;
762
763 public OltPipelineGroup(GroupKey key) {
764 this.key = key;
765 }
766
767 public GroupKey key() {
768 return key;
769 }
770
771 @Override
772 public byte[] data() {
773 return appKryo.serialize(key);
774 }
775
776 }
777
778 @Override
779 public List<String> getNextMappings(NextGroup nextGroup) {
780 // TODO Implementation deferred to vendor
781 return null;
782 }
783}