blob: eb578a22726f7032ac68eef1960f3c8a9725b7ca [file] [log] [blame]
Girish Gowdru4e854c22018-09-26 02:51:57 -07001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
"""
Implementation of the KV store interface used to keep the mapping between
flows and bal flows
"""
19import structlog
20import json
21from zope.interface import implementer
22from voltha.adapters.asfvolt16_olt.kv_store_interface import KvStoreInterface
23from voltha.core.config.config_backend import ConsulStore
24from voltha.core.config.config_backend import EtcdStore
25
# Prefix handed to the backend store object and prepended to the device id
# when listing the flow entries stored for a device
PATH_PREFIX = 'asfvolt16_flow_store'

# Module-level structured logger shared by every method below
log = structlog.get_logger()
30
@implementer(KvStoreInterface)
class Asfvolt16KvStore(object):
    """
    KV store backed bookkeeping of the flows installed for a device.

    Each entry is written under the key ``<device_id>/<cookie>`` and holds
    the JSON encoding of a mapping whose first key is the flow cookie and
    whose value is the info stored for that flow.

    Every public method logs and swallows backend errors instead of raising
    them to the caller.
    """

    def __init__(self, backend, host, port):
        """
        Based on backend ('consul' or 'etcd'), use the host and port to
        create the respective store object.

        Errors (including an unknown backend) are logged and not raised; in
        that case ``kv_store`` is left as ``None``.

        :param backend: Type of backend storage (etcd or consul)
        :param host: host ip info for backend storage
        :param port: port for the backend storage
        """
        # Always define the attribute, so that a failed init leaves the
        # object in a well-defined state instead of without the attribute.
        self.kv_store = None
        try:
            if backend == 'consul':
                self.kv_store = ConsulStore(host, port, PATH_PREFIX)
            elif backend == 'etcd':
                self.kv_store = EtcdStore(host, port, PATH_PREFIX)
            else:
                log.error('Invalid-backend')
                raise Exception("Invalid-backend-for-kv-store")
        except Exception as e:
            log.exception("exception-in-init", e=e)

    # ---------------------------------------------------------------- helpers

    @staticmethod
    def _device_key(device_id):
        """Encode the device id to ASCII, dropping non-ASCII characters."""
        return device_id.encode('ascii', 'ignore')

    def _fetch_entries(self, dev_key):
        """
        Return the raw backend entries stored below the device key, exactly
        as handed back by the backend (``None`` is passed through).
        """
        # NOTE(review): this relies on the backend's private _kv_get and on
        # entries being indexable by 'Value', which looks like the Consul
        # response shape - confirm the etcd backend behaves the same.
        return self.kv_store._kv_get(PATH_PREFIX + '/' + dev_key,
                                     recurse=True)

    @staticmethod
    def _decode(entry):
        """Decode the JSON flow mapping held by one raw backend entry."""
        return json.loads(entry['Value'])

    @staticmethod
    def _cookie_of(flow_mapping):
        """Return the cookie, i.e. the first key of a flow mapping."""
        return flow_mapping.keys()[0]

    def _stored_cookie_set(self, dev_key):
        """Return the set of cookies (as long) stored for the device."""
        entries = self._fetch_entries(dev_key)
        if entries is None:
            return set()
        return set(long(self._cookie_of(self._decode(entry)))
                   for entry in entries)

    # ------------------------------------------------------------- public api

    def get_flows_to_remove_info(self, device_id, flows):
        """
        Used for incremental flow removal: the cookies of the flows to be
        removed are already known, so the stored info is looked up directly
        instead of being evaluated.

        :param device_id: id of the device owning the flows
        :param flows: flows to be removed; only ``flow.cookie`` is read
        :return: list with the decoded stored info of every flow that has
                 an entry in the kv store (what was collected so far when
                 an error occurs)
        """
        flows_to_remove_list = []
        dev_key = self._device_key(device_id)

        try:
            for flow in flows:
                cookie_info = self.kv_store[dev_key + '/' + str(flow.cookie)]
                if cookie_info:
                    log.debug("cookie-info-exist", cookie=flow.cookie,
                              cookie_info=cookie_info)
                    flows_to_remove_list.append(json.loads(cookie_info))
                else:
                    log.debug("cookie-info-does-not-exist",
                              cookie=flow.cookie)

        except Exception as e:
            log.exception("evaulating-flows-to-remove-info", e=e)

        return flows_to_remove_list

    def get_flows_to_remove(self, device_id, flows):
        """
        Used for bulk flow update: evaluate which of the flows currently
        present in the kv store are missing from the received bulk flows.

        :param device_id: id of the device owning the flows
        :param flows: complete list of bulk flows received
        :return: list with the stored info of every flow that has to be
                 removed
        """
        flows_to_remove_list = []
        dev_key = self._device_key(device_id)

        try:
            entries = self._fetch_entries(dev_key)
            if entries is None:
                return flows_to_remove_list

            # Decode everything up front so a corrupt entry is detected
            # before any flow is selected for removal.
            current_flows = [self._decode(entry) for entry in entries]

            # A set gives constant time membership tests in the loop below
            bulk_update_flow_cookies = set(flow.cookie for flow in flows)

            # Stored flows whose cookie is not part of the bulk update
            for current_flow in current_flows:
                cookie = self._cookie_of(current_flow)
                if long(cookie) not in bulk_update_flow_cookies:
                    flows_to_remove_list.append(current_flow[cookie])

        except Exception as e:
            log.exception("evaulating-flows-to-remove", e=e)

        return flows_to_remove_list

    def get_flows_to_add(self, device_id, flows):
        """
        Evaluate which of the received bulk flows are not yet present in
        the kv store.

        :param device_id: id of the device owning the flows
        :param flows: complete list of bulk flows received
        :return: list of cookies of the flows to be newly added; empty when
                 the evaluation fails
        """
        # Bound before the try block so the return below is always valid,
        # even when the evaluation raises and the error is only logged.
        flows_to_add_list = []
        dev_key = self._device_key(device_id)

        try:
            current_flow_cookie_set = self._stored_cookie_set(dev_key)
            bulk_update_flow_cookie_set = set(flow.cookie for flow in flows)

            flows_to_add_list = list(
                bulk_update_flow_cookie_set.difference(
                    current_flow_cookie_set))

        except Exception as e:
            log.exception("evaluating-flows-to-add", e=e)

        return flows_to_add_list

    def add_to_kv_store(self, device_id, new_flow_mapping_list):
        """
        Store every given flow mapping under ``<device_id>/<cookie>``.

        :param device_id: id of the device owning the flows
        :param new_flow_mapping_list: flow mappings; the first key of each
                                      mapping is used as the cookie
        """
        dev_key = self._device_key(device_id)

        try:
            log.debug("incremental-flows-to-be-added-to-kv-store",
                      flows=new_flow_mapping_list)
            for flow in new_flow_mapping_list:
                entry_key = dev_key + '/' + str(self._cookie_of(flow))
                self.kv_store[entry_key] = json.dumps(flow)

        except Exception as e:
            log.exception("incremental-flow-add-to-kv-store", e=e)

    def remove_from_kv_store(self, device_id, flows_to_remove):
        """
        Delete the kv store entry of every given cookie.

        :param device_id: id of the device owning the flows
        :param flows_to_remove: cookies of the flows to be removed
        """
        dev_key = self._device_key(device_id)

        try:
            log.debug("incremental-flows-to-be-removed-from-kv-store",
                      flows=flows_to_remove)
            for cookie in flows_to_remove:
                del self.kv_store[dev_key + '/' + str(cookie)]

        except Exception as e:
            log.exception("incremental-flow-remove-from-kv-store", e=e)

    def update_kv_store(self, device_id, new_flow_mapping_list, flows):
        """
        Bring the kv store in line with a bulk flow update: stored flows
        missing from the bulk flows are deleted, then the new flow mappings
        that are not stored yet are added.

        :param device_id: id of the device owning the flows
        :param new_flow_mapping_list: mappings of the flows newly added
        :param flows: complete list of bulk flows received
        """
        dev_key = self._device_key(device_id)

        try:
            current_flow_cookie_set = self._stored_cookie_set(dev_key)
            new_flow_added_cookie_set = set(
                self._cookie_of(new_flow)
                for new_flow in new_flow_mapping_list)
            bulk_update_flow_cookie_set = set(flow.cookie for flow in flows)

            # Stored flows that are no longer part of the bulk update
            remove_flows_list = list(current_flow_cookie_set.difference(
                bulk_update_flow_cookie_set))
            log.debug("bulk-flows-to-be-removed-from-kv-store",
                      flows=remove_flows_list)

            for cookie in remove_flows_list:
                del self.kv_store[dev_key + '/' + str(cookie)]

            # New flows that are not stored yet
            new_flows_list = list(new_flow_added_cookie_set.difference(
                current_flow_cookie_set))
            log.debug("bulk-flows-to-be-added-to-kv-store",
                      flows=new_flows_list)

            # Single pass over the mappings instead of rescanning the whole
            # list once per new cookie.
            for flow_mapping in new_flow_mapping_list:
                cookie = self._cookie_of(flow_mapping)
                if cookie not in current_flow_cookie_set:
                    self.kv_store[dev_key + '/' + str(cookie)] = \
                        json.dumps(flow_mapping)

        except Exception as e:
            log.exception("bulk-flow-update-kv-store", e=e)

    def clear_kv_store(self, device_id):
        """
        Delete every flow entry stored for the device.

        :param device_id: id of the device whose flows are cleared
        """
        dev_key = self._device_key(device_id)
        try:
            # Recursive delete is not used because the cache does not get
            # cleared by it; every flow is fetched and deleted one by one.
            entries = self._fetch_entries(dev_key)
            if entries is None:
                log.debug("no-flows-found-in-kv-store")
                return

            for entry in entries:
                # The cookie id is extracted from the stored flow details
                cookie = self._cookie_of(self._decode(entry))
                del self.kv_store[dev_key + '/' + str(cookie)]
            log.debug("kv-store-flows-cleared-successfully")

        except Exception as e:
            log.exception("clear-kv-store", e=e)

    def is_reference_found_for_key_value(self, device_id, key, value):
        """
        Check whether any flow stored for the device references ``key``
        with the given ``value`` in its flow info.

        :param device_id: id of the device owning the flows
        :param key: key looked up in the stored info of every flow
        :param value: value the key has to be equal to
        :return: True when such a flow exists; False otherwise, including
                 when the lookup fails
        """
        dev_key = self._device_key(device_id)
        try:
            entries = self._fetch_entries(dev_key)
            if entries is not None:
                for entry in entries:
                    flow = self._decode(entry)
                    flow_data = flow[self._cookie_of(flow)]
                    if key in flow_data and flow_data[key] == value:
                        return True
        except Exception as e:
            log.exception("excepting-finding-refernece-for-kv", e=e)

        return False
266 return False