blob: dc98cedd9e79e33901d596a9d5a5da7b19c9ccbf [file] [log] [blame]
Hyunsun Moon187bf532017-01-19 10:57:40 +09001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.opencord.cordvtn.impl;
17
18import com.google.common.collect.ImmutableMap;
19import com.google.common.collect.ImmutableSet;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.core.ApplicationId;
28import org.onosproject.core.CoreService;
29import org.onosproject.store.AbstractStore;
30import org.onosproject.store.serializers.KryoNamespaces;
31import org.onosproject.store.service.ConsistentMap;
32import org.onosproject.store.service.MapEvent;
33import org.onosproject.store.service.MapEventListener;
34import org.onosproject.store.service.Serializer;
35import org.onosproject.store.service.StorageService;
36import org.onosproject.store.service.Versioned;
37import org.opencord.cordvtn.api.core.ServiceNetworkEvent;
38import org.opencord.cordvtn.api.core.ServiceNetworkStore;
39import org.opencord.cordvtn.api.core.ServiceNetworkStoreDelegate;
40import org.opencord.cordvtn.api.net.AddressPair;
41import org.opencord.cordvtn.api.net.NetworkId;
42import org.opencord.cordvtn.api.net.PortId;
43import org.opencord.cordvtn.api.net.Provider;
44import org.opencord.cordvtn.api.net.SegmentId;
45import org.opencord.cordvtn.api.net.ServiceNetwork;
46import org.opencord.cordvtn.api.net.ServiceNetwork.DependencyType;
47import org.opencord.cordvtn.api.net.ServicePort;
48import org.slf4j.Logger;
49
50import java.util.Collections;
51import java.util.Map;
52import java.util.Set;
53import java.util.concurrent.ExecutorService;
54import java.util.stream.Collectors;
55
56import static com.google.common.base.Preconditions.checkArgument;
57import static java.util.concurrent.Executors.newSingleThreadExecutor;
58import static org.onlab.util.Tools.groupedThreads;
59import static org.opencord.cordvtn.api.Constants.CORDVTN_APP_ID;
60import static org.opencord.cordvtn.api.core.ServiceNetworkEvent.Type.*;
61import static org.slf4j.LoggerFactory.getLogger;
62
63/**
64 * Manages the inventory of VTN networks using a {@code ConsistentMap}.
65 */
66@Component(immediate = true)
67@Service
68public class DistributedServiceNetworkStore extends AbstractStore<ServiceNetworkEvent, ServiceNetworkStoreDelegate>
69 implements ServiceNetworkStore {
70
71 protected final Logger log = getLogger(getClass());
72
73 private static final String ERR_NOT_FOUND = " does not exist";
74 private static final String ERR_DUPLICATE = " already exists";
75
76 private static final KryoNamespace SERIALIZER_SERVICE = KryoNamespace.newBuilder()
77 .register(KryoNamespaces.API)
78 .register(ServiceNetwork.class)
79 .register(DefaultServiceNetwork.class)
80 .register(NetworkId.class)
81 .register(SegmentId.class)
82 .register(ServiceNetwork.NetworkType.class)
83 .register(DependencyType.class)
84 .register(ServicePort.class)
85 .register(DefaultServicePort.class)
86 .register(PortId.class)
87 .register(AddressPair.class)
88 .register(Collections.EMPTY_MAP.getClass())
89 .register(Collections.EMPTY_SET.getClass())
90 .build();
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected CoreService coreService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected StorageService storageService;
97
98 private final ExecutorService eventExecutor = newSingleThreadExecutor(
99 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
100 private final MapEventListener<PortId, ServicePort> servicePortListener =
101 new ServicePortMapListener();
102 private final MapEventListener<NetworkId, ServiceNetwork> serviceNetworkListener =
103 new ServiceNetworkMapListener();
104
105 private ConsistentMap<NetworkId, ServiceNetwork> serviceNetworkStore;
106 private ConsistentMap<PortId, ServicePort> servicePortStore;
107
108 @Activate
109 protected void activate() {
110 ApplicationId appId = coreService.registerApplication(CORDVTN_APP_ID);
111
112 serviceNetworkStore = storageService.<NetworkId, ServiceNetwork>consistentMapBuilder()
113 .withSerializer(Serializer.using(SERIALIZER_SERVICE))
114 .withName("cordvtn-servicenetstore")
115 .withApplicationId(appId)
116 .build();
117 serviceNetworkStore.addListener(serviceNetworkListener);
118
119 servicePortStore = storageService.<PortId, ServicePort>consistentMapBuilder()
120 .withSerializer(Serializer.using(SERIALIZER_SERVICE))
121 .withName("cordvtn-serviceportstore")
122 .withApplicationId(appId)
123 .build();
124 servicePortStore.addListener(servicePortListener);
125
126 log.info("Started");
127 }
128
129 @Deactivate
130 protected void deactivate() {
131 serviceNetworkStore.removeListener(serviceNetworkListener);
132 servicePortStore.removeListener(servicePortListener);
133
134 log.info("Stopped");
135 }
136
137 @Override
138 public void clear() {
139 synchronized (this) {
140 serviceNetworkStore.clear();
141 servicePortStore.clear();
142 }
143 }
144
145 @Override
146 public void createServiceNetwork(ServiceNetwork snet) {
147 serviceNetworkStore.compute(snet.id(), (id, existing) -> {
148 final String error = snet.name() + ERR_DUPLICATE;
149 checkArgument(existing == null || existing.equals(snet), error);
150 return snet;
151 });
152 }
153
154 @Override
155 public void updateServiceNetwork(ServiceNetwork snet) {
156 serviceNetworkStore.compute(snet.id(), (id, existing) -> {
157 final String error = snet.name() + ERR_NOT_FOUND;
158 checkArgument(existing != null, error);
159 return snet;
160 });
161 }
162
163 @Override
164 public ServiceNetwork removeServiceNetwork(NetworkId netId) {
165 synchronized (this) {
166 Versioned<ServiceNetwork> snet = serviceNetworkStore.remove(netId);
167 return snet == null ? null : snet.value();
168 }
169 }
170
171 @Override
172 public ServiceNetwork serviceNetwork(NetworkId netId) {
173 Versioned<ServiceNetwork> versioned = serviceNetworkStore.get(netId);
174 return versioned == null ? null : versioned.value();
175 }
176
177 @Override
178 public Set<ServiceNetwork> serviceNetworks() {
179 Set<ServiceNetwork> snets = serviceNetworkStore.values().stream()
180 .map(Versioned::value)
181 .collect(Collectors.toSet());
182 return ImmutableSet.copyOf(snets);
183 }
184
185 @Override
186 public void createServicePort(ServicePort sport) {
187 servicePortStore.compute(sport.id(), (id, existing) -> {
188 final String error = sport.id().id() + ERR_DUPLICATE;
189 checkArgument(existing == null || existing.equals(sport), error);
190 return sport;
191 });
192 }
193
194 @Override
195 public void updateServicePort(ServicePort sport) {
196 servicePortStore.compute(sport.id(), (id, existing) -> {
197 final String error = sport.id().id() + ERR_NOT_FOUND;
198 checkArgument(existing != null, error);
199 return sport;
200 });
201 }
202
203 @Override
204 public ServicePort removeServicePort(PortId portId) {
205 Versioned<ServicePort> sport = servicePortStore.remove(portId);
206 return sport.value();
207 }
208
209 @Override
210 public ServicePort servicePort(PortId portId) {
211 Versioned<ServicePort> versioned = servicePortStore.get(portId);
212 return versioned == null ? null : versioned.value();
213 }
214
215 @Override
216 public Set<ServicePort> servicePorts() {
217 Set<ServicePort> sports = servicePortStore.values().stream()
218 .map(Versioned::value)
219 .collect(Collectors.toSet());
220 return ImmutableSet.copyOf(sports);
221 }
222
223 private class ServiceNetworkMapListener implements MapEventListener<NetworkId, ServiceNetwork> {
224
225 @Override
226 public void event(MapEvent<NetworkId, ServiceNetwork> event) {
227 switch (event.type()) {
228 case UPDATE:
229 log.debug("Service network updated {}", event.newValue());
230 eventExecutor.execute(() -> {
231 notifyDelegate(new ServiceNetworkEvent(
232 SERVICE_NETWORK_UPDATED,
233 event.newValue().value()));
234 notifyProviderUpdate(
235 event.oldValue().value(),
236 event.newValue().value());
237 });
238 break;
239 case INSERT:
240 log.debug("Service network created {}", event.newValue());
241 eventExecutor.execute(() -> {
242 notifyDelegate(new ServiceNetworkEvent(
243 SERVICE_NETWORK_CREATED,
244 event.newValue().value()));
245 notifyProviderUpdate(null, event.newValue().value());
246 });
247 break;
248 case REMOVE:
249 log.debug("Service network removed {}", event.oldValue());
250 eventExecutor.execute(() -> {
251 notifyProviderUpdate(event.oldValue().value(), null);
252 notifyDelegate(new ServiceNetworkEvent(
253 SERVICE_NETWORK_REMOVED,
254 event.oldValue().value()));
255 });
256 break;
257 default:
258 log.error("Unsupported event type");
259 break;
260 }
261 }
262
263 private void notifyProviderUpdate(ServiceNetwork oldValue, ServiceNetwork newValue) {
264 Map<NetworkId, DependencyType> oldp =
265 oldValue != null ? oldValue.providers() : ImmutableMap.of();
266 Map<NetworkId, DependencyType> newp =
267 newValue != null ? newValue.providers() : ImmutableMap.of();
268
269 oldp.entrySet().stream().filter(p -> !newp.keySet().contains(p.getKey()))
270 .forEach(p -> {
271 Provider providerNet = Provider.builder()
272 .provider(serviceNetwork(p.getKey()))
273 .type(p.getValue())
274 .build();
275 notifyDelegate(new ServiceNetworkEvent(
276 SERVICE_NETWORK_PROVIDER_REMOVED,
277 oldValue,
278 providerNet
279 ));
280 });
281
282 newp.entrySet().stream().filter(p -> !oldp.keySet().contains(p.getKey()))
283 .forEach(p -> {
284 Provider providerNet = Provider.builder()
285 .provider(serviceNetwork(p.getKey()))
286 .type(p.getValue())
287 .build();
288 notifyDelegate(new ServiceNetworkEvent(
289 SERVICE_NETWORK_PROVIDER_ADDED,
290 newValue,
291 providerNet));
292 });
293 }
294 }
295
296 private class ServicePortMapListener implements MapEventListener<PortId, ServicePort> {
297
298 @Override
299 public void event(MapEvent<PortId, ServicePort> event) {
300 switch (event.type()) {
301 case UPDATE:
302 log.debug("Service port updated {}", event.newValue());
303 eventExecutor.execute(() -> {
304 notifyDelegate(new ServiceNetworkEvent(
305 SERVICE_PORT_UPDATED,
306 serviceNetwork(event.newValue().value().networkId()),
307 event.newValue().value()));
308 });
309 break;
310 case INSERT:
311 log.debug("Service port created {}", event.newValue());
312 eventExecutor.execute(() -> {
313 notifyDelegate(new ServiceNetworkEvent(
314 SERVICE_PORT_CREATED,
315 serviceNetwork(event.newValue().value().networkId()),
316 event.newValue().value()));
317 });
318 break;
319 case REMOVE:
320 log.debug("Service port removed {}", event.oldValue());
321 eventExecutor.execute(() -> {
322 notifyDelegate(new ServiceNetworkEvent(
323 SERVICE_PORT_REMOVED,
324 serviceNetwork(event.oldValue().value().networkId()),
325 event.oldValue().value()));
326 });
327 break;
328 default:
329 log.error("Unsupported event type");
330 break;
331 }
332 }
333 }
334}