blob: fea4910cd5e1555eac147106fb79a950e10f298e [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import os
17
18import sys
19
20from twisted.internet import reactor
21from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
22
23from common.utils.asleep import asleep
24from common.utils.consulhelpers import get_endpoint_from_consul
25from structlog import get_logger
26import grpc
27from grpc import StatusCode
28from grpc._channel import _Rendezvous
29from ofagent.protos import third_party
30from protos import voltha_pb2
31from protos.voltha_pb2 import OfAgentSubscriber
32from grpc_client import GrpcClient
33
34from agent import Agent
35from common.utils.dockerhelpers import get_my_containers_name
36
37
# Module-level structlog logger used by every method of ConnectionManager.
log = get_logger()
# Disabled placeholder reference to the third_party import; kept as found.
# _ = third_party
40
class ConnectionManager(object):
    """Keep one Agent per (logical device, controller) pair in sync with vcore.

    The manager subscribes this ofagent instance with vcore over a single
    gRPC channel, polls vcore for the list of logical devices, and creates
    or stops Agent objects so that every known datapath id has one agent
    per configured controller endpoint.
    """

    def __init__(self, consul_endpoint,
                 vcore_endpoint, vcore_grpc_timeout, vcore_binding_key,
                 controller_endpoints, instance_id,
                 enable_tls=False, key_file=None, cert_file=None,
                 vcore_retry_interval=0.5, devices_refresh_interval=5,
                 subscription_refresh_interval=5):
        """Store the configuration; no connection is opened here.

        :param consul_endpoint: passed to get_endpoint_from_consul() when an
            endpoint of the form '@service' has to be resolved
        :param vcore_endpoint: 'host:port' string, or '@service' to be looked
            up through consul
        :param vcore_grpc_timeout: handed to GrpcClient unchanged
        :param vcore_binding_key: handed to GrpcClient unchanged
        :param controller_endpoints: iterable of controller endpoints; one
            Agent is created per endpoint for each logical device
        :param instance_id: identifier sent to vcore as the ofagent_id of
            the subscription
        :param enable_tls: handed to every Agent unchanged
        :param key_file: handed to every Agent unchanged
        :param cert_file: handed to every Agent unchanged
        :param vcore_retry_interval: delay passed to asleep() before a failed
            vcore operation is retried (presumably seconds -- confirm
            against asleep())
        :param devices_refresh_interval: delay passed to asleep() between two
            polls of the logical device list
        :param subscription_refresh_interval: delay passed to asleep() after
            a successful subscription
        """
        log.info('init-connection-manager')
        log.info('list-of-controllers', controller_endpoints=controller_endpoints)
        self.controller_endpoints = controller_endpoints
        self.consul_endpoint = consul_endpoint
        self.vcore_endpoint = vcore_endpoint
        self.grpc_timeout = vcore_grpc_timeout
        self.core_binding_key = vcore_binding_key
        self.instance_id = instance_id
        self.enable_tls = enable_tls
        self.key_file = key_file
        self.cert_file = cert_file

        self.channel = None
        self.grpc_client = None  # single, shared gRPC client to vcore
        # Toggled by _assign_grpc_attributes() / _reset_grpc_attributes();
        # initialised here so the attribute always exists.
        self.is_alive = False

        self.agent_map = {}  # (datapath_id, controller_endpoint) -> Agent()
        self.device_id_to_datapath_id_map = {}

        self.vcore_retry_interval = vcore_retry_interval
        self.devices_refresh_interval = devices_refresh_interval
        self.subscription_refresh_interval = subscription_refresh_interval
        self.subscription = None

        self.running = False

    def start(self):
        """Kick off the subscription and device-monitoring loops.

        Calling start() on an already running manager is a no-op.

        :return: self, so that the call can be chained
        """
        if self.running:
            return self

        log.debug('starting')

        self.running = True

        # Get a subscription to vcore
        # NOTE(review): get_vcore_subscription is an inlineCallbacks
        # generator that waits on asleep(); running it through
        # callInThread looks questionable -- confirm that the helpers it
        # calls are safe to use outside the reactor thread.
        reactor.callInThread(self.get_vcore_subscription)

        # Start monitoring logical devices and manage agents accordingly
        reactor.callLater(0, self.monitor_logical_devices)

        log.info('started')

        return self

    def stop(self):
        """Stop every agent, forget them and drop the gRPC session."""
        log.debug('stopping')

        # Lower the flag first so the polling loops wind down even if an
        # agent fails while stopping.
        self.running = False

        # clean up all controller connections
        for agent in list(self.agent_map.values()):
            agent.stop()

        # Forget the stopped agents; otherwise a later start() would find
        # their datapath ids already present and never recreate them.
        self.agent_map.clear()
        self.device_id_to_datapath_id_map.clear()

        self._reset_grpc_attributes()

        log.info('stopped')

    @staticmethod
    def _terminate_process():
        """Send SIGTERM to the current process so that docker restarts it."""
        import signal
        os.kill(os.getpid(), signal.SIGTERM)

    def resolve_endpoint(self, endpoint):
        """Turn an endpoint specification into a (host, port) tuple.

        :param endpoint: either 'host:port', or '@service-name' to be
            resolved through consul
        :return: (host, port) with port converted to int, or None when the
            endpoint is empty or the consul lookup failed
        :raises ValueError: if the resolved string has no ':' separator or
            its port is not an integer
        """
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info(
                    '{}-service-endpoint-found'.format(endpoint), address=ip_port_endpoint)
            except Exception as e:
                log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
                log.error('committing-suicide')
                # Committing suicide in order to let docker restart ofagent
                self._terminate_process()
                # Nothing was resolved; do not try to parse '@service'.
                return None

        if ip_port_endpoint:
            # Split on the last ':' only, so there are always exactly two
            # parts to unpack.
            host, port = ip_port_endpoint.rsplit(':', 1)
            return host, int(port)

        return None

    def _reset_grpc_attributes(self):
        """Stop the gRPC client, if any, and clear all session state."""
        log.debug('start-reset-grpc-attributes')

        if self.grpc_client is not None:
            self.grpc_client.stop()

        self.is_alive = False
        self.channel = None
        self.subscription = None
        self.grpc_client = None

        log.debug('stop-reset-grpc-attributes')

    def _assign_grpc_attributes(self):
        """Resolve the vcore endpoint and open an insecure gRPC channel to it.

        :raises ValueError: if the vcore endpoint cannot be resolved
        """
        log.debug('start-assign-grpc-attributes')

        resolved = self.resolve_endpoint(self.vcore_endpoint)
        if resolved is None:
            # Explicit check instead of assert: asserts vanish under -O and
            # could never fire before the tuple unpacking anyway.
            raise ValueError(
                'unable to resolve vcore endpoint {!r}'.format(self.vcore_endpoint))
        host, port = resolved
        log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)

        # Establish a connection to the vcore GRPC server
        self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
        self.is_alive = True

        log.debug('stop-assign-grpc-attributes')

    @inlineCallbacks
    def get_vcore_subscription(self):
        """Retry subscribing with vcore until a subscription is recorded.

        Each attempt opens a channel, creates the shared GrpcClient if
        needed and sends an OfAgentSubscriber carrying instance_id. The
        attempt succeeds when vcore echoes the same ofagent_id; the
        subscription is then stored and the client started. On any other
        outcome the gRPC state is reset and the attempt is repeated after
        vcore_retry_interval. The loop ends as soon as self.subscription
        is set or the manager stops running.
        """
        log.debug('start-get-vcore-subscription')

        while self.running and self.subscription is None:
            try:
                # If a subscription is not yet assigned then establish new GRPC connection
                # ... otherwise keep using existing connection details
                if self.subscription is None:
                    self._assign_grpc_attributes()

                # Send subscription request to register the current ofagent instance
                container_name = self.instance_id
                if self.grpc_client is None:
                    self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout,
                                                  self.core_binding_key)
                subscription = yield self.grpc_client.subscribe(
                    OfAgentSubscriber(ofagent_id=container_name))

                # If the subscriber id matches the current instance
                # ... then the subscription has succeeded
                if subscription is not None and subscription.ofagent_id == container_name:
                    if self.subscription is None:
                        # Keep details on the current GRPC session and subscription
                        log.debug('subscription-with-vcore-successful', subscription=subscription)
                        self.subscription = subscription
                        self.grpc_client.start()

                    # Pause, then re-evaluate the loop condition; it is
                    # false once self.subscription has been stored.
                    yield asleep(self.subscription_refresh_interval)
                    continue

                # The subscription did not succeed, reset and move on
                log.info('subscription-with-vcore-unavailable', subscription=subscription)

            except _Rendezvous as e:
                log.error('subscription-with-vcore-terminated', exception=e, status=e.code())

            except Exception as e:
                log.exception('unexpected-subscription-termination-with-vcore', e=e)

            # Reset grpc details
            # The vcore instance is either not available for subscription
            # or a failure occurred with the existing communication.
            self._reset_grpc_attributes()

            # Sleep for a short period and retry
            yield asleep(self.vcore_retry_interval)

        log.debug('stop-get-vcore-subscription')

    @inlineCallbacks
    def get_list_of_logical_devices_from_voltha(self):
        """Fetch the logical device list, retrying while the manager runs.

        A gRPC failure with status UNAVAILABLE or DEADLINE_EXCEEDED asks the
        process to terminate; any failure is followed by a pause of
        vcore_retry_interval before the next attempt.

        :return: (through the Deferred) the value produced by
            grpc_client.list_logical_devices(), or None if the manager
            stopped running before a list could be retrieved
        """
        while self.running:
            log.info('retrieve-logical-device-list')
            try:
                devices = yield \
                    self.grpc_client.list_logical_devices()

                for device in devices:
                    log.info("logical-device-entry", id=device.id,
                             datapath_id=device.datapath_id)

            except _Rendezvous as e:
                status = e.code()
                log.error('vcore-communication-failure', exception=e, status=status)
                if status == StatusCode.UNAVAILABLE or status == StatusCode.DEADLINE_EXCEEDED:
                    self._terminate_process()

            except Exception as e:
                log.exception('logical-devices-retrieval-failure', exception=e)

            else:
                # Hand the result back only on the success path.
                returnValue(devices)

            log.info('reconnect', after_delay=self.vcore_retry_interval)
            yield asleep(self.vcore_retry_interval)

    def refresh_agent_connections(self, devices):
        """
        Based on the new device list, update the following state in the class:
        * agent_map
        * device_id_to_datapath_id_map
        :param devices: full device list freshly received from Voltha
        :return: None
        """

        # Use datapath ids for deciding what's new and what's obsolete
        desired_datapath_ids = {device.datapath_id for device in devices}
        current_datapath_ids = {key[0] for key in self.agent_map}

        # if identical, nothing to do
        if desired_datapath_ids == current_datapath_ids:
            return

        # ... otherwise calculate differences
        to_add = desired_datapath_ids - current_datapath_ids
        to_del = current_datapath_ids - desired_datapath_ids

        # remove what we don't need
        for datapath_id in to_del:
            self.delete_agent(datapath_id)

        # start new agents as needed
        for device in devices:
            if device.datapath_id in to_add:
                self.create_agent(device)

        log.debug('updated-agent-list', count=len(self.agent_map))
        log.debug('updated-device-id-to-datapath-id-map',
                  map=str(self.device_id_to_datapath_id_map))

    def create_agent(self, device):
        """Create and start one Agent per controller endpoint for a device.

        :param device: object exposing datapath_id and id attributes
        """
        datapath_id = device.datapath_id
        device_id = device.id
        for controller_endpoint in self.controller_endpoints:
            agent = Agent(controller_endpoint, datapath_id,
                          device_id, self.grpc_client, self.enable_tls,
                          self.key_file, self.cert_file)
            agent.start()
            self.agent_map[(datapath_id, controller_endpoint)] = agent
        self.device_id_to_datapath_id_map[device_id] = datapath_id

    def delete_agent(self, datapath_id):
        """Stop and forget every agent registered for a datapath id.

        Controller endpoints without an agent for this datapath id are
        skipped, so a partially populated map does not abort the cleanup.

        :param datapath_id: datapath id whose agents are to be removed
        """
        device_id = None
        for controller_endpoint in self.controller_endpoints:
            agent = self.agent_map.pop((datapath_id, controller_endpoint), None)
            if agent is None:
                continue
            device_id = agent.get_device_id()
            agent.stop()

        if device_id is not None:
            self.device_id_to_datapath_id_map.pop(device_id, None)

    @inlineCallbacks
    def monitor_logical_devices(self):
        """Poll vcore for logical devices and reconcile agents until stopped.

        Polling only happens while a channel, a gRPC client and a
        subscription are all present. The loop pauses for
        devices_refresh_interval after every iteration, including those
        that ended with an error.
        """
        log.debug('start-monitor-logical-devices')

        while self.running:
            log.info('monitoring-logical-devices')

            # should change to a gRPC streaming call
            # see https://jira.opencord.org/browse/CORD-821

            try:
                if self.channel is not None and self.grpc_client is not None and \
                        self.subscription is not None:
                    # get current list from Voltha
                    devices = yield \
                        self.get_list_of_logical_devices_from_voltha()

                    # update agent list and mapping tables as needed; the
                    # fetch yields None when the manager stopped meanwhile,
                    # and no agent may be created after stop()
                    if devices is not None and self.running:
                        self.refresh_agent_connections(devices)
                else:
                    log.info('vcore-communication-unavailable')

            except _Rendezvous as e:
                log.error('vcore-communication-failure', exception=repr(e), status=e.code())

            except Exception as e:
                log.exception('unexpected-vcore-communication-failure', exception=repr(e))

            # wait before next poll; kept outside the try block so that a
            # failing iteration is also followed by a delay
            yield asleep(self.devices_refresh_interval)

        log.debug('stop-monitor-logical-devices')

    def _agents_for_device(self, device_id):
        """Return the agents currently registered for a device id.

        :param device_id: key into device_id_to_datapath_id_map
        :return: list of agents, one per controller endpoint that has an
            entry in agent_map; empty if the device id is unknown
        """
        datapath_id = self.device_id_to_datapath_id_map.get(device_id)
        if datapath_id is None:
            return []

        agents = []
        for controller_endpoint in self.controller_endpoints:
            agent = self.agent_map.get((datapath_id, controller_endpoint))
            if agent is not None:
                agents.append(agent)
        return agents

    def forward_packet_in(self, device_id, ofp_packet_in):
        """Pass a packet-in to every agent of the given device.

        :param device_id: id of the device the packet came from
        :param ofp_packet_in: payload handed to Agent.forward_packet_in()
        """
        for agent in self._agents_for_device(device_id):
            agent.forward_packet_in(ofp_packet_in)

    def forward_change_event(self, device_id, event):
        """Pass a change event to every agent of the given device.

        :param device_id: id of the device the event relates to
        :param event: payload handed to Agent.forward_change_event()
        """
        for agent in self._agents_for_device(device_id):
            agent.forward_change_event(event)