blob: a19360173ea29e4167da54be327e0ec212a079ad [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.opencord.cordvtn.impl;
17
18import com.google.common.collect.ImmutableMap;
19import com.google.common.collect.ImmutableSet;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.core.ApplicationId;
28import org.onosproject.core.CoreService;
29import org.onosproject.store.AbstractStore;
30import org.onosproject.store.serializers.KryoNamespaces;
31import org.onosproject.store.service.ConsistentMap;
32import org.onosproject.store.service.MapEvent;
33import org.onosproject.store.service.MapEventListener;
34import org.onosproject.store.service.Serializer;
35import org.onosproject.store.service.StorageService;
36import org.onosproject.store.service.Versioned;
37import org.opencord.cordvtn.api.core.ServiceNetworkEvent;
38import org.opencord.cordvtn.api.core.ServiceNetworkStore;
39import org.opencord.cordvtn.api.core.ServiceNetworkStoreDelegate;
40import org.opencord.cordvtn.api.net.AddressPair;
41import org.opencord.cordvtn.api.net.NetworkId;
42import org.opencord.cordvtn.api.net.PortId;
43import org.opencord.cordvtn.api.net.Provider;
44import org.opencord.cordvtn.api.net.SegmentId;
45import org.opencord.cordvtn.api.net.ServiceNetwork;
46import org.opencord.cordvtn.api.net.ServiceNetwork.DependencyType;
47import org.opencord.cordvtn.api.net.ServicePort;
48import org.slf4j.Logger;
49
50import java.util.Collections;
51import java.util.Map;
52import java.util.Set;
53import java.util.concurrent.ExecutorService;
54import java.util.stream.Collectors;
55
56import static com.google.common.base.Preconditions.checkArgument;
57import static java.util.concurrent.Executors.newSingleThreadExecutor;
58import static org.onlab.util.Tools.groupedThreads;
59import static org.opencord.cordvtn.api.Constants.CORDVTN_APP_ID;
60import static org.opencord.cordvtn.api.core.ServiceNetworkEvent.Type.*;
61import static org.slf4j.LoggerFactory.getLogger;
62
63/**
64 * Manages the inventory of VTN networks using a {@code ConsistentMap}.
65 */
66@Component(immediate = true)
67@Service
68public class DistributedServiceNetworkStore extends AbstractStore<ServiceNetworkEvent, ServiceNetworkStoreDelegate>
69 implements ServiceNetworkStore {
70
71 protected final Logger log = getLogger(getClass());
72
73 private static final String ERR_NOT_FOUND = " does not exist";
74 private static final String ERR_DUPLICATE = " already exists";
75
76 private static final KryoNamespace SERIALIZER_SERVICE = KryoNamespace.newBuilder()
77 .register(KryoNamespaces.API)
78 .register(ServiceNetwork.class)
79 .register(DefaultServiceNetwork.class)
80 .register(NetworkId.class)
81 .register(SegmentId.class)
82 .register(ServiceNetwork.NetworkType.class)
83 .register(DependencyType.class)
84 .register(ServicePort.class)
85 .register(DefaultServicePort.class)
86 .register(PortId.class)
87 .register(AddressPair.class)
88 .register(Collections.EMPTY_MAP.getClass())
89 .register(Collections.EMPTY_SET.getClass())
90 .build();
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected CoreService coreService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected StorageService storageService;
97
Hyunsun Moon2c3f0ee2017-04-06 16:47:21 +090098 private final ExecutorService eventExecutor = newSingleThreadExecutor(
Hyunsun Moon187bf532017-01-19 10:57:40 +090099 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
100 private final MapEventListener<PortId, ServicePort> servicePortListener =
101 new ServicePortMapListener();
102 private final MapEventListener<NetworkId, ServiceNetwork> serviceNetworkListener =
103 new ServiceNetworkMapListener();
104
105 private ConsistentMap<NetworkId, ServiceNetwork> serviceNetworkStore;
106 private ConsistentMap<PortId, ServicePort> servicePortStore;
107
108 @Activate
109 protected void activate() {
110 ApplicationId appId = coreService.registerApplication(CORDVTN_APP_ID);
Hyunsun Moon187bf532017-01-19 10:57:40 +0900111 serviceNetworkStore = storageService.<NetworkId, ServiceNetwork>consistentMapBuilder()
112 .withSerializer(Serializer.using(SERIALIZER_SERVICE))
113 .withName("cordvtn-servicenetstore")
114 .withApplicationId(appId)
115 .build();
116 serviceNetworkStore.addListener(serviceNetworkListener);
117
118 servicePortStore = storageService.<PortId, ServicePort>consistentMapBuilder()
119 .withSerializer(Serializer.using(SERIALIZER_SERVICE))
120 .withName("cordvtn-serviceportstore")
121 .withApplicationId(appId)
122 .build();
123 servicePortStore.addListener(servicePortListener);
124
125 log.info("Started");
126 }
127
128 @Deactivate
129 protected void deactivate() {
130 serviceNetworkStore.removeListener(serviceNetworkListener);
131 servicePortStore.removeListener(servicePortListener);
132
133 log.info("Stopped");
134 }
135
136 @Override
137 public void clear() {
138 synchronized (this) {
139 serviceNetworkStore.clear();
140 servicePortStore.clear();
141 }
142 }
143
144 @Override
145 public void createServiceNetwork(ServiceNetwork snet) {
146 serviceNetworkStore.compute(snet.id(), (id, existing) -> {
147 final String error = snet.name() + ERR_DUPLICATE;
148 checkArgument(existing == null || existing.equals(snet), error);
149 return snet;
150 });
151 }
152
153 @Override
154 public void updateServiceNetwork(ServiceNetwork snet) {
155 serviceNetworkStore.compute(snet.id(), (id, existing) -> {
156 final String error = snet.name() + ERR_NOT_FOUND;
157 checkArgument(existing != null, error);
158 return snet;
159 });
160 }
161
162 @Override
163 public ServiceNetwork removeServiceNetwork(NetworkId netId) {
164 synchronized (this) {
165 Versioned<ServiceNetwork> snet = serviceNetworkStore.remove(netId);
166 return snet == null ? null : snet.value();
167 }
168 }
169
170 @Override
171 public ServiceNetwork serviceNetwork(NetworkId netId) {
172 Versioned<ServiceNetwork> versioned = serviceNetworkStore.get(netId);
173 return versioned == null ? null : versioned.value();
174 }
175
176 @Override
177 public Set<ServiceNetwork> serviceNetworks() {
178 Set<ServiceNetwork> snets = serviceNetworkStore.values().stream()
179 .map(Versioned::value)
180 .collect(Collectors.toSet());
181 return ImmutableSet.copyOf(snets);
182 }
183
184 @Override
185 public void createServicePort(ServicePort sport) {
186 servicePortStore.compute(sport.id(), (id, existing) -> {
187 final String error = sport.id().id() + ERR_DUPLICATE;
188 checkArgument(existing == null || existing.equals(sport), error);
189 return sport;
190 });
191 }
192
193 @Override
194 public void updateServicePort(ServicePort sport) {
195 servicePortStore.compute(sport.id(), (id, existing) -> {
196 final String error = sport.id().id() + ERR_NOT_FOUND;
197 checkArgument(existing != null, error);
198 return sport;
199 });
200 }
201
202 @Override
203 public ServicePort removeServicePort(PortId portId) {
204 Versioned<ServicePort> sport = servicePortStore.remove(portId);
205 return sport.value();
206 }
207
208 @Override
209 public ServicePort servicePort(PortId portId) {
210 Versioned<ServicePort> versioned = servicePortStore.get(portId);
211 return versioned == null ? null : versioned.value();
212 }
213
214 @Override
215 public Set<ServicePort> servicePorts() {
216 Set<ServicePort> sports = servicePortStore.values().stream()
217 .map(Versioned::value)
218 .collect(Collectors.toSet());
219 return ImmutableSet.copyOf(sports);
220 }
221
222 private class ServiceNetworkMapListener implements MapEventListener<NetworkId, ServiceNetwork> {
223
224 @Override
225 public void event(MapEvent<NetworkId, ServiceNetwork> event) {
226 switch (event.type()) {
227 case UPDATE:
228 log.debug("Service network updated {}", event.newValue());
229 eventExecutor.execute(() -> {
230 notifyDelegate(new ServiceNetworkEvent(
231 SERVICE_NETWORK_UPDATED,
232 event.newValue().value()));
233 notifyProviderUpdate(
234 event.oldValue().value(),
235 event.newValue().value());
236 });
237 break;
238 case INSERT:
239 log.debug("Service network created {}", event.newValue());
240 eventExecutor.execute(() -> {
241 notifyDelegate(new ServiceNetworkEvent(
242 SERVICE_NETWORK_CREATED,
243 event.newValue().value()));
244 notifyProviderUpdate(null, event.newValue().value());
245 });
246 break;
247 case REMOVE:
248 log.debug("Service network removed {}", event.oldValue());
249 eventExecutor.execute(() -> {
250 notifyProviderUpdate(event.oldValue().value(), null);
251 notifyDelegate(new ServiceNetworkEvent(
252 SERVICE_NETWORK_REMOVED,
253 event.oldValue().value()));
254 });
255 break;
256 default:
257 log.error("Unsupported event type");
258 break;
259 }
260 }
261
262 private void notifyProviderUpdate(ServiceNetwork oldValue, ServiceNetwork newValue) {
263 Map<NetworkId, DependencyType> oldp =
264 oldValue != null ? oldValue.providers() : ImmutableMap.of();
265 Map<NetworkId, DependencyType> newp =
266 newValue != null ? newValue.providers() : ImmutableMap.of();
267
268 oldp.entrySet().stream().filter(p -> !newp.keySet().contains(p.getKey()))
269 .forEach(p -> {
270 Provider providerNet = Provider.builder()
271 .provider(serviceNetwork(p.getKey()))
272 .type(p.getValue())
273 .build();
274 notifyDelegate(new ServiceNetworkEvent(
275 SERVICE_NETWORK_PROVIDER_REMOVED,
276 oldValue,
277 providerNet
278 ));
279 });
280
281 newp.entrySet().stream().filter(p -> !oldp.keySet().contains(p.getKey()))
282 .forEach(p -> {
283 Provider providerNet = Provider.builder()
284 .provider(serviceNetwork(p.getKey()))
285 .type(p.getValue())
286 .build();
287 notifyDelegate(new ServiceNetworkEvent(
288 SERVICE_NETWORK_PROVIDER_ADDED,
289 newValue,
290 providerNet));
291 });
292 }
293 }
294
295 private class ServicePortMapListener implements MapEventListener<PortId, ServicePort> {
296
297 @Override
298 public void event(MapEvent<PortId, ServicePort> event) {
299 switch (event.type()) {
300 case UPDATE:
301 log.debug("Service port updated {}", event.newValue());
302 eventExecutor.execute(() -> {
303 notifyDelegate(new ServiceNetworkEvent(
304 SERVICE_PORT_UPDATED,
305 serviceNetwork(event.newValue().value().networkId()),
306 event.newValue().value()));
307 });
308 break;
309 case INSERT:
310 log.debug("Service port created {}", event.newValue());
311 eventExecutor.execute(() -> {
312 notifyDelegate(new ServiceNetworkEvent(
313 SERVICE_PORT_CREATED,
314 serviceNetwork(event.newValue().value().networkId()),
315 event.newValue().value()));
316 });
317 break;
318 case REMOVE:
319 log.debug("Service port removed {}", event.oldValue());
320 eventExecutor.execute(() -> {
321 notifyDelegate(new ServiceNetworkEvent(
322 SERVICE_PORT_REMOVED,
323 serviceNetwork(event.oldValue().value().networkId()),
324 event.oldValue().value()));
325 });
326 break;
327 default:
328 log.error("Unsupported event type");
329 break;
330 }
331 }
332 }
333}