blob: 9c7175a6b594ea87c293b3e89f2d15ea52b3272d [file] [log] [blame]
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opencord.cordvtn.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.jcraft.jsch.Session;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.behaviour.BridgeConfig;
import org.onosproject.net.behaviour.BridgeDescription;
import org.onosproject.net.behaviour.BridgeName;
import org.onosproject.net.behaviour.ControllerInfo;
import org.onosproject.net.behaviour.DefaultBridgeDescription;
import org.onosproject.net.behaviour.DefaultTunnelDescription;
import org.onosproject.net.behaviour.InterfaceConfig;
import org.onosproject.net.behaviour.TunnelDescription;
import org.onosproject.net.behaviour.TunnelEndPoints;
import org.onosproject.net.behaviour.TunnelKeys;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigService;
import org.onosproject.net.device.DeviceAdminService;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.host.HostService;
import org.onosproject.ovsdb.controller.OvsdbClientService;
import org.onosproject.ovsdb.controller.OvsdbController;
import org.onosproject.ovsdb.controller.OvsdbNodeId;
import org.opencord.cordvtn.api.CordVtnConfig;
import org.opencord.cordvtn.api.core.CordVtnPipeline;
import org.opencord.cordvtn.api.core.InstanceService;
import org.opencord.cordvtn.api.node.CordVtnNode;
import org.opencord.cordvtn.api.node.CordVtnNodeAdminService;
import org.opencord.cordvtn.api.node.CordVtnNodeEvent;
import org.opencord.cordvtn.api.node.CordVtnNodeHandler;
import org.opencord.cordvtn.api.node.CordVtnNodeListener;
import org.opencord.cordvtn.api.node.CordVtnNodeService;
import org.opencord.cordvtn.api.node.CordVtnNodeState;
import org.opencord.cordvtn.api.node.DeviceHandler;
import org.slf4j.Logger;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.AnnotationKeys.PORT_NAME;
import static org.onosproject.net.Device.Type.SWITCH;
import static org.onosproject.net.behaviour.TunnelDescription.Type.VXLAN;
import static org.opencord.cordvtn.api.Constants.CORDVTN_APP_ID;
import static org.opencord.cordvtn.api.Constants.DEFAULT_TUNNEL;
import static org.opencord.cordvtn.api.Constants.INTEGRATION_BRIDGE;
import static org.opencord.cordvtn.api.node.CordVtnNodeState.*;
import static org.opencord.cordvtn.impl.DefaultCordVtnNode.updatedState;
import static org.opencord.cordvtn.impl.RemoteIpCommandUtil.*;
import static org.opencord.cordvtn.impl.RemoteIpCommandUtil.disconnect;
import static org.opencord.cordvtn.impl.RemoteIpCommandUtil.isInterfaceUp;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation of the {@link CordVtnNodeHandler} with OVSDB and SSH exec channel.
*/
@Component(immediate = true)
@Service
public class DefaultCordVtnNodeHandler implements CordVtnNodeHandler {
protected final Logger log = getLogger(getClass());
private static final String ERR_INCOMPLETE = "%s is %s from incomplete node, ignore it";
private static final String ERR_UNREGISTERED = "%s is %s from unregistered node, ignore it";
private static final String ERR_DETECTED = "detected";
private static final String ERR_VANISHED = "vanished";
private static final int DPID_BEGIN = 3;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigService configService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceAdminService deviceAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OvsdbController ovsdbController;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CordVtnNodeService nodeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CordVtnNodeAdminService nodeAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected InstanceService instanceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CordVtnPipeline pipelineService;
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
private final NetworkConfigListener configListener = new InternalConfigListener();
private final DeviceListener deviceListener = new InternalDeviceListener();
private final DeviceHandler ovsdbHandler = new OvsdbDeviceHandler();
private final DeviceHandler intgBridgeHandler = new IntegrationBridgeDeviceHandler();
private final CordVtnNodeListener nodeListener = new InternalCordVtnNodeListener();
private ApplicationId appId;
private NodeId localNodeId;
private List<ControllerInfo> controllers = ImmutableList.of();
@Activate
protected void activate() {
appId = coreService.registerApplication(CORDVTN_APP_ID);
leadershipService.runForLeadership(appId.name());
localNodeId = clusterService.getLocalNode().id();
configService.addListener(configListener);
deviceService.addListener(deviceListener);
nodeService.addListener(nodeListener);
readControllers();
log.info("Started");
}
@Deactivate
protected void deactivate() {
nodeService.removeListener(nodeListener);
deviceService.removeListener(deviceListener);
configService.removeListener(configListener);
leadershipService.withdraw(appId.name());
eventExecutor.shutdown();
log.info("Stopped");
}
@Override
public void processInitState(CordVtnNode node) {
if (!isOvsdbConnected(node)) {
ovsdbController.connect(node.hostManagementIp().ip(), node.ovsdbPort());
return;
}
createIntegrationBridge(node);
}
@Override
public void processDeviceCreatedState(CordVtnNode node) {
if (!isOvsdbConnected(node)) {
ovsdbController.connect(node.hostManagementIp().ip(), node.ovsdbPort());
return;
}
createTunnelInterface(node);
addSystemInterface(node, node.dataInterface());
if (node.hostManagementInterface() != null) {
addSystemInterface(node, node.hostManagementInterface());
}
}
@Override
public void processPortCreatedState(CordVtnNode node) {
configureInterface(node);
}
@Override
public void processCompleteState(CordVtnNode node) {
OvsdbClientService ovsdbClient = ovsdbController.getOvsdbClient(
new OvsdbNodeId(
node.hostManagementIp().ip(),
node.ovsdbPort().toInt()));
if (ovsdbClient != null && ovsdbClient.isConnected()) {
ovsdbClient.disconnect();
}
// TODO fix postInit to be done in the proper services
postInit(node);
log.info("Finished init {}", node.hostname());
}
private boolean isOvsdbConnected(CordVtnNode node) {
OvsdbClientService ovsdbClient = ovsdbController.getOvsdbClient(
new OvsdbNodeId(
node.hostManagementIp().ip(),
node.ovsdbPort().toInt()));
return deviceService.isAvailable(node.ovsdbId()) && ovsdbClient != null &&
ovsdbClient.isConnected();
}
private void createIntegrationBridge(CordVtnNode node) {
Device device = deviceService.getDevice(node.ovsdbId());
if (device == null || !device.is(BridgeConfig.class)) {
log.error("Failed to create integration bridge on {}", node.ovsdbId());
return;
}
String dpid = node.integrationBridgeId().toString().substring(DPID_BEGIN);
BridgeDescription bridgeDesc = DefaultBridgeDescription.builder()
.name(INTEGRATION_BRIDGE)
.failMode(BridgeDescription.FailMode.SECURE)
.datapathId(dpid)
.disableInBand()
.controllers(controllers)
.build();
BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
bridgeConfig.addBridge(bridgeDesc);
}
private void createTunnelInterface(CordVtnNode node) {
Device device = deviceService.getDevice(node.ovsdbId());
if (device == null || !device.is(InterfaceConfig.class)) {
log.error("Failed to create tunnel interface on {}", node.ovsdbId());
return;
}
TunnelDescription tunnelDesc = DefaultTunnelDescription.builder()
.deviceId(INTEGRATION_BRIDGE)
.ifaceName(DEFAULT_TUNNEL)
.type(VXLAN)
.remote(TunnelEndPoints.flowTunnelEndpoint())
.key(TunnelKeys.flowTunnelKey())
.build();
InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
ifaceConfig.addTunnelMode(DEFAULT_TUNNEL, tunnelDesc);
}
private void addSystemInterface(CordVtnNode node, String ifaceName) {
Session session = connect(node.sshInfo());
if (session == null || !isInterfaceUp(session, ifaceName)) {
log.error("Interface {} is not available on {}", ifaceName, node.hostname());
disconnect(session);
return;
} else {
disconnect(session);
}
Device device = deviceService.getDevice(node.ovsdbId());
if (!device.is(BridgeConfig.class)) {
log.error("BridgeConfig is not supported for {}", node.ovsdbId());
return;
}
BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
bridgeConfig.addPort(BridgeName.bridgeName(INTEGRATION_BRIDGE), ifaceName);
}
private void configureInterface(CordVtnNode node) {
Session session = connect(node.sshInfo());
if (session == null) {
log.error("Failed to SSH to {}", node.hostname());
return;
}
getCurrentIps(session, INTEGRATION_BRIDGE).stream()
.filter(ip -> !ip.equals(node.localManagementIp().ip()))
.filter(ip -> !ip.equals(node.dataIp().ip()))
.forEach(ip -> deleteIp(session, ip, INTEGRATION_BRIDGE));
final boolean result = flushIp(session, node.dataInterface()) &&
setInterfaceUp(session, node.dataInterface()) &&
addIp(session, node.dataIp(), INTEGRATION_BRIDGE) &&
addIp(session, node.localManagementIp(), INTEGRATION_BRIDGE) &&
setInterfaceUp(session, INTEGRATION_BRIDGE);
disconnect(session);
if (result) {
bootstrapNode(node);
}
}
private void postInit(CordVtnNode node) {
// TODO move the below line to DefaultCordVtnPipeline
pipelineService.initPipeline(node);
// TODO move the logic below to InstanceManager
// adds existing instances to the host list
deviceService.getPorts(node.integrationBridgeId()).stream()
.filter(port -> port.isEnabled() &&
!port.number().equals(PortNumber.LOCAL) &&
!node.systemInterfaces().contains(port.annotations().value(PORT_NAME)))
.forEach(port -> instanceService.addInstance(new ConnectPoint(
port.element().id(),
port.number()
)));
// removes stale instances from the host list
hostService.getHosts().forEach(host -> {
if (deviceService.getPort(
host.location().deviceId(),
host.location().port()) == null) {
instanceService.removeInstance(host.location());
}
});
}
private class OvsdbDeviceHandler implements DeviceHandler {
@Override
public void connected(Device device) {
CordVtnNode node = nodeService.node(device.id());
if (node != null) {
bootstrapNode(node);
}
}
@Override
public void disconnected(Device device) {
CordVtnNode node = nodeService.node(device.id());
if (node != null && node.state() == COMPLETE) {
log.debug("Device({}) from {} disconnected", device.id(), node.hostname());
deviceAdminService.removeDevice(device.id());
}
}
}
private class IntegrationBridgeDeviceHandler implements DeviceHandler {
@Override
public void connected(Device device) {
CordVtnNode node = nodeService.node(device.id());
if (node != null) {
bootstrapNode(node);
}
}
@Override
public void disconnected(Device device) {
CordVtnNode node = nodeService.node(device.id());
if (node != null) {
log.warn("Device({}) from {} disconnected", device.id(), node.hostname());
setState(node, INIT);
}
}
@Override
public void portAdded(Port port) {
CordVtnNode node = nodeService.node((DeviceId) port.element().id());
String portName = port.annotations().value(PORT_NAME);
if (node == null) {
log.warn(format(ERR_UNREGISTERED, portName, ERR_DETECTED));
return;
}
if (node.systemInterfaces().contains(portName)) {
if (node.state() == DEVICE_CREATED) {
bootstrapNode(node);
}
} else if (node.state() == COMPLETE) {
// TODO move this logic to InstanceManager
instanceService.addInstance(new ConnectPoint(port.element().id(),
port.number()));
} else {
log.warn(format(ERR_INCOMPLETE, portName, ERR_DETECTED));
}
}
@Override
public void portRemoved(Port port) {
CordVtnNode node = nodeService.node((DeviceId) port.element().id());
String portName = port.annotations().value(PORT_NAME);
if (node == null) {
log.warn(format(ERR_UNREGISTERED, portName, ERR_VANISHED));
return;
}
if (node.systemInterfaces().contains(portName)) {
if (node.state() == PORT_CREATED || node.state() == COMPLETE) {
// always falls back to INIT state to avoid a mess caused by
// the multiple events received out of order
setState(node, INIT);
}
} else if (node.state() == COMPLETE) {
// TODO move this logic to InstanceManager
instanceService.removeInstance(new ConnectPoint(port.element().id(),
port.number()));
} else {
log.warn(format(ERR_INCOMPLETE, portName, ERR_VANISHED));
}
}
}
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
eventExecutor.execute(() -> {
NodeId leader = leadershipService.getLeader(appId.name());
if (!Objects.equals(localNodeId, leader)) {
// do not allow to proceed without leadership
return;
}
handle(event);
});
}
private void handle(DeviceEvent event) {
DeviceHandler handler = event.subject().type().equals(SWITCH) ?
intgBridgeHandler : ovsdbHandler;
Device device = event.subject();
switch (event.type()) {
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_ADDED:
if (deviceService.isAvailable(device.id())) {
log.debug("Device {} is connected", device.id());
handler.connected(device);
} else {
log.debug("Device {} is disconnected", device.id());
handler.disconnected(device);
}
break;
case PORT_ADDED:
log.debug("Port {} is added to {}",
event.port().annotations().value(PORT_NAME),
device.id());
handler.portAdded(event.port());
break;
case PORT_UPDATED:
if (event.port().isEnabled()) {
log.debug("Port {} is added to {}",
event.port().annotations().value(PORT_NAME),
device.id());
handler.portAdded(event.port());
} else {
log.debug("Port {} is removed from {}",
event.port().annotations().value(PORT_NAME),
device.id());
handler.portRemoved(event.port());
}
break;
case PORT_REMOVED:
log.debug("Port {} is removed from {}",
event.port().annotations().value(PORT_NAME),
device.id());
handler.portRemoved(event.port());
break;
default:
break;
}
}
}
private Set<String> activePorts(DeviceId deviceId) {
Set<String> activePorts = deviceService.getPorts(deviceId).stream()
.filter(Port::isEnabled)
.map(port -> port.annotations().value(PORT_NAME))
.collect(Collectors.toSet());
return ImmutableSet.copyOf(activePorts);
}
private boolean isIpAddressSet(CordVtnNode node) {
Session session = connect(node.sshInfo());
if (session == null) {
log.warn("Failed to SSH to {}", node.hostname());
return false;
}
Set<IpAddress> intBrIps = getCurrentIps(session, INTEGRATION_BRIDGE);
boolean result = getCurrentIps(session, node.dataInterface()).isEmpty() &&
isInterfaceUp(session, node.dataInterface()) &&
intBrIps.contains(node.dataIp().ip()) &&
intBrIps.contains(node.localManagementIp().ip()) &&
isInterfaceUp(session, INTEGRATION_BRIDGE);
disconnect(session);
return result;
}
private boolean isCurrentStateDone(CordVtnNode node) {
switch (node.state()) {
case INIT:
return deviceService.isAvailable(node.integrationBridgeId());
case DEVICE_CREATED:
Set<String> activePorts = activePorts(node.integrationBridgeId());
return node.systemInterfaces().stream().allMatch(activePorts::contains);
case PORT_CREATED:
return isIpAddressSet(node);
case COMPLETE:
return false;
default:
return false;
}
}
private void setState(CordVtnNode node, CordVtnNodeState newState) {
if (node.state() == newState) {
return;
}
CordVtnNode updated = updatedState(node, newState);
nodeAdminService.updateNode(updated);
log.info("Changed {} state: {}", node.hostname(), newState);
}
private void bootstrapNode(CordVtnNode node) {
if (isCurrentStateDone(node)) {
setState(node, node.state().nextState());
} else {
node.state().process(this, node);
}
}
private class InternalCordVtnNodeListener implements CordVtnNodeListener {
@Override
public void event(CordVtnNodeEvent event) {
eventExecutor.execute(() -> {
NodeId leader = leadershipService.getLeader(appId.name());
if (!Objects.equals(localNodeId, leader)) {
// do not allow to proceed without leadership
return;
}
handle(event);
});
}
private void handle(CordVtnNodeEvent event) {
switch (event.type()) {
case NODE_CREATED:
case NODE_UPDATED:
bootstrapNode(event.subject());
break;
case NODE_REMOVED:
case NODE_COMPLETE:
case NODE_INCOMPLETE:
default:
// do nothing
break;
}
}
}
private void readControllers() {
CordVtnConfig config = configService.getConfig(appId, CordVtnConfig.class);
if (config == null) {
return;
}
controllers = config.controllers();
controllers.forEach(ctrl -> {
log.debug("Added controller {}:{}", ctrl.ip(), ctrl.port());
});
}
private class InternalConfigListener implements NetworkConfigListener {
@Override
public void event(NetworkConfigEvent event) {
if (!event.configClass().equals(CordVtnConfig.class)) {
return;
}
switch (event.type()) {
case CONFIG_ADDED:
case CONFIG_UPDATED:
readControllers();
break;
default:
break;
}
}
}
}