/*
 * Copyright 2017-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.opencord.cordvtn.impl;
17
18import com.google.common.collect.ImmutableList;
19import com.google.common.collect.ImmutableSet;
20import com.jcraft.jsch.Session;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.packet.IpAddress;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.LeadershipService;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
33import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.Device;
35import org.onosproject.net.DeviceId;
36import org.onosproject.net.Port;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.behaviour.BridgeConfig;
39import org.onosproject.net.behaviour.BridgeDescription;
40import org.onosproject.net.behaviour.BridgeName;
41import org.onosproject.net.behaviour.ControllerInfo;
42import org.onosproject.net.behaviour.DefaultBridgeDescription;
43import org.onosproject.net.behaviour.DefaultTunnelDescription;
44import org.onosproject.net.behaviour.InterfaceConfig;
45import org.onosproject.net.behaviour.TunnelDescription;
46import org.onosproject.net.behaviour.TunnelEndPoints;
47import org.onosproject.net.behaviour.TunnelKeys;
48import org.onosproject.net.config.NetworkConfigEvent;
49import org.onosproject.net.config.NetworkConfigListener;
50import org.onosproject.net.config.NetworkConfigService;
51import org.onosproject.net.device.DeviceAdminService;
52import org.onosproject.net.device.DeviceEvent;
53import org.onosproject.net.device.DeviceListener;
54import org.onosproject.net.device.DeviceService;
55import org.onosproject.net.host.HostService;
56import org.onosproject.ovsdb.controller.OvsdbClientService;
57import org.onosproject.ovsdb.controller.OvsdbController;
58import org.onosproject.ovsdb.controller.OvsdbNodeId;
59import org.opencord.cordvtn.api.CordVtnConfig;
60import org.opencord.cordvtn.api.core.CordVtnPipeline;
61import org.opencord.cordvtn.api.core.InstanceService;
62import org.opencord.cordvtn.api.node.CordVtnNode;
63import org.opencord.cordvtn.api.node.CordVtnNodeAdminService;
64import org.opencord.cordvtn.api.node.CordVtnNodeEvent;
65import org.opencord.cordvtn.api.node.CordVtnNodeHandler;
66import org.opencord.cordvtn.api.node.CordVtnNodeListener;
67import org.opencord.cordvtn.api.node.CordVtnNodeService;
68import org.opencord.cordvtn.api.node.CordVtnNodeState;
69import org.opencord.cordvtn.api.node.DeviceHandler;
70import org.slf4j.Logger;
71
72import java.util.List;
73import java.util.Objects;
74import java.util.Set;
75import java.util.concurrent.ExecutorService;
76import java.util.stream.Collectors;
77
78import static java.lang.String.format;
79import static java.util.concurrent.Executors.newSingleThreadExecutor;
80import static org.onlab.util.Tools.groupedThreads;
81import static org.onosproject.net.AnnotationKeys.PORT_NAME;
82import static org.onosproject.net.Device.Type.SWITCH;
83import static org.onosproject.net.behaviour.TunnelDescription.Type.VXLAN;
84import static org.opencord.cordvtn.api.Constants.CORDVTN_APP_ID;
85import static org.opencord.cordvtn.api.Constants.DEFAULT_TUNNEL;
86import static org.opencord.cordvtn.api.Constants.INTEGRATION_BRIDGE;
87import static org.opencord.cordvtn.api.node.CordVtnNodeState.*;
88import static org.opencord.cordvtn.impl.DefaultCordVtnNode.updatedState;
89import static org.opencord.cordvtn.impl.RemoteIpCommandUtil.*;
90import static org.opencord.cordvtn.impl.RemoteIpCommandUtil.disconnect;
91import static org.opencord.cordvtn.impl.RemoteIpCommandUtil.isInterfaceUp;
92import static org.slf4j.LoggerFactory.getLogger;
93
94/**
95 * Implementation of the {@link CordVtnNodeHandler} with OVSDB and SSH exec channel.
96 */
97@Component(immediate = true)
98@Service
99public class DefaultCordVtnNodeHandler implements CordVtnNodeHandler {
100
101 protected final Logger log = getLogger(getClass());
102
103 private static final String ERR_INCOMPLETE = "%s is %s from incomplete node, ignore it";
104 private static final String ERR_UNREGISTERED = "%s is %s from unregistered node, ignore it";
105 private static final String ERR_DETECTED = "detected";
106 private static final String ERR_VANISHED = "vanished";
107
108 private static final int DPID_BEGIN = 3;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected CoreService coreService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected LeadershipService leadershipService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected ClusterService clusterService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected NetworkConfigService configService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected DeviceService deviceService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected DeviceAdminService deviceAdminService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected HostService hostService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected OvsdbController ovsdbController;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 protected CordVtnNodeService nodeService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 protected CordVtnNodeAdminService nodeAdminService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 protected InstanceService instanceService;
142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
144 protected CordVtnPipeline pipelineService;
145
146 private final ExecutorService eventExecutor = newSingleThreadExecutor(
147 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
148 private final NetworkConfigListener configListener = new InternalConfigListener();
149 private final DeviceListener deviceListener = new InternalDeviceListener();
150 private final DeviceHandler ovsdbHandler = new OvsdbDeviceHandler();
151 private final DeviceHandler intgBridgeHandler = new IntegrationBridgeDeviceHandler();
152 private final CordVtnNodeListener nodeListener = new InternalCordVtnNodeListener();
153
154 private ApplicationId appId;
155 private NodeId localNodeId;
156 private List<ControllerInfo> controllers = ImmutableList.of();
157
158 @Activate
159 protected void activate() {
160 appId = coreService.registerApplication(CORDVTN_APP_ID);
161 leadershipService.runForLeadership(appId.name());
162 localNodeId = clusterService.getLocalNode().id();
163
164 configService.addListener(configListener);
165 deviceService.addListener(deviceListener);
166 nodeService.addListener(nodeListener);
167
168 readControllers();
169 log.info("Started");
170 }
171
172 @Deactivate
173 protected void deactivate() {
174 nodeService.removeListener(nodeListener);
175 deviceService.removeListener(deviceListener);
176 configService.removeListener(configListener);
177
178 leadershipService.withdraw(appId.name());
179 eventExecutor.shutdown();
180
181 log.info("Stopped");
182 }
183
184 @Override
185 public void processInitState(CordVtnNode node) {
186 if (!isOvsdbConnected(node)) {
187 ovsdbController.connect(node.hostManagementIp().ip(), node.ovsdbPort());
188 return;
189 }
190 createIntegrationBridge(node);
191 }
192
193 @Override
194 public void processDeviceCreatedState(CordVtnNode node) {
195 if (!isOvsdbConnected(node)) {
196 ovsdbController.connect(node.hostManagementIp().ip(), node.ovsdbPort());
197 return;
198 }
199 createTunnelInterface(node);
200 addSystemInterface(node, node.dataInterface());
201 if (node.hostManagementInterface() != null) {
202 addSystemInterface(node, node.hostManagementInterface());
203 }
204 }
205
206 @Override
207 public void processPortCreatedState(CordVtnNode node) {
208 configureInterface(node);
209 }
210
211 @Override
212 public void processCompleteState(CordVtnNode node) {
213 OvsdbClientService ovsdbClient = ovsdbController.getOvsdbClient(
214 new OvsdbNodeId(
215 node.hostManagementIp().ip(),
216 node.ovsdbPort().toInt()));
217 if (ovsdbClient != null && ovsdbClient.isConnected()) {
218 ovsdbClient.disconnect();
219 }
220 // TODO fix postInit to be done in the proper services
221 postInit(node);
222 log.info("Finished init {}", node.hostname());
223 }
224
225 private boolean isOvsdbConnected(CordVtnNode node) {
226 OvsdbClientService ovsdbClient = ovsdbController.getOvsdbClient(
227 new OvsdbNodeId(
228 node.hostManagementIp().ip(),
229 node.ovsdbPort().toInt()));
230 return deviceService.isAvailable(node.ovsdbId()) && ovsdbClient != null &&
231 ovsdbClient.isConnected();
232 }
233
234 private void createIntegrationBridge(CordVtnNode node) {
235 Device device = deviceService.getDevice(node.ovsdbId());
236 if (device == null || !device.is(BridgeConfig.class)) {
237 log.error("Failed to create integration bridge on {}", node.ovsdbId());
238 return;
239 }
240 String dpid = node.integrationBridgeId().toString().substring(DPID_BEGIN);
241 BridgeDescription bridgeDesc = DefaultBridgeDescription.builder()
242 .name(INTEGRATION_BRIDGE)
243 .failMode(BridgeDescription.FailMode.SECURE)
244 .datapathId(dpid)
245 .disableInBand()
246 .controllers(controllers)
247 .build();
248
249 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
250 bridgeConfig.addBridge(bridgeDesc);
251 }
252
253 private void createTunnelInterface(CordVtnNode node) {
254 Device device = deviceService.getDevice(node.ovsdbId());
255 if (device == null || !device.is(InterfaceConfig.class)) {
256 log.error("Failed to create tunnel interface on {}", node.ovsdbId());
257 return;
258 }
259 TunnelDescription tunnelDesc = DefaultTunnelDescription.builder()
260 .deviceId(INTEGRATION_BRIDGE)
261 .ifaceName(DEFAULT_TUNNEL)
262 .type(VXLAN)
263 .remote(TunnelEndPoints.flowTunnelEndpoint())
264 .key(TunnelKeys.flowTunnelKey())
265 .build();
266
267 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
268 ifaceConfig.addTunnelMode(DEFAULT_TUNNEL, tunnelDesc);
269 }
270
271 private void addSystemInterface(CordVtnNode node, String ifaceName) {
272 Session session = connect(node.sshInfo());
273 if (session == null || !isInterfaceUp(session, ifaceName)) {
274 log.error("Interface {} is not available on {}", ifaceName, node.hostname());
275 disconnect(session);
276 return;
277 } else {
278 disconnect(session);
279 }
280
281 Device device = deviceService.getDevice(node.ovsdbId());
282 if (!device.is(BridgeConfig.class)) {
283 log.error("BridgeConfig is not supported for {}", node.ovsdbId());
284 return;
285 }
286
287 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
288 bridgeConfig.addPort(BridgeName.bridgeName(INTEGRATION_BRIDGE), ifaceName);
289 }
290
291 private void configureInterface(CordVtnNode node) {
292 Session session = connect(node.sshInfo());
293 if (session == null) {
294 log.error("Failed to SSH to {}", node.hostname());
295 return;
296 }
297 getCurrentIps(session, INTEGRATION_BRIDGE).stream()
298 .filter(ip -> !ip.equals(node.localManagementIp().ip()))
299 .filter(ip -> !ip.equals(node.dataIp().ip()))
300 .forEach(ip -> deleteIp(session, ip, INTEGRATION_BRIDGE));
301
302 final boolean result = flushIp(session, node.dataInterface()) &&
303 setInterfaceUp(session, node.dataInterface()) &&
304 addIp(session, node.dataIp(), INTEGRATION_BRIDGE) &&
305 addIp(session, node.localManagementIp(), INTEGRATION_BRIDGE) &&
306 setInterfaceUp(session, INTEGRATION_BRIDGE);
307
308 disconnect(session);
309 if (result) {
310 bootstrapNode(node);
311 }
312 }
313
314 private void postInit(CordVtnNode node) {
315 // TODO move the below line to DefaultCordVtnPipeline
316 pipelineService.initPipeline(node);
317
318 // TODO move the logic below to InstanceManager
319 // adds existing instances to the host list
320 deviceService.getPorts(node.integrationBridgeId()).stream()
321 .filter(port -> port.isEnabled() &&
322 !port.number().equals(PortNumber.LOCAL) &&
323 !node.systemInterfaces().contains(port.annotations().value(PORT_NAME)))
324 .forEach(port -> instanceService.addInstance(new ConnectPoint(
325 port.element().id(),
326 port.number()
327 )));
328 // removes stale instances from the host list
329 hostService.getHosts().forEach(host -> {
330 if (deviceService.getPort(
331 host.location().deviceId(),
332 host.location().port()) == null) {
333 instanceService.removeInstance(host.location());
334 }
335 });
336 }
337
338 private class OvsdbDeviceHandler implements DeviceHandler {
339
340 @Override
341 public void connected(Device device) {
342 CordVtnNode node = nodeService.node(device.id());
343 if (node != null) {
344 bootstrapNode(node);
345 }
346 }
347
348 @Override
349 public void disconnected(Device device) {
350 CordVtnNode node = nodeService.node(device.id());
351 if (node != null && node.state() == COMPLETE) {
352 log.debug("Device({}) from {} disconnected", device.id(), node.hostname());
353 deviceAdminService.removeDevice(device.id());
354 }
355 }
356 }
357
358 private class IntegrationBridgeDeviceHandler implements DeviceHandler {
359
360 @Override
361 public void connected(Device device) {
362 CordVtnNode node = nodeService.node(device.id());
363 if (node != null) {
364 bootstrapNode(node);
365 }
366 }
367
368 @Override
369 public void disconnected(Device device) {
370 CordVtnNode node = nodeService.node(device.id());
371 if (node != null) {
372 log.warn("Device({}) from {} disconnected", device.id(), node.hostname());
373 setState(node, INIT);
374 }
375 }
376
377 @Override
378 public void portAdded(Port port) {
379 CordVtnNode node = nodeService.node((DeviceId) port.element().id());
380 String portName = port.annotations().value(PORT_NAME);
381 if (node == null) {
382 log.warn(format(ERR_UNREGISTERED, portName, ERR_DETECTED));
383 return;
384 }
385 if (node.systemInterfaces().contains(portName)) {
386 if (node.state() == DEVICE_CREATED) {
387 bootstrapNode(node);
388 }
389 } else if (node.state() == COMPLETE) {
390 // TODO move this logic to InstanceManager
391 instanceService.addInstance(new ConnectPoint(port.element().id(),
392 port.number()));
393 } else {
394 log.warn(format(ERR_INCOMPLETE, portName, ERR_DETECTED));
395 }
396 }
397
398 @Override
399 public void portRemoved(Port port) {
400 CordVtnNode node = nodeService.node((DeviceId) port.element().id());
401 String portName = port.annotations().value(PORT_NAME);
402 if (node == null) {
403 log.warn(format(ERR_UNREGISTERED, portName, ERR_VANISHED));
404 return;
405 }
406 if (node.systemInterfaces().contains(portName)) {
407 if (node.state() == PORT_CREATED || node.state() == COMPLETE) {
408 // always falls back to INIT state to avoid a mess caused by
409 // the multiple events received out of order
410 setState(node, INIT);
411 }
412 } else if (node.state() == COMPLETE) {
413 // TODO move this logic to InstanceManager
414 instanceService.removeInstance(new ConnectPoint(port.element().id(),
415 port.number()));
416 } else {
417 log.warn(format(ERR_INCOMPLETE, portName, ERR_VANISHED));
418 }
419 }
420 }
421
422 private class InternalDeviceListener implements DeviceListener {
423
424 @Override
425 public void event(DeviceEvent event) {
426 eventExecutor.execute(() -> {
427 NodeId leader = leadershipService.getLeader(appId.name());
428 if (!Objects.equals(localNodeId, leader)) {
429 // do not allow to proceed without leadership
430 return;
431 }
432 handle(event);
433 });
434 }
435
436 private void handle(DeviceEvent event) {
437 DeviceHandler handler = event.subject().type().equals(SWITCH) ?
438 intgBridgeHandler : ovsdbHandler;
439 Device device = event.subject();
440
441 switch (event.type()) {
442 case DEVICE_AVAILABILITY_CHANGED:
443 case DEVICE_ADDED:
444 if (deviceService.isAvailable(device.id())) {
445 log.debug("Device {} is connected", device.id());
446 handler.connected(device);
447 } else {
448 log.debug("Device {} is disconnected", device.id());
449 handler.disconnected(device);
450 }
451 break;
452 case PORT_ADDED:
453 log.debug("Port {} is added to {}",
454 event.port().annotations().value(PORT_NAME),
455 device.id());
456 handler.portAdded(event.port());
457 break;
458 case PORT_UPDATED:
459 if (event.port().isEnabled()) {
460 log.debug("Port {} is added to {}",
461 event.port().annotations().value(PORT_NAME),
462 device.id());
463 handler.portAdded(event.port());
464 } else {
465 log.debug("Port {} is removed from {}",
466 event.port().annotations().value(PORT_NAME),
467 device.id());
468 handler.portRemoved(event.port());
469 }
470 break;
471 case PORT_REMOVED:
472 log.debug("Port {} is removed from {}",
473 event.port().annotations().value(PORT_NAME),
474 device.id());
475 handler.portRemoved(event.port());
476 break;
477 default:
478 break;
479 }
480 }
481 }
482
483 private Set<String> activePorts(DeviceId deviceId) {
484 Set<String> activePorts = deviceService.getPorts(deviceId).stream()
485 .filter(Port::isEnabled)
486 .map(port -> port.annotations().value(PORT_NAME))
487 .collect(Collectors.toSet());
488 return ImmutableSet.copyOf(activePorts);
489 }
490
491 private boolean isIpAddressSet(CordVtnNode node) {
492 Session session = connect(node.sshInfo());
493 if (session == null) {
494 log.warn("Failed to SSH to {}", node.hostname());
495 return false;
496 }
497 Set<IpAddress> intBrIps = getCurrentIps(session, INTEGRATION_BRIDGE);
498 boolean result = getCurrentIps(session, node.dataInterface()).isEmpty() &&
499 isInterfaceUp(session, node.dataInterface()) &&
500 intBrIps.contains(node.dataIp().ip()) &&
501 intBrIps.contains(node.localManagementIp().ip()) &&
502 isInterfaceUp(session, INTEGRATION_BRIDGE);
503
504 disconnect(session);
505 return result;
506 }
507
508 private boolean isCurrentStateDone(CordVtnNode node) {
509 switch (node.state()) {
510 case INIT:
511 return deviceService.isAvailable(node.integrationBridgeId());
512 case DEVICE_CREATED:
513 Set<String> activePorts = activePorts(node.integrationBridgeId());
514 return node.systemInterfaces().stream().allMatch(activePorts::contains);
515 case PORT_CREATED:
516 return isIpAddressSet(node);
517 case COMPLETE:
518 return false;
519 default:
520 return false;
521 }
522 }
523
524 private void setState(CordVtnNode node, CordVtnNodeState newState) {
525 if (node.state() == newState) {
526 return;
527 }
528 CordVtnNode updated = updatedState(node, newState);
529 nodeAdminService.updateNode(updated);
530 log.info("Changed {} state: {}", node.hostname(), newState);
531 }
532
533 private void bootstrapNode(CordVtnNode node) {
534 if (isCurrentStateDone(node)) {
535 setState(node, node.state().nextState());
536 } else {
537 node.state().process(this, node);
538 }
539 }
540
541 private class InternalCordVtnNodeListener implements CordVtnNodeListener {
542
543 @Override
544 public void event(CordVtnNodeEvent event) {
545 eventExecutor.execute(() -> {
546 NodeId leader = leadershipService.getLeader(appId.name());
547 if (!Objects.equals(localNodeId, leader)) {
548 // do not allow to proceed without leadership
549 return;
550 }
551 handle(event);
552 });
553 }
554
555 private void handle(CordVtnNodeEvent event) {
556 switch (event.type()) {
557 case NODE_CREATED:
558 case NODE_UPDATED:
559 bootstrapNode(event.subject());
560 break;
561 case NODE_REMOVED:
562 case NODE_COMPLETE:
563 case NODE_INCOMPLETE:
564 default:
565 // do nothing
566 break;
567 }
568 }
569 }
570
571 private void readControllers() {
572 CordVtnConfig config = configService.getConfig(appId, CordVtnConfig.class);
573 if (config == null) {
574 log.warn("No configuration found");
575 return;
576 }
577 controllers = config.controllers();
578 controllers.forEach(ctrl -> {
579 log.debug("Added controller {}:{}", ctrl.ip(), ctrl.port());
580 });
581 }
582
583 private class InternalConfigListener implements NetworkConfigListener {
584
585 @Override
586 public void event(NetworkConfigEvent event) {
587 if (!event.configClass().equals(CordVtnConfig.class)) {
588 return;
589 }
590
591 switch (event.type()) {
592 case CONFIG_ADDED:
593 case CONFIG_UPDATED:
594 readControllers();
595 break;
596 default:
597 break;
598 }
599 }
600 }
601}