Add support to CordMcast for sending multicast sink ports to remote cluster.
Change-Id: Ib915c68218033e1dcfa6f738a629c2d1d8442261
diff --git a/src/main/java/org/onosproject/cordmcast/CordMcast.java b/src/main/java/org/onosproject/cordmcast/CordMcast.java
index 19b4e2d..13500dd 100644
--- a/src/main/java/org/onosproject/cordmcast/CordMcast.java
+++ b/src/main/java/org/onosproject/cordmcast/CordMcast.java
@@ -37,17 +37,12 @@
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.codec.CodecService;
import org.onosproject.codec.JsonCodec;
-import org.onosproject.cordconfig.access.AccessDeviceConfig;
+import org.onosproject.cordconfig.access.AccessAgentData;
import org.onosproject.cordconfig.access.AccessDeviceData;
+import org.onosproject.cordconfig.access.CordConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.config.ConfigFactory;
-import org.onosproject.net.config.NetworkConfigEvent;
-import org.onosproject.net.config.NetworkConfigListener;
-import org.onosproject.net.config.NetworkConfigRegistry;
-import org.onosproject.net.config.basics.SubjectFactories;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficSelector;
@@ -64,7 +59,6 @@
import org.onosproject.net.mcast.McastRoute;
import org.onosproject.net.mcast.McastRouteInfo;
import org.onosproject.net.mcast.MulticastRouteService;
-
import org.onosproject.rest.AbstractWebResource;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
@@ -80,8 +74,8 @@
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -98,8 +92,9 @@
@Component(immediate = true)
public class CordMcast {
+ private final Logger log = getLogger(getClass());
- private static final int DEFAULT_REST_TIMEOUT_MS = 2000;
+ private static final int DEFAULT_REST_TIMEOUT_MS = 1000;
private static final int DEFAULT_PRIORITY = 500;
private static final short DEFAULT_MCAST_VLAN = 4000;
private static final String DEFAULT_SYNC_HOST = "10.90.0.8:8181";
@@ -107,8 +102,6 @@
private static final String DEFAULT_PASSWORD = "karaf";
private static final boolean DEFAULT_VLAN_ENABLED = true;
- private final Logger log = getLogger(getClass());
-
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MulticastRouteService mcastService;
@@ -125,11 +118,9 @@
protected ComponentConfigService componentConfigService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected NetworkConfigRegistry networkConfig;
+ protected CordConfigService cordConfigService;
protected McastListener listener = new InternalMulticastListener();
- private InternalNetworkConfigListener configListener =
- new InternalNetworkConfigListener();
//TODO: move this to a ec map
private Map<IpAddress, Integer> groups = Maps.newConcurrentMap();
@@ -162,20 +153,6 @@
private String fabricOnosUrl;
- private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
-
- private static final Class<AccessDeviceConfig> CONFIG_CLASS =
- AccessDeviceConfig.class;
-
- private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory =
- new ConfigFactory<DeviceId, AccessDeviceConfig>(
- SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
- @Override
- public AccessDeviceConfig createConfig() {
- return new AccessDeviceConfig();
- }
- };
-
@Activate
public void activate(ComponentContext context) {
componentConfigService.registerProperties(getClass());
@@ -183,23 +160,8 @@
appId = coreService.registerApplication("org.onosproject.cordmcast");
-
clearRemoteRoutes();
- networkConfig.registerConfigFactory(configFactory);
- networkConfig.addListener(configListener);
-
- networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
- subject -> {
- AccessDeviceConfig config = networkConfig.getConfig(subject, AccessDeviceConfig.class);
- if (config != null) {
- AccessDeviceData data = config.getOlt();
- oltData.put(data.deviceId(), data);
- }
- }
- );
-
-
mcastService.addListener(listener);
mcastService.getRoutes().stream()
@@ -215,8 +177,6 @@
public void deactivate() {
componentConfigService.unregisterProperties(getClass(), false);
mcastService.removeListener(listener);
- networkConfig.unregisterConfigFactory(configFactory);
- networkConfig.removeListener(configListener);
log.info("Stopped");
}
@@ -323,9 +283,9 @@
checkNotNull(route, "Route cannot be null");
checkNotNull(sink, "Sink cannot be null");
- AccessDeviceData oltInfo = oltData.get(sink.deviceId());
+ Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
- if (oltInfo == null) {
+ if (!oltInfo.isPresent()) {
log.warn("Unknown OLT device : {}", sink.deviceId());
return;
}
@@ -359,7 +319,7 @@
flowObjectiveService.next(sink.deviceId(), next);
TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
- .matchInPort(oltInfo.uplink())
+ .matchInPort(oltInfo.get().uplink())
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPDst(g.toIpPrefix());
@@ -420,17 +380,29 @@
flowObjectiveService.next(sink.deviceId(), next);
}
-
- addRemoteRoute(route);
+ addRemoteRoute(route, sink);
}
- private void addRemoteRoute(McastRoute route) {
+ private void addRemoteRoute(McastRoute route, ConnectPoint inPort) {
checkNotNull(route);
if (syncHost == null) {
log.warn("No host configured for synchronization; route will be dropped");
return;
}
+ Optional<AccessAgentData> accessAgent = cordConfigService.getAccessAgent(inPort.deviceId());
+ if (!accessAgent.isPresent()) {
+ log.warn("No accessAgent config found for in port {}", inPort);
+ return;
+ }
+
+ if (!accessAgent.get().getOltConnectPoint(inPort).isPresent()) {
+ log.warn("No OLT configured for in port {}", inPort);
+ return;
+ }
+
+ ConnectPoint oltConnectPoint = accessAgent.get().getOltConnectPoint(inPort).get();
+
log.debug("Sending route {} to other ONOS {}", route, fabricOnosUrl);
Invocation.Builder builder = getClientBuilder(fabricOnosUrl);
@@ -440,6 +412,13 @@
try {
builder.post(Entity.json(json.toString()));
+
+ builder = getClientBuilder(fabricOnosUrl + "/sinks/" + route.group() + "/" + route.source());
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode obj = mapper.createObjectNode();
+ obj.putArray("sinks").add(oltConnectPoint.deviceId() + "/" + oltConnectPoint.port());
+
+ builder.post(Entity.json(obj.toString()));
} catch (ProcessingException e) {
log.warn("Unable to send route to remote controller: {}", e.getMessage());
}
@@ -489,7 +468,7 @@
list.forEach(n -> mcastRoutes.add(
routeCodec.decode((ObjectNode) n, new AbstractWebResource())));
- } catch (IOException e) {
+ } catch (IOException | ProcessingException e) {
log.warn("Error clearing remote routes", e);
}
@@ -508,34 +487,4 @@
return wt.request(JSON_UTF_8.toString());
}
- private class InternalNetworkConfigListener implements NetworkConfigListener {
- @Override
- public void event(NetworkConfigEvent event) {
- switch (event.type()) {
-
- case CONFIG_ADDED:
- case CONFIG_UPDATED:
- AccessDeviceConfig config =
- networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
- if (config != null) {
- oltData.put(config.getOlt().deviceId(), config.getOlt());
- }
-
- break;
- case CONFIG_REGISTERED:
- case CONFIG_UNREGISTERED:
- break;
- case CONFIG_REMOVED:
- oltData.remove(event.subject());
- break;
- default:
- break;
- }
- }
-
- @Override
- public boolean isRelevant(NetworkConfigEvent event) {
- return event.configClass().equals(CONFIG_CLASS);
- }
- }
}