blob: a141946b8f55ac719715086d7ddbc4b9fd57e7ab [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import os
17
18import sys
19
20from twisted.internet import reactor
21from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
22
William Kurkianfc0dcda2019-04-08 16:54:36 -040023from pyvoltha.common.utils.asleep import asleep
24from pyvoltha.common.utils.consulhelpers import get_endpoint_from_consul
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050025from structlog import get_logger
26import grpc
27from grpc import StatusCode
28from grpc._channel import _Rendezvous
William Kurkianfc0dcda2019-04-08 16:54:36 -040029from voltha_protos.voltha_pb2 import OfAgentSubscriber
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050030from grpc_client import GrpcClient
31
32from agent import Agent
William Kurkianfc0dcda2019-04-08 16:54:36 -040033from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050034
35
36log = get_logger()
37# _ = third_party
38
class ConnectionManager(object):
    """
    Maintain the ofagent's gRPC session with a voltha core (vcore) and keep
    one Agent per (logical device datapath id, controller endpoint) pair.

    The class-level attributes below mirror the state of the most recently
    started/connected instance so that the liveness_probe and
    readiness_probe classmethods can be called without an instance.
    """

    # Class-level mirrors read by the probes
    running = False
    channel = None
    subscription = None
    grpc_client = None

    def __init__(self, consul_endpoint,
                 vcore_endpoint, vcore_grpc_timeout, vcore_binding_key,
                 vcore_transaction_key, controller_endpoints, instance_id,
                 enable_tls=False, key_file=None, cert_file=None,
                 vcore_retry_interval=0.5, devices_refresh_interval=5,
                 subscription_refresh_interval=5):
        """
        Record the configuration; nothing is connected until start().

        :param consul_endpoint: passed to get_endpoint_from_consul() when an
            endpoint of the form '@service-name' has to be resolved
        :param vcore_endpoint: 'host:port' of the vcore gRPC server, or
            '@service-name' to be looked up through consul
        :param vcore_grpc_timeout: handed to GrpcClient
        :param vcore_binding_key: handed to GrpcClient
        :param vcore_transaction_key: handed to GrpcClient
        :param controller_endpoints: iterable of controller endpoints; one
            Agent is created per logical device for each of them
        :param instance_id: sent as ofagent_id in the subscription request
        :param enable_tls: handed to each Agent
        :param key_file: handed to each Agent
        :param cert_file: handed to each Agent
        :param vcore_retry_interval: delay handed to asleep() before retrying
            a failed vcore request (presumably seconds - confirm in asleep)
        :param devices_refresh_interval: delay handed to asleep() between two
            polls of the logical device list
        :param subscription_refresh_interval: delay handed to asleep() after
            a successful subscription
        """
        log.info('init-connection-manager')
        log.info('list-of-controllers', controller_endpoints=controller_endpoints)
        self.controller_endpoints = controller_endpoints
        self.consul_endpoint = consul_endpoint
        self.vcore_endpoint = vcore_endpoint
        self.grpc_timeout = vcore_grpc_timeout
        self.core_binding_key = vcore_binding_key
        self.core_transaction_key = vcore_transaction_key
        self.instance_id = instance_id
        self.enable_tls = enable_tls
        self.key_file = key_file
        self.cert_file = cert_file

        self.channel = None
        self.grpc_client = None  # single, shared gRPC client to vcore

        self.agent_map = {}  # (datapath_id, controller_endpoint) -> Agent()
        self.device_id_to_datapath_id_map = {}

        self.vcore_retry_interval = vcore_retry_interval
        self.devices_refresh_interval = devices_refresh_interval
        self.subscription_refresh_interval = subscription_refresh_interval
        self.subscription = None

        self.running = False

    def start(self):
        """
        Launch the vcore subscription task and the logical device monitor.

        :return: self when the manager was started by this call, None when it
            was already running (historical behaviour, kept for callers)
        """
        if self.running:
            return

        log.debug('starting')

        self.running = True
        ConnectionManager.running = True

        # Get a subscription to vcore
        # NOTE(review): this runs an inlineCallbacks generator from a
        # thread-pool thread; confirm that is intended before changing it.
        reactor.callInThread(self.get_vcore_subscription)

        # Start monitoring logical devices and manage agents accordingly
        reactor.callLater(0, self.monitor_logical_devices)

        log.info('started')

        return self

    @classmethod
    def liveness_probe(cls):
        """Return True once any ConnectionManager has been started."""
        # Pod restarts when liveness condition fails
        return ConnectionManager.running

    @classmethod
    def readiness_probe(cls):
        """Return True while a channel, subscription and client are all set."""
        # Pod is isolated when readiness condition fails
        return bool(ConnectionManager.channel and
                    ConnectionManager.subscription and
                    ConnectionManager.grpc_client)

    def stop(self):
        """Stop every agent, mark the instance stopped and drop gRPC state."""
        log.debug('stopping')
        # clean up all controller connections
        for agent in self.agent_map.values():
            agent.stop()
        # NOTE(review): ConnectionManager.running is left untouched here, so
        # liveness_probe() keeps returning True after stop() - confirm intent.
        self.running = False

        self._reset_grpc_attributes()

        log.info('stopped')

    @staticmethod
    def _terminate_process():
        """Send SIGTERM to the current process so the container restarts."""
        import signal
        os.kill(os.getpid(), signal.SIGTERM)

    def resolve_endpoint(self, endpoint):
        """
        Turn an endpoint description into a (host, port) tuple.

        :param endpoint: 'host:port', or '@service-name' to be resolved
            through consul
        :return: (host, int(port)), or None when the endpoint is empty
        :raises ValueError: when the endpoint has no ':' separator or the
            port is not an integer
        """
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info(
                    '{}-service-endpoint-found'.format(endpoint), address=ip_port_endpoint)
            except Exception as e:
                log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
                log.error('committing-suicide')
                # Committing suicide in order to let docker restart ofagent
                self._terminate_process()

        if not ip_port_endpoint:
            return None

        # Split on the last ':' only, so the unpacking can never see more
        # than two parts.
        host, separator, port = ip_port_endpoint.rpartition(':')
        if not separator:
            raise ValueError(
                'endpoint {!r} is not of the form host:port'.format(ip_port_endpoint))
        return host, int(port)

    def _reset_grpc_attributes(self):
        """Stop the gRPC client, if any, and clear all session state."""
        log.debug('start-reset-grpc-attributes')

        if self.grpc_client is not None:
            self.grpc_client.stop()

        self.channel = None
        self.subscription = None
        self.grpc_client = None

        # Keep the probe mirrors in sync
        ConnectionManager.channel = None
        ConnectionManager.subscription = None
        ConnectionManager.grpc_client = None

        log.debug('stop-reset-grpc-attributes')

    def _assign_grpc_attributes(self):
        """
        Resolve the vcore endpoint and open a gRPC channel to it.

        :raises ValueError: when the vcore endpoint cannot be resolved
        """
        log.debug('start-assign-grpc-attributes')

        resolved = self.resolve_endpoint(self.vcore_endpoint)
        if resolved is None:
            raise ValueError(
                'unable to resolve vcore endpoint {!r}'.format(self.vcore_endpoint))
        host, port = resolved
        log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)

        # Establish a connection to the vcore GRPC server
        self.channel = grpc.insecure_channel('{}:{}'.format(host, port))

        # For Readiness probe
        ConnectionManager.channel = self.channel

        log.debug('stop-assign-grpc-attributes')

    @inlineCallbacks
    def get_vcore_subscription(self):
        """
        Retry subscribing to vcore until it succeeds or the manager stops.

        Each attempt opens a channel, creates the shared GrpcClient if needed
        and sends an OfAgentSubscriber request. On any failure the gRPC state
        is reset and the attempt is repeated after vcore_retry_interval.
        """
        log.debug('start-get-vcore-subscription')

        while self.running and self.subscription is None:
            try:
                # If a subscription is not yet assigned then establish new GRPC connection
                # ... otherwise keep using existing connection details
                if self.subscription is None:
                    self._assign_grpc_attributes()

                # Send subscription request to register the current ofagent instance
                container_name = self.instance_id
                if self.grpc_client is None:
                    self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout,
                                                  self.core_binding_key,
                                                  self.core_transaction_key)
                subscription = yield self.grpc_client.subscribe(
                    OfAgentSubscriber(ofagent_id=container_name))

                # If the subscriber id matches the current instance
                # ... then the subscription has succeeded
                if subscription is not None and subscription.ofagent_id == container_name:
                    if self.subscription is None:
                        # Keep details on the current GRPC session and subscription
                        log.debug('subscription-with-vcore-successful', subscription=subscription)
                        self.subscription = subscription
                        self.grpc_client.start()

                    # For Readiness probes: only published once the
                    # subscription has been validated and the client started
                    ConnectionManager.subscription = subscription
                    ConnectionManager.grpc_client = self.grpc_client

                    # Sleep a bit in between each subscribe
                    yield asleep(self.subscription_refresh_interval)

                    # Re-evaluate the loop condition
                    continue

                # The subscription did not succeed, reset and move on
                log.info('subscription-with-vcore-unavailable', subscription=subscription)

            except _Rendezvous as e:
                log.error('subscription-with-vcore-terminated', exception=e, status=e.code())

            except Exception as e:
                log.exception('unexpected-subscription-termination-with-vcore', e=e)

            # Reset grpc details
            # The vcore instance is either not available for subscription
            # or a failure occurred with the existing communication.
            self._reset_grpc_attributes()

            # Sleep for a short period and retry
            yield asleep(self.vcore_retry_interval)

        log.debug('stop-get-vcore-subscription')

    @inlineCallbacks
    def get_list_of_logical_devices_from_voltha(self):
        """
        Fetch the logical device list, retrying while the manager is running.

        :return: (via Deferred) the value of grpc_client.list_logical_devices(),
            or None when the manager stops before a request succeeds
        """
        while self.running:
            log.info('retrieve-logical-device-list')
            try:
                devices = yield self.grpc_client.list_logical_devices()

                for device in devices:
                    log.info("logical-device-entry", id=device.id,
                             datapath_id=device.datapath_id)

                returnValue(devices)

            except _Rendezvous as e:
                status = e.code()
                log.error('vcore-communication-failure', exception=e, status=status)
                # The core is gone or not answering: terminate so that the
                # container gets restarted
                if status in (StatusCode.UNAVAILABLE, StatusCode.DEADLINE_EXCEEDED):
                    self._terminate_process()

            except Exception as e:
                log.exception('logical-devices-retrieval-failure', exception=e)

            log.info('reconnect', after_delay=self.vcore_retry_interval)
            yield asleep(self.vcore_retry_interval)

    def refresh_agent_connections(self, devices):
        """
        Based on the new device list, update the following state in the class:
        * agent_map
        * device_id_to_datapath_id_map
        :param devices: full device list freshly received from Voltha
        :return: None
        """

        # Use datapath ids for deciding what's new and what's obsolete
        desired_datapath_ids = {d.datapath_id for d in devices}
        current_datapath_ids = {key[0] for key in self.agent_map}

        # if identical, nothing to do
        if desired_datapath_ids == current_datapath_ids:
            return

        # ... otherwise calculate differences
        to_add = desired_datapath_ids - current_datapath_ids
        to_del = current_datapath_ids - desired_datapath_ids

        # remove what we don't need
        for datapath_id in to_del:
            self.delete_agent(datapath_id)

        # start new agents as needed
        for device in devices:
            if device.datapath_id in to_add:
                self.create_agent(device)

        log.debug('updated-agent-list', count=len(self.agent_map))
        log.debug('updated-device-id-to-datapath-id-map',
                  map=str(self.device_id_to_datapath_id_map))

    def create_agent(self, device):
        """
        Create and start one Agent per controller endpoint for a device.

        :param device: object exposing .id and .datapath_id
        """
        datapath_id = device.datapath_id
        device_id = device.id
        for controller_endpoint in self.controller_endpoints:
            agent = Agent(controller_endpoint, datapath_id,
                          device_id, self.grpc_client, self.enable_tls,
                          self.key_file, self.cert_file)
            agent.start()
            self.agent_map[(datapath_id, controller_endpoint)] = agent
            self.device_id_to_datapath_id_map[device_id] = datapath_id

    def delete_agent(self, datapath_id):
        """
        Stop and forget every Agent registered for a datapath id.

        Missing entries are skipped so that the same device id, shared by
        the agents of all controllers, can be removed without a KeyError.

        :param datapath_id: datapath id whose agents must be removed
        """
        for controller_endpoint in self.controller_endpoints:
            key = (datapath_id, controller_endpoint)
            agent = self.agent_map.get(key)
            if agent is None:
                continue
            device_id = agent.get_device_id()
            agent.stop()
            del self.agent_map[key]
            self.device_id_to_datapath_id_map.pop(device_id, None)

    @inlineCallbacks
    def monitor_logical_devices(self):
        """
        Poll the logical device list and reconcile the agents with it.

        Runs until the manager is stopped, pausing devices_refresh_interval
        between iterations, including after a failed iteration.
        """
        log.debug('start-monitor-logical-devices')

        while self.running:
            log.info('monitoring-logical-devices')

            # should change to a gRPC streaming call
            # see https://jira.opencord.org/browse/CORD-821

            try:
                if self.channel is not None and self.grpc_client is not None and \
                        self.subscription is not None:
                    # get current list from Voltha
                    devices = yield self.get_list_of_logical_devices_from_voltha()

                    # update agent list and mapping tables as needed; the
                    # list is None when the manager stopped during retrieval
                    if devices is not None:
                        self.refresh_agent_connections(devices)
                else:
                    log.info('vcore-communication-unavailable')

            except _Rendezvous as e:
                log.error('vcore-communication-failure', exception=repr(e), status=e.code())

            except Exception as e:
                log.exception('unexpected-vcore-communication-failure', exception=repr(e))

            # wait before next poll
            yield asleep(self.devices_refresh_interval)

        log.debug('stop-monitor-logical-devices')

    def _agents_for_device(self, device_id):
        """Return the agents currently registered for a logical device id."""
        datapath_id = self.device_id_to_datapath_id_map.get(device_id)
        if datapath_id is None:
            return []
        candidates = (self.agent_map.get((datapath_id, controller_endpoint))
                      for controller_endpoint in self.controller_endpoints)
        return [agent for agent in candidates if agent is not None]

    def forward_packet_in(self, device_id, ofp_packet_in):
        """
        Hand a packet-in to every agent of the given logical device.

        :param device_id: logical device id the packet came from
        :param ofp_packet_in: passed unchanged to Agent.forward_packet_in
        """
        for agent in self._agents_for_device(device_id):
            agent.forward_packet_in(ofp_packet_in)

    def forward_change_event(self, device_id, event):
        """
        Hand a change event to every agent of the given logical device.

        :param device_id: logical device id the event relates to
        :param event: passed unchanged to Agent.forward_change_event
        """
        for agent in self._agents_for_device(device_id):
            agent.forward_change_event(event)
355 agent.forward_change_event(event)