blob: 3b1028075c973b07f4f568cff0ae3b84a67ba41b [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import os
17
18import sys
19
20from twisted.internet import reactor
21from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
22
William Kurkianfc0dcda2019-04-08 16:54:36 -040023from pyvoltha.common.utils.asleep import asleep
24from pyvoltha.common.utils.consulhelpers import get_endpoint_from_consul
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050025from structlog import get_logger
26import grpc
27from grpc import StatusCode
28from grpc._channel import _Rendezvous
William Kurkianfc0dcda2019-04-08 16:54:36 -040029from voltha_protos.voltha_pb2 import OfAgentSubscriber
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050030from grpc_client import GrpcClient
31
32from agent import Agent
William Kurkianfc0dcda2019-04-08 16:54:36 -040033from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050034
35
36log = get_logger()
37# _ = third_party
38
class ConnectionManager(object):
    """
    Owns the ofagent's gRPC session with vcore and the set of Agent objects
    that connect logical devices to OpenFlow controllers.

    One Agent is kept per (datapath_id, controller_endpoint) pair.  Once
    started, the manager:

    * subscribes this ofagent instance with vcore
      (see get_vcore_subscription), and
    * periodically polls vcore for the list of logical devices and
      creates/removes agents to match (see monitor_logical_devices).

    A subset of the state is mirrored on the class itself so that the
    liveness/readiness probes, which are classmethods, can be answered
    without a reference to the instance.
    """

    # Class-level mirrors of instance state, read by the probe classmethods.
    running = False
    core_ready = False
    channel = None
    subscription = None
    grpc_client = None

    def __init__(self, consul_endpoint,
                 vcore_endpoint, vcore_grpc_timeout, vcore_binding_key,
                 vcore_transaction_key, controller_endpoints, instance_id,
                 enable_tls=False, key_file=None, cert_file=None,
                 vcore_retry_interval=0.5, devices_refresh_interval=5,
                 subscription_refresh_interval=5):
        """
        :param consul_endpoint: consul address handed to
            get_endpoint_from_consul when an '@'-prefixed endpoint has to be
            resolved
        :param vcore_endpoint: vcore gRPC endpoint, either 'host:port' or
            '@<name>' to be looked up through consul
        :param vcore_grpc_timeout: timeout passed through to GrpcClient
        :param vcore_binding_key: binding key passed through to GrpcClient
        :param vcore_transaction_key: transaction key passed through to
            GrpcClient
        :param controller_endpoints: iterable of controller endpoints; one
            Agent is created per endpoint for every logical device
        :param instance_id: identifier sent to vcore as the ofagent_id of
            the subscription
        :param enable_tls: passed through to every Agent
        :param key_file: passed through to every Agent
        :param cert_file: passed through to every Agent
        :param vcore_retry_interval: delay before retrying after a failed
            exchange with vcore
        :param devices_refresh_interval: delay between two polls of the
            logical device list
        :param subscription_refresh_interval: delay applied after a
            successful subscription
        """
        log.info('init-connection-manager')
        log.info('list-of-controllers', controller_endpoints=controller_endpoints)
        self.controller_endpoints = controller_endpoints
        self.consul_endpoint = consul_endpoint
        self.vcore_endpoint = vcore_endpoint
        self.grpc_timeout = vcore_grpc_timeout
        self.core_binding_key = vcore_binding_key
        self.core_transaction_key = vcore_transaction_key
        self.instance_id = instance_id
        self.enable_tls = enable_tls
        self.key_file = key_file
        self.cert_file = cert_file

        self.channel = None
        self.grpc_client = None  # single, shared gRPC client to vcore

        self.agent_map = {}  # (datapath_id, controller_endpoint) -> Agent()
        self.device_id_to_datapath_id_map = {}

        self.vcore_retry_interval = vcore_retry_interval
        self.devices_refresh_interval = devices_refresh_interval
        self.subscription_refresh_interval = subscription_refresh_interval
        self.subscription = None

        self.running = False

    def start(self):
        """
        Start the subscription and device-monitoring loops.

        :return: self when the manager was started by this call, None when
            it was already running
        """
        if self.running:
            return

        log.debug('starting')

        self.running = True
        # Assume core is ready until proven otherwise
        ConnectionManager.core_ready = True
        ConnectionManager.running = True

        # Get a subscription to vcore
        reactor.callInThread(self.get_vcore_subscription)

        # Start monitoring logical devices and manage agents accordingly
        reactor.callLater(0, self.monitor_logical_devices)

        log.info('started')

        return self

    @classmethod
    def liveness_probe(cls):
        """
        :return: the class-level ``running`` flag
        """
        # Pod restarts when liveness condition fails
        return ConnectionManager.running

    @classmethod
    def readiness_probe(cls):
        """
        :return: True only when the core is flagged ready and a channel,
            a subscription and a gRPC client are all currently recorded
            on the class
        """
        # Pod is isolated when readiness condition fails
        return bool(ConnectionManager.core_ready and
                    ConnectionManager.channel and
                    ConnectionManager.subscription and
                    ConnectionManager.grpc_client)

    def stop(self):
        """
        Stop every agent, clear the instance ``running`` flag and drop the
        gRPC session state.
        """
        log.debug('stopping')
        # clean up all controller connections
        for agent in list(self.agent_map.values()):
            agent.stop()
        # NOTE(review): only the instance flag is cleared here; the
        # class-level flag read by liveness_probe stays as set by start().
        self.running = False

        self._reset_grpc_attributes()

        log.info('stopped')

    def resolve_endpoint(self, endpoint):
        """
        Turn an endpoint specification into a (host, port) pair.

        :param endpoint: 'host:port', or '@<name>' to have <name> resolved
            through consul
        :return: (host, port) with port as an int, or None when no endpoint
            could be determined
        """
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info(
                    '{}-service-endpoint-found'.format(endpoint),
                    address=ip_port_endpoint)
            except Exception as e:
                log.error('{}-service-endpoint-not-found'.format(endpoint),
                          exception=repr(e))
                log.error('committing-suicide')
                # Committing suicide in order to let docker restart ofagent
                os.system("kill -15 {}".format(os.getpid()))
                # The lookup failed, so there is nothing to parse; without
                # this the raw '@<name>' would fall through to the split
                # below and blow up on unpacking.
                ip_port_endpoint = None

        if ip_port_endpoint:
            # Split on the last ':' only so exactly two fields come back.
            host, port = ip_port_endpoint.rsplit(':', 1)
            return host, int(port)
        return None

    def _reset_grpc_attributes(self):
        """
        Stop the gRPC client (if any) and clear channel, subscription and
        client on both the instance and the class.
        """
        log.debug('start-reset-grpc-attributes')

        if self.grpc_client is not None:
            self.grpc_client.stop()

        self.channel = None
        self.subscription = None
        self.grpc_client = None

        ConnectionManager.channel = None
        ConnectionManager.subscription = None
        ConnectionManager.grpc_client = None

        log.debug('stop-reset-grpc-attributes')

    def _assign_grpc_attributes(self):
        """
        Resolve the vcore endpoint and open an insecure gRPC channel to it.

        :raises ValueError: when the vcore endpoint cannot be resolved
        """
        log.debug('start-assign-grpc-attributes')

        resolved = self.resolve_endpoint(self.vcore_endpoint)
        if resolved is None:
            # Explicit failure instead of an opaque unpacking error; the
            # subscription loop catches it and retries.
            raise ValueError(
                'unable to resolve vcore endpoint {!r}'.format(
                    self.vcore_endpoint))
        host, port = resolved
        log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint,
                 host=host, port=port)

        # Establish a connection to the vcore GRPC server
        self.channel = grpc.insecure_channel('{}:{}'.format(host, port))

        # For Readiness probe
        ConnectionManager.channel = self.channel

        log.debug('stop-assign-grpc-attributes')

    @inlineCallbacks
    def get_vcore_subscription(self):
        """
        Keep trying to subscribe this ofagent instance with vcore.

        The loop runs while the manager is running and no subscription is
        held; it therefore ends once a subscription has been stored on the
        instance.  Any failure resets the gRPC state and is retried after
        ``vcore_retry_interval``.
        """
        log.debug('start-get-vcore-subscription')

        while self.running and self.subscription is None:
            try:
                # If a subscription is not yet assigned then establish new
                # GRPC connection ... otherwise keep using existing
                # connection details
                if self.subscription is None:
                    self._assign_grpc_attributes()

                # Send subscription request to register the current
                # ofagent instance
                container_name = self.instance_id
                if self.grpc_client is None:
                    self.grpc_client = GrpcClient(
                        self, self.channel, self.grpc_timeout,
                        self.core_binding_key, self.core_transaction_key)
                subscription = yield self.grpc_client.subscribe(
                    OfAgentSubscriber(ofagent_id=container_name))

                # For Readiness probes
                ConnectionManager.subscription = subscription
                ConnectionManager.grpc_client = self.grpc_client

                # If the subscriber id matches the current instance
                # ... then the subscription has succeeded
                if subscription is not None and \
                        subscription.ofagent_id == container_name:
                    if self.subscription is None:
                        # Keep details on the current GRPC session and
                        # subscription
                        log.debug('subscription-with-vcore-successful',
                                  subscription=subscription)
                        self.subscription = subscription
                        self.grpc_client.start()

                    # Sleep a bit in between each subscribe
                    yield asleep(self.subscription_refresh_interval)

                    # Back to the loop condition (skips the reset below)
                    continue

                # The subscription did not succeed, reset and move on
                else:
                    log.info('subscription-with-vcore-unavailable',
                             subscription=subscription)

            except _Rendezvous as e:
                log.error('subscription-with-vcore-terminated',
                          exception=e, status=e.code())

            except Exception as e:
                log.exception(
                    'unexpected-subscription-termination-with-vcore', e=e)

            # Reset grpc details
            # The vcore instance is either not available for subscription
            # or a failure occurred with the existing communication.
            self._reset_grpc_attributes()

            # Sleep for a short period and retry
            yield asleep(self.vcore_retry_interval)

        log.debug('stop-get-vcore-subscription')

    @inlineCallbacks
    def get_list_of_logical_devices_from_voltha(self):
        """
        Fetch the logical device list through the gRPC client, retrying
        every ``vcore_retry_interval`` until it succeeds.

        The class-level ``core_ready`` flag is set on success and cleared
        on every failure.

        :return: (via returnValue) the device list; None when the manager
            stops running before a list could be retrieved
        """
        while self.running:
            log.info('retrieve-logical-device-list')
            try:
                devices = yield \
                    self.grpc_client.list_logical_devices()

                # We've successfully talked to the core
                ConnectionManager.core_ready = True

                for device in devices:
                    log.info("logical-device-entry", id=device.id,
                             datapath_id=device.datapath_id)

                returnValue(devices)

            except _Rendezvous as e:
                status = e.code()
                log.error('vcore-communication-failure', exception=e,
                          status=status)
                # Will be reflected in readiness probe
                ConnectionManager.core_ready = False

            except Exception as e:
                log.exception('logical-devices-retrieval-failure',
                              exception=e)
                # Will be reflected in readiness probe
                ConnectionManager.core_ready = False

            log.info('reconnect', after_delay=self.vcore_retry_interval)
            yield asleep(self.vcore_retry_interval)

    def refresh_agent_connections(self, devices):
        """
        Based on the new device list, update the following state in the
        class:
            * agent_map
            * device_id_to_datapath_id_map
        :param devices: full device list freshly received from Voltha
        :return: None
        """

        # Use datapath ids for deciding what's new and what's obsolete
        desired_datapath_ids = set(d.datapath_id for d in devices)
        current_datapath_ids = set(key[0] for key in self.agent_map)

        # if identical, nothing to do
        if desired_datapath_ids == current_datapath_ids:
            return

        # ... otherwise calculate differences
        to_add = desired_datapath_ids.difference(current_datapath_ids)
        to_del = current_datapath_ids.difference(desired_datapath_ids)

        # remove what we don't need
        for datapath_id in to_del:
            self.delete_agent(datapath_id)

        # start new agents as needed
        for device in devices:
            if device.datapath_id in to_add:
                self.create_agent(device)

        log.debug('updated-agent-list', count=len(self.agent_map))
        log.debug('updated-device-id-to-datapath-id-map',
                  map=str(self.device_id_to_datapath_id_map))

    def create_agent(self, device):
        """
        Create and start one Agent per controller endpoint for ``device``
        and record the device id -> datapath id mapping.

        :param device: logical device exposing ``id`` and ``datapath_id``
        """
        datapath_id = device.datapath_id
        device_id = device.id
        for controller_endpoint in self.controller_endpoints:
            agent = Agent(controller_endpoint, datapath_id,
                          device_id, self.grpc_client, self.enable_tls,
                          self.key_file, self.cert_file)
            agent.start()
            self.agent_map[(datapath_id, controller_endpoint)] = agent
            self.device_id_to_datapath_id_map[device_id] = datapath_id

    def delete_agent(self, datapath_id):
        """
        Stop and forget every Agent registered for ``datapath_id`` and drop
        the matching device id mapping.

        :param datapath_id: datapath id whose agents must be removed
        """
        for controller_endpoint in self.controller_endpoints:
            agent = self.agent_map.pop((datapath_id, controller_endpoint),
                                       None)
            if agent is None:
                continue
            device_id = agent.get_device_id()
            agent.stop()
            # All agents of a datapath share one device id, so the entry is
            # already gone from the second controller onwards; pop() keeps
            # that from raising KeyError.
            self.device_id_to_datapath_id_map.pop(device_id, None)

    @inlineCallbacks
    def monitor_logical_devices(self):
        """
        Poll vcore for logical devices every ``devices_refresh_interval``
        while the manager is running, and reconcile the agents with each
        list received.
        """
        log.debug('start-monitor-logical-devices')

        while self.running:
            log.info('monitoring-logical-devices')

            # should change to a gRPC streaming call
            # see https://jira.opencord.org/browse/CORD-821

            try:
                if self.channel is not None and \
                        self.grpc_client is not None and \
                        self.subscription is not None:
                    # get current list from Voltha
                    devices = yield \
                        self.get_list_of_logical_devices_from_voltha()

                    # update agent list and mapping tables as needed;
                    # no list is returned when the manager stopped while
                    # the retrieval was still retrying
                    if devices is not None:
                        self.refresh_agent_connections(devices)
                else:
                    log.info('vcore-communication-unavailable')

            except _Rendezvous as e:
                log.error('vcore-communication-failure',
                          exception=repr(e), status=e.code())

            except Exception as e:
                log.exception('unexpected-vcore-communication-failure',
                              exception=repr(e))

            # wait before next poll; done outside the try block so that a
            # failure above cannot turn this loop into a busy spin
            yield asleep(self.devices_refresh_interval)

        log.debug('stop-monitor-logical-devices')

    def _agents_for_device(self, device_id):
        """
        Return the agents currently registered for ``device_id``, one per
        controller endpoint that has one; empty when the device is unknown.
        """
        datapath_id = self.device_id_to_datapath_id_map.get(device_id)
        if datapath_id is None:
            return []
        agents = []
        for controller_endpoint in self.controller_endpoints:
            agent = self.agent_map.get((datapath_id, controller_endpoint))
            if agent is not None:
                agents.append(agent)
        return agents

    def forward_packet_in(self, device_id, ofp_packet_in):
        """
        Hand a packet-in to every agent of the given device.

        :param device_id: logical device id the packet originates from
        :param ofp_packet_in: payload passed to Agent.forward_packet_in
        """
        for agent in self._agents_for_device(device_id):
            agent.forward_packet_in(ofp_packet_in)

    def forward_change_event(self, device_id, event):
        """
        Hand a change event to every agent of the given device.

        :param device_id: logical device id the event relates to
        :param event: payload passed to Agent.forward_change_event
        """
        for agent in self._agents_for_device(device_id):
            agent.forward_change_event(event)