blob: 9c64854190eb0831093cea937e4057fda0d16a7d [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import os
17
18import sys
19
20from twisted.internet import reactor
21from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
22
William Kurkianfc0dcda2019-04-08 16:54:36 -040023from pyvoltha.common.utils.asleep import asleep
24from pyvoltha.common.utils.consulhelpers import get_endpoint_from_consul
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050025from structlog import get_logger
26import grpc
27from grpc import StatusCode
28from grpc._channel import _Rendezvous
William Kurkianfc0dcda2019-04-08 16:54:36 -040029from voltha_protos.voltha_pb2 import OfAgentSubscriber
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050030from grpc_client import GrpcClient
31
32from agent import Agent
William Kurkianfc0dcda2019-04-08 16:54:36 -040033from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050034
35
# Module-level structured logger used by every ConnectionManager method.
log = get_logger()
# _ = third_party
38
class ConnectionManager(object):
    """
    Owns the gRPC link to vcore and the set of per-device Agent objects.

    The manager resolves the vcore endpoint, subscribes this ofagent
    instance through a single shared GrpcClient, polls the logical device
    list, and keeps one Agent per (datapath_id, controller_endpoint) pair
    in step with that list.
    """

    def __init__(self, consul_endpoint,
                 vcore_endpoint, vcore_grpc_timeout, vcore_binding_key,
                 vcore_transaction_key, controller_endpoints, instance_id,
                 enable_tls=False, key_file=None, cert_file=None,
                 vcore_retry_interval=0.5, devices_refresh_interval=5,
                 subscription_refresh_interval=5):
        """
        Store the configuration; nothing is connected until start().

        :param consul_endpoint: passed to get_endpoint_from_consul() when
            an endpoint is given in the '@service-name' form
        :param vcore_endpoint: 'host:port' string, or '@service-name'
        :param vcore_grpc_timeout: handed unchanged to GrpcClient
        :param vcore_binding_key: handed unchanged to GrpcClient
        :param vcore_transaction_key: handed unchanged to GrpcClient
        :param controller_endpoints: iterable of controller endpoints; one
            Agent is created per endpoint for every logical device
        :param instance_id: identifier sent as ofagent_id when subscribing
        :param enable_tls: handed unchanged to each Agent
        :param key_file: handed unchanged to each Agent
        :param cert_file: handed unchanged to each Agent
        :param vcore_retry_interval: delay (passed to asleep()) between
            retries after a vcore failure
        :param devices_refresh_interval: delay (passed to asleep()) between
            logical device polls
        :param subscription_refresh_interval: delay (passed to asleep())
            after a successful subscription
        """
        log.info('init-connection-manager')
        log.info('list-of-controllers', controller_endpoints=controller_endpoints)
        self.controller_endpoints = controller_endpoints
        self.consul_endpoint = consul_endpoint
        self.vcore_endpoint = vcore_endpoint
        self.grpc_timeout = vcore_grpc_timeout
        self.core_binding_key = vcore_binding_key
        self.core_transaction_key = vcore_transaction_key
        self.instance_id = instance_id
        self.enable_tls = enable_tls
        self.key_file = key_file
        self.cert_file = cert_file

        self.channel = None
        self.grpc_client = None  # single, shared gRPC client to vcore

        self.agent_map = {}  # (datapath_id, controller_endpoint) -> Agent()
        self.device_id_to_datapath_id_map = {}

        self.vcore_retry_interval = vcore_retry_interval
        self.devices_refresh_interval = devices_refresh_interval
        self.subscription_refresh_interval = subscription_refresh_interval
        self.subscription = None
        # True until a subscription has been established (and again while
        # one is being re-established); see get_rpc_client().
        self.connecting = True
        self.monitor = True

        self.running = False

    def start(self):
        """
        Launch the subscription and device-monitoring loops.

        Calling start() on an already running manager is a no-op.

        :return: self, so the call can be chained
        """
        if self.running:
            # Return self here too so chained callers get the same result
            # whether or not the manager was already started.
            return self

        log.debug('starting')

        self.running = True

        # Get a subscription to vcore
        reactor.callInThread(self.get_vcore_subscription)

        # Start monitoring logical devices and manage agents accordingly
        reactor.callLater(0, self.monitor_logical_devices)

        log.info('started')

        return self

    def grpc_client_terminated(self):
        """
        Tear down the current gRPC session and start subscribing again.

        Does nothing while a (re)connection is already in progress or when
        there is no client to tear down.
        NOTE(review): presumably invoked by GrpcClient, which receives this
        manager as its first constructor argument - confirm in grpc_client.
        """
        if not self.connecting and self.grpc_client is not None:
            self.connecting = True
            self._reset_grpc_attributes()
            self.delete_all_agents()
            reactor.callInThread(self.get_vcore_subscription)

    def liveness_probe(self):
        """Return True while the manager is started and not stopped."""
        return self.running

    def readiness_probe(self):
        """
        Return True only when a subscribed gRPC session is fully in place.

        Pod is isolated when readiness condition fails.
        """
        return bool(not self.connecting and self.channel
                    and self.subscription and self.grpc_client)

    def stop(self):
        """Stop every agent, drop the gRPC session and end the loops."""
        log.debug('stopping')
        self.delete_all_agents()
        self._reset_grpc_attributes()
        self.running = False
        log.info('stopped')

    def resolve_endpoint(self, endpoint):
        """
        Turn an endpoint specification into a (host, port) pair.

        An endpoint starting with '@' is looked up through
        get_endpoint_from_consul(); anything else is parsed as 'host:port'.
        If the lookup fails the process sends itself SIGTERM (so that the
        container is restarted) and no endpoint is returned.

        :param endpoint: 'host:port' or '@service-name'
        :return: (host, int(port)) tuple, or None when nothing was resolved
        :raises ValueError: if the resolved string is not 'host:port'
        """
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info(
                    '{}-service-endpoint-found'.format(endpoint),
                    address=ip_port_endpoint)
            except Exception as e:
                log.error('{}-service-endpoint-not-found'.format(endpoint),
                          exception=repr(e))
                log.error('committing-suicide')
                # Committing suicide in order to let docker restart ofagent
                os.system("kill -15 {}".format(os.getpid()))
                # The signal is asynchronous, so execution continues here;
                # make sure the unresolved '@name' is not parsed below.
                ip_port_endpoint = None

        if ip_port_endpoint:
            # Split on the last colon only: exactly two parts are unpacked.
            host, port = ip_port_endpoint.rsplit(':', 1)
            return host, int(port)
        return None

    def _reset_grpc_attributes(self):
        """Stop the shared gRPC client and forget channel/subscription."""
        log.debug('start-reset-grpc-attributes')

        if self.grpc_client is not None:
            self.grpc_client.stop()

        self.channel = None
        self.subscription = None
        self.grpc_client = None

        log.debug('stop-reset-grpc-attributes')

    def _assign_grpc_attributes(self):
        """
        Resolve the vcore endpoint and open an insecure gRPC channel to it.

        :raises ValueError: if the vcore endpoint cannot be resolved
        """
        log.debug('start-assign-grpc-attributes')

        resolved = self.resolve_endpoint(self.vcore_endpoint)
        if resolved is None:
            # Explicit error instead of a TypeError from unpacking None.
            raise ValueError(
                'unable to resolve vcore endpoint {!r}'.format(
                    self.vcore_endpoint))
        host, port = resolved
        log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint,
                 host=host, port=port)

        # Establish a connection to the vcore GRPC server
        self.channel = grpc.insecure_channel('{}:{}'.format(host, port))

        log.debug('stop-assign-grpc-attributes')

    @inlineCallbacks
    def get_vcore_subscription(self):
        """
        Retry until this ofagent instance is subscribed with vcore.

        Each attempt opens a channel, creates the shared GrpcClient if
        needed and sends an OfAgentSubscriber request.  On success the
        subscription is stored, the client is started and `connecting` is
        cleared; on any failure the gRPC state is reset and the attempt is
        repeated after `vcore_retry_interval`.  The loop ends once a
        subscription is held or the manager is stopped.
        """
        log.debug('start-get-vcore-subscription')

        self.connecting = True
        while self.running and self.subscription is None:
            try:
                # If a subscription is not yet assigned then establish new
                # GRPC connection ... otherwise keep using existing
                # connection details
                if self.subscription is None:
                    self._assign_grpc_attributes()

                # Send subscription request to register the current
                # ofagent instance
                container_name = self.instance_id
                if self.grpc_client is None:
                    self.grpc_client = GrpcClient(
                        self, self.channel, self.grpc_timeout,
                        self.core_binding_key, self.core_transaction_key)
                subscription = yield self.grpc_client.subscribe(
                    OfAgentSubscriber(ofagent_id=container_name))

                # If the subscriber id matches the current instance
                # ... then the subscription has succeeded
                if subscription is not None and \
                        subscription.ofagent_id == container_name:
                    if self.subscription is None:
                        # Keep details on the current GRPC session and
                        # subscription
                        log.debug('subscription-with-vcore-successful',
                                  subscription=subscription)
                        self.subscription = subscription
                        self.grpc_client.start()
                        self.connecting = False

                    # Sleep a bit in between each subscribe
                    yield asleep(self.subscription_refresh_interval)

                    # Move on to next subscribe request
                    continue

                # The subscription did not succeed, reset and move on
                else:
                    log.info('subscription-with-vcore-unavailable',
                             subscription=subscription)

            except _Rendezvous as e:
                log.error('subscription-with-vcore-terminated',
                          exception=e, status=e.code())

            except Exception as e:
                log.exception(
                    'unexpected-subscription-termination-with-vcore', e=e)

            # Reset grpc details
            # The vcore instance is either not available for subscription
            # or a failure occurred with the existing communication.
            self._reset_grpc_attributes()

            # Sleep for a short period and retry
            yield asleep(self.vcore_retry_interval)

        log.debug('stop-get-vcore-subscription')

    def get_rpc_client(self):
        """Return the shared gRPC client, or None while (re)connecting."""
        return self.grpc_client if not self.connecting else None

    @inlineCallbacks
    def get_list_of_logical_devices_from_voltha(self):
        """
        Fetch the logical device list, retrying until it succeeds.

        On an RPC failure the client is stopped, the error is logged and
        the request is retried after `vcore_retry_interval`.

        :return: (via Deferred) the value produced by
            list_logical_devices(), or None if the manager is stopped
            before a list could be retrieved
        """
        while self.running:
            log.info('retrieve-logical-device-list')
            rpc = self.get_rpc_client()
            if rpc is not None:
                try:
                    devices = yield rpc.list_logical_devices()

                    for device in devices:
                        log.info("logical-device-entry", id=device.id,
                                 datapath_id=device.datapath_id)

                    returnValue(devices)

                except _Rendezvous as e:
                    rpc.stop()
                    status = e.code()
                    log.error('vcore-communication-failure',
                              exception=e, status=status)

                except Exception as e:
                    rpc.stop()
                    log.exception('logical-devices-retrieval-failure',
                                  exception=e)

            log.info('reconnect', after_delay=self.vcore_retry_interval)
            yield asleep(self.vcore_retry_interval)

    def refresh_agent_connections(self, devices):
        """
        Based on the new device list, update the following state in the
        class:
            * agent_map
            * device_id_to_datapath_id_map
        :param devices: full device list freshly received from Voltha
        :return: None
        """
        # Use datapath ids for deciding what's new and what's obsolete
        desired_datapath_ids = set(d.datapath_id for d in devices)
        current_datapath_ids = set(key[0] for key in self.agent_map)

        # if identical, nothing to do
        if desired_datapath_ids == current_datapath_ids:
            return

        # ... otherwise calculate differences
        to_add = desired_datapath_ids.difference(current_datapath_ids)
        to_del = current_datapath_ids.difference(desired_datapath_ids)

        # remove what we don't need
        for datapath_id in to_del:
            self.delete_agent(datapath_id)

        # start new agents as needed
        for device in devices:
            if device.datapath_id in to_add:
                self.create_agent(device)

        log.debug('updated-agent-list', count=len(self.agent_map))
        log.debug('updated-device-id-to-datapath-id-map',
                  map=str(self.device_id_to_datapath_id_map))

    def create_agent(self, device):
        """
        Create and start one Agent per controller endpoint for `device`.

        :param device: object exposing `id` and `datapath_id`
        """
        datapath_id = device.datapath_id
        device_id = device.id
        for controller_endpoint in self.controller_endpoints:
            agent = Agent(self, controller_endpoint, datapath_id,
                          device_id, self.enable_tls,
                          self.key_file, self.cert_file)
            agent.start()
            self.agent_map[(datapath_id, controller_endpoint)] = agent
            self.device_id_to_datapath_id_map[device_id] = datapath_id

    def delete_all_agents(self):
        """Stop every known agent and clear both lookup tables."""
        # Iterate over a snapshot so the map can safely change meanwhile.
        for agent in list(self.agent_map.values()):
            agent.stop()
        self.agent_map = {}
        self.device_id_to_datapath_id_map = {}

    def delete_agent(self, datapath_id):
        """
        Stop and forget the agents of `datapath_id` on every controller.

        Agents that are already gone are skipped.

        :param datapath_id: datapath id whose agents are to be removed
        """
        for controller_endpoint in self.controller_endpoints:
            key = (datapath_id, controller_endpoint)
            agent = self.agent_map.get(key)
            if agent is None:
                continue
            device_id = agent.get_device_id()
            agent.stop()
            del self.agent_map[key]
            # All agents of a datapath share one device id; it is removed
            # on the first pass, so later passes must tolerate its absence.
            self.device_id_to_datapath_id_map.pop(device_id, None)

    @inlineCallbacks
    def monitor_logical_devices(self):
        """
        Poll vcore for logical devices while the manager is running.

        Each round refreshes the agents from the retrieved device list
        when a subscribed gRPC session is available, then waits
        `devices_refresh_interval` before the next round.
        """
        log.debug('start-monitor-logical-devices')

        while self.running:
            log.info('monitoring-logical-devices')

            # should change to a gRPC streaming call
            # see https://jira.opencord.org/browse/CORD-821

            try:
                if self.channel is not None and \
                        self.get_rpc_client() is not None and \
                        self.subscription is not None:
                    # get current list from Voltha
                    devices = \
                        yield self.get_list_of_logical_devices_from_voltha()

                    # update agent list and mapping tables as needed;
                    # no list is returned when the manager is stopping
                    if devices is not None:
                        self.refresh_agent_connections(devices)
                else:
                    log.info('vcore-communication-unavailable')

            except _Rendezvous as e:
                log.error('vcore-communication-failure',
                          exception=repr(e), status=e.code())

            except Exception as e:
                log.exception('unexpected-vcore-communication-failure',
                              exception=repr(e))

            # wait before next poll; kept outside the try block so that a
            # failing round is also followed by the delay instead of an
            # immediate retry
            yield asleep(self.devices_refresh_interval)

        log.debug('stop-monitor-logical-devices')

    def _agents_for_device(self, device_id):
        """Return the existing agents (one per controller) of a device."""
        datapath_id = self.device_id_to_datapath_id_map.get(device_id)
        if datapath_id is None:
            return []
        agents = []
        for controller_endpoint in self.controller_endpoints:
            agent = self.agent_map.get((datapath_id, controller_endpoint))
            if agent is not None:
                agents.append(agent)
        return agents

    def forward_packet_in(self, device_id, ofp_packet_in):
        """
        Hand a packet-in to every agent serving `device_id`.

        Unknown devices are silently ignored.
        """
        for agent in self._agents_for_device(device_id):
            agent.forward_packet_in(ofp_packet_in)

    def forward_change_event(self, device_id, event):
        """
        Hand a change event to every agent serving `device_id`.

        Unknown devices are silently ignored.
        """
        for agent in self._agents_for_device(device_id):
            agent.forward_change_event(event)