blob: 11a1334f0b8e42f496978c4b3623e49819f7b772 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import signal
import sys

from twisted.internet import reactor
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue

from pyvoltha.common.utils.asleep import asleep
from pyvoltha.common.utils.consulhelpers import get_endpoint_from_consul
from structlog import get_logger
import grpc
from grpc import StatusCode
from grpc._channel import _Rendezvous
from voltha_protos.voltha_pb2 import OfAgentSubscriber
from grpc_client import GrpcClient

from agent import Agent
from pyvoltha.common.utils.dockerhelpers import get_my_containers_name

# Module-level structlog logger; ConnectionManager emits all of its events
# (info/debug/error/exception) through this object.
log = get_logger()
# _ = third_party
38
class ConnectionManager(object):
    """Own the ofagent's link to vcore and its per-device controller agents.

    A single gRPC channel/client pair is shared for all traffic to vcore.
    For every logical device reported by vcore, one ``Agent`` is started per
    configured controller endpoint; agents are indexed in ``agent_map`` by
    ``(datapath_id, controller_endpoint)``.
    """

    def __init__(self, consul_endpoint,
                 vcore_endpoint, vcore_grpc_timeout, vcore_binding_key,
                 controller_endpoints, instance_id,
                 enable_tls=False, key_file=None, cert_file=None,
                 vcore_retry_interval=0.5, devices_refresh_interval=5,
                 subscription_refresh_interval=5):
        """Record the configuration; nothing is connected until start().

        :param consul_endpoint: consul address handed to
            get_endpoint_from_consul() when an endpoint starts with '@'
        :param vcore_endpoint: 'host:port' of vcore, or '@<service>' to be
            looked up through consul
        :param vcore_grpc_timeout: timeout handed to GrpcClient
        :param vcore_binding_key: binding key handed to GrpcClient
        :param controller_endpoints: iterable of controller endpoints; one
            Agent per endpoint is created for each logical device
        :param instance_id: sent to vcore as the ofagent_id of the
            subscription request
        :param enable_tls: handed to every Agent
        :param key_file: handed to every Agent
        :param cert_file: handed to every Agent
        :param vcore_retry_interval: delay given to asleep() before retrying
            a failed vcore request
        :param devices_refresh_interval: delay given to asleep() between two
            polls of the logical device list
        :param subscription_refresh_interval: delay given to asleep() after
            a successful subscription
        """
        log.info('init-connection-manager')
        log.info('list-of-controllers', controller_endpoints=controller_endpoints)
        self.controller_endpoints = controller_endpoints
        self.consul_endpoint = consul_endpoint
        self.vcore_endpoint = vcore_endpoint
        self.grpc_timeout = vcore_grpc_timeout
        self.core_binding_key = vcore_binding_key
        self.instance_id = instance_id
        self.enable_tls = enable_tls
        self.key_file = key_file
        self.cert_file = cert_file

        self.channel = None
        self.grpc_client = None  # single, shared gRPC client to vcore
        # Toggled by _assign_grpc_attributes/_reset_grpc_attributes; defined
        # here so the attribute exists before the first connection attempt.
        self.is_alive = False

        self.agent_map = {}  # (datapath_id, controller_endpoint) -> Agent()
        self.device_id_to_datapath_id_map = {}

        self.vcore_retry_interval = vcore_retry_interval
        self.devices_refresh_interval = devices_refresh_interval
        self.subscription_refresh_interval = subscription_refresh_interval
        self.subscription = None

        self.running = False

    def start(self):
        """Launch the subscription and device-monitoring loops.

        Calling start() on an already running manager is a no-op.

        :return: self, so the call can be chained
        """
        if self.running:
            return self

        log.debug('starting')

        self.running = True

        # Get a subscription to vcore
        reactor.callInThread(self.get_vcore_subscription)

        # Start monitoring logical devices and manage agents accordingly
        reactor.callLater(0, self.monitor_logical_devices)

        log.info('started')

        return self

    def stop(self):
        """Stop every agent, end the polling loops and drop the gRPC session."""
        log.debug('stopping')
        # clean up all controller connections; iterate over a snapshot so
        # the map may safely change while agents are being stopped
        for agent in list(self.agent_map.values()):
            agent.stop()
        self.running = False

        self._reset_grpc_attributes()

        log.info('stopped')

    @staticmethod
    def _terminate_process():
        """Send SIGTERM to this process so that docker restarts ofagent."""
        os.kill(os.getpid(), signal.SIGTERM)

    def resolve_endpoint(self, endpoint):
        """Translate an endpoint description into a (host, port) pair.

        An endpoint starting with '@' names a service that is looked up
        through consul; anything else is taken as a literal 'host:port'.
        When the consul lookup fails the process is asked to terminate.

        :param endpoint: 'host:port' or '@<service-name>'
        :return: (host, port) with port as an int, or None when no address
            could be determined
        :raises ValueError: if the address has no ':' separator or the port
            is not an integer
        """
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info(
                    '{}-service-endpoint-found'.format(endpoint),
                    address=ip_port_endpoint)
            except Exception as e:
                log.error('{}-service-endpoint-not-found'.format(endpoint),
                          exception=repr(e))
                log.error('committing-suicide')
                # Committing suicide in order to let docker restart ofagent
                self._terminate_process()
                return None

        if not ip_port_endpoint:
            return None

        # Split on the last ':' only, so the result is always exactly two
        # parts even when the host part itself contains colons.
        host, port = ip_port_endpoint.rsplit(':', 1)
        return host, int(port)

    def _reset_grpc_attributes(self):
        """Stop the gRPC client (if any) and forget channel and subscription."""
        log.debug('start-reset-grpc-attributes')

        if self.grpc_client is not None:
            self.grpc_client.stop()

        self.is_alive = False
        self.channel = None
        self.subscription = None
        self.grpc_client = None

        log.debug('stop-reset-grpc-attributes')

    def _assign_grpc_attributes(self):
        """Resolve the vcore endpoint and open a gRPC channel to it.

        The channel is always created with grpc.insecure_channel();
        enable_tls/key_file/cert_file are only handed to the Agents.

        :raises ValueError: if the vcore endpoint cannot be resolved
        """
        log.debug('start-assign-grpc-attributes')

        resolved = self.resolve_endpoint(self.vcore_endpoint)
        if resolved is None:
            raise ValueError(
                'unable to resolve vcore endpoint {!r}'.format(
                    self.vcore_endpoint))
        host, port = resolved
        log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)

        # Establish a connection to the vcore GRPC server
        self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
        self.is_alive = True

        log.debug('stop-assign-grpc-attributes')

    @inlineCallbacks
    def get_vcore_subscription(self):
        """Keep requesting a vcore subscription until one is granted.

        Each attempt opens a fresh channel, creates the shared GrpcClient if
        needed and sends an OfAgentSubscriber carrying this instance's id.
        The attempt succeeds when vcore echoes that id back; the client is
        then started.  Any other outcome resets the gRPC state and retries
        after vcore_retry_interval.  The loop ends once a subscription is
        held or the manager is stopped.
        """
        log.debug('start-get-vcore-subscription')

        while self.running and self.subscription is None:
            try:
                # No subscription is held at this point (loop condition), so
                # a new GRPC connection is established for this attempt
                self._assign_grpc_attributes()

                # Send subscription request to register the current ofagent instance
                container_name = self.instance_id
                if self.grpc_client is None:
                    self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout,
                                                  self.core_binding_key)
                subscription = yield self.grpc_client.subscribe(
                    OfAgentSubscriber(ofagent_id=container_name))

                # If the subscriber id matches the current instance
                # ... then the subscription has succeeded
                if subscription is not None and subscription.ofagent_id == container_name:
                    if self.subscription is None:
                        # Keep details on the current GRPC session and subscription
                        log.debug('subscription-with-vcore-successful', subscription=subscription)
                        self.subscription = subscription
                        self.grpc_client.start()

                    # Sleep a bit in between each subscribe
                    yield asleep(self.subscription_refresh_interval)

                    # Move on to next subscribe request
                    continue

                # The subscription did not succeed, reset and move on
                log.info('subscription-with-vcore-unavailable', subscription=subscription)

            except _Rendezvous as e:
                log.error('subscription-with-vcore-terminated',exception=e, status=e.code())

            except Exception as e:
                log.exception('unexpected-subscription-termination-with-vcore', e=e)

            # Reset grpc details
            # The vcore instance is either not available for subscription
            # or a failure occurred with the existing communication.
            self._reset_grpc_attributes()

            # Sleep for a short period and retry
            yield asleep(self.vcore_retry_interval)

        log.debug('stop-get-vcore-subscription')

    @inlineCallbacks
    def get_list_of_logical_devices_from_voltha(self):
        """Fetch the logical device list from vcore, retrying on failure.

        A gRPC failure with status UNAVAILABLE or DEADLINE_EXCEEDED asks the
        process to terminate; every failure is followed by a pause of
        vcore_retry_interval and another attempt.

        :return: (via the Deferred) the device list, or None if the manager
            stopped running before a list could be retrieved
        """
        while self.running:
            log.info('retrieve-logical-device-list')
            try:
                devices = yield \
                    self.grpc_client.list_logical_devices()

                for device in devices:
                    log.info("logical-device-entry", id=device.id,
                             datapath_id=device.datapath_id)

            except _Rendezvous as e:
                status = e.code()
                log.error('vcore-communication-failure', exception=e, status=status)
                if status in (StatusCode.UNAVAILABLE,
                              StatusCode.DEADLINE_EXCEEDED):
                    self._terminate_process()

            except Exception as e:
                log.exception('logical-devices-retrieval-failure', exception=e)

            else:
                returnValue(devices)

            log.info('reconnect', after_delay=self.vcore_retry_interval)
            yield asleep(self.vcore_retry_interval)

    def refresh_agent_connections(self, devices):
        """
        Based on the new device list, update the following state in the class:
        * agent_map
        * device_id_to_datapath_id_map
        :param devices: full device list freshly received from Voltha, or
            None when no list could be retrieved (nothing is changed then)
        :return: None
        """
        # No list at all is different from an empty list: an empty list
        # removes every agent, a missing one must leave them untouched.
        if devices is None:
            return

        # Use datapath ids for deciding what's new and what's obsolete
        desired_datapath_ids = set(d.datapath_id for d in devices)
        current_datapath_ids = set(key[0] for key in self.agent_map)

        # if identical, nothing to do
        if desired_datapath_ids == current_datapath_ids:
            return

        # ... otherwise calculate differences
        to_add = desired_datapath_ids - current_datapath_ids
        to_del = current_datapath_ids - desired_datapath_ids

        # remove what we don't need
        for datapath_id in to_del:
            self.delete_agent(datapath_id)

        # start new agents as needed
        for device in devices:
            if device.datapath_id in to_add:
                self.create_agent(device)

        log.debug('updated-agent-list', count=len(self.agent_map))
        log.debug('updated-device-id-to-datapath-id-map',
                  map=str(self.device_id_to_datapath_id_map))

    def create_agent(self, device):
        """Start one Agent per controller endpoint for a logical device.

        :param device: logical device; its ``id`` and ``datapath_id`` are read
        """
        datapath_id = device.datapath_id
        device_id = device.id
        for controller_endpoint in self.controller_endpoints:
            agent = Agent(controller_endpoint, datapath_id,
                          device_id, self.grpc_client, self.enable_tls,
                          self.key_file, self.cert_file)
            agent.start()
            self.agent_map[(datapath_id, controller_endpoint)] = agent
            self.device_id_to_datapath_id_map[device_id] = datapath_id

    def delete_agent(self, datapath_id):
        """Stop and forget every Agent serving the given datapath id.

        Controllers without an agent for this datapath are skipped, and the
        device id mapping is removed tolerantly because all agents of one
        datapath share the same device id.

        :param datapath_id: datapath id whose agents are to be removed
        """
        for controller_endpoint in self.controller_endpoints:
            key = (datapath_id, controller_endpoint)
            agent = self.agent_map.get(key)
            if agent is None:
                continue
            device_id = agent.get_device_id()
            agent.stop()
            del self.agent_map[key]
            self.device_id_to_datapath_id_map.pop(device_id, None)

    @inlineCallbacks
    def monitor_logical_devices(self):
        """Poll vcore for logical devices and keep the agents in sync.

        Runs until the manager is stopped.  While no channel, client or
        subscription is available the poll is skipped.  The pause of
        devices_refresh_interval is taken after every iteration, including
        failed ones, so a persistent error cannot turn into a busy loop.
        """
        log.debug('start-monitor-logical-devices')

        while self.running:
            log.info('monitoring-logical-devices')

            # should change to a gRPC streaming call
            # see https://jira.opencord.org/browse/CORD-821

            try:
                if self.channel is not None and self.grpc_client is not None and \
                        self.subscription is not None:
                    # get current list from Voltha
                    devices = yield \
                        self.get_list_of_logical_devices_from_voltha()

                    # update agent list and mapping tables as needed
                    self.refresh_agent_connections(devices)
                else:
                    log.info('vcore-communication-unavailable')

            except _Rendezvous as e:
                log.error('vcore-communication-failure', exception=repr(e), status=e.code())

            except Exception as e:
                log.exception('unexpected-vcore-communication-failure', exception=repr(e))

            # wait before next poll
            yield asleep(self.devices_refresh_interval)

        log.debug('stop-monitor-logical-devices')

    def _agents_for_device(self, device_id):
        """Return the agents currently serving a device, one per controller.

        Unknown devices yield an empty list; controllers that have no agent
        for the device's datapath are skipped.
        """
        datapath_id = self.device_id_to_datapath_id_map.get(device_id)
        # Compare against None explicitly: a datapath id of 0 is falsy but
        # is still a valid mapping.
        if datapath_id is None:
            return []

        agents = []
        for controller_endpoint in self.controller_endpoints:
            agent = self.agent_map.get((datapath_id, controller_endpoint))
            if agent is None:
                log.debug('agent-not-found', datapath_id=datapath_id,
                          controller_endpoint=controller_endpoint)
                continue
            agents.append(agent)
        return agents

    def forward_packet_in(self, device_id, ofp_packet_in):
        """Hand a packet-in to every agent serving the device.

        :param device_id: id of the logical device the packet came from
        :param ofp_packet_in: packet-in message passed on to the agents
        """
        for agent in self._agents_for_device(device_id):
            agent.forward_packet_in(ofp_packet_in)

    def forward_change_event(self, device_id, event):
        """Hand a change event to every agent serving the device.

        :param device_id: id of the logical device the event concerns
        :param event: change event passed on to the agents
        """
        for agent in self._agents_for_device(device_id):
            agent.forward_change_event(event)