blob: 126dc290d2ea122e69cc5441ac1e878373cefbd6 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
alshabib06b449c2017-01-15 17:33:16 -060016import os
Khen Nursimulu68b9be32016-10-25 11:57:04 -040017
18import sys
19
20from twisted.internet import reactor
21from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
22
23from common.utils.asleep import asleep
24from common.utils.consulhelpers import get_endpoint_from_consul
25from structlog import get_logger
26import grpc
Rouzbahan Rashidi-Tabrizi34b7dd72017-03-13 17:45:20 -040027from grpc import StatusCode
28from grpc._channel import _Rendezvous
Zsolt Haraszti1edb8282016-11-08 10:57:19 -080029from ofagent.protos import third_party
Khen Nursimulu68b9be32016-10-25 11:57:04 -040030from protos import voltha_pb2
Stephane Barbarie2940dac2017-08-18 14:15:17 -040031from protos.voltha_pb2 import OfAgentSubscriber
Khen Nursimulu68b9be32016-10-25 11:57:04 -040032from grpc_client import GrpcClient
33
34from agent import Agent
Stephane Barbarie2940dac2017-08-18 14:15:17 -040035from common.utils.dockerhelpers import get_my_containers_name
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -080036
Khen Nursimulu68b9be32016-10-25 11:57:04 -040037
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -080038# _ = third_party
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070039
class ConnectionManager(object):
    """Run one OpenFlow Agent per (logical device, controller endpoint).

    The manager keeps a single shared GrpcClient towards vcore, refreshes an
    OfAgentSubscriber subscription on it, polls vcore for the reachable
    logical devices and starts/stops Agent instances so that every reachable
    logical device has one Agent per configured controller endpoint.
    """

    def __init__(self, consul_endpoint, vcore_endpoint, vcore_grpc_timeout,
                 controller_endpoints, instance_id,
                 enable_tls=False, key_file=None, cert_file=None,
                 vcore_retry_interval=0.5, devices_refresh_interval=5,
                 subscription_refresh_interval=5):
        """
        :param consul_endpoint: consul address used to resolve '@name'
            endpoints
        :param vcore_endpoint: 'host:port' of vcore, or '@name' to look the
            address up in consul
        :param vcore_grpc_timeout: timeout handed to GrpcClient
        :param controller_endpoints: controller endpoints; every logical
            device gets one Agent per entry
        :param instance_id: id sent to vcore as OfAgentSubscriber.ofagent_id
        :param enable_tls: forwarded to every Agent
        :param key_file: forwarded to every Agent
        :param cert_file: forwarded to every Agent
        :param vcore_retry_interval: delay (passed to asleep) before retrying
            after a vcore failure
        :param devices_refresh_interval: delay (passed to asleep) between
            logical device polls
        :param subscription_refresh_interval: delay (passed to asleep)
            between subscription refreshes while subscribed
        """
        self.log = get_logger()
        self.log.info('init-connection-manager')
        self.log.info('list-of-controllers',
                      controller_endpoints=controller_endpoints)

        self.controller_endpoints = controller_endpoints
        self.consul_endpoint = consul_endpoint
        self.vcore_endpoint = vcore_endpoint
        self.grpc_timeout = vcore_grpc_timeout
        self.instance_id = instance_id
        self.enable_tls = enable_tls
        self.key_file = key_file
        self.cert_file = cert_file

        self.channel = None
        self.grpc_client = None  # single, shared gRPC client to vcore
        self.is_alive = False  # True while a vcore channel is assigned

        self.agent_map = {}  # (datapath_id, controller_endpoint) -> Agent()
        self.device_id_to_datapath_id_map = {}

        self.vcore_retry_interval = vcore_retry_interval
        self.devices_refresh_interval = devices_refresh_interval
        self.subscription_refresh_interval = subscription_refresh_interval
        self.subscription = None

        self.running = False

    def start(self):
        """Start the vcore subscription and logical device monitors.

        Calling start() on a running manager is a no-op.

        :return: self
        """
        if self.running:
            return

        self.log.debug('starting')

        self.running = True

        # Start monitoring the vcore grpc channel.
        # NOTE(review): this drives an @inlineCallbacks generator from a
        # reactor thread-pool thread; confirm that asleep() and the Deferreds
        # it yields are safe to use from there.
        reactor.callInThread(self.monitor_vcore_grpc_channel)

        # Start monitoring logical devices and manage agents accordingly
        reactor.callLater(0, self.monitor_logical_devices)

        self.log.info('started')

        return self

    def stop(self):
        """Stop every agent, drop the vcore session and end the monitor loops."""
        self.log.debug('stopping')
        # clean up all controller connections
        for agent in list(self.agent_map.values()):
            agent.stop()
        # Forget the stopped agents so that a later start() recreates them
        # instead of treating the stale entries as still being served.
        self.agent_map.clear()
        self.device_id_to_datapath_id_map.clear()
        self.running = False

        self._reset_grpc_attributes()

        self.log.info('stopped')

    def _terminate_self(self):
        """Send SIGTERM to this process so that docker restarts ofagent."""
        import signal  # local import: only needed on this rare path
        os.kill(os.getpid(), signal.SIGTERM)

    def resolve_endpoint(self, endpoint):
        """Turn an endpoint spec into a (host, port) tuple.

        ``'@name'`` is looked up in consul under ``name``; anything else is
        used as-is. If the consul lookup fails, the process sends itself
        SIGTERM (so docker restarts ofagent) and the lookup error is
        re-raised.

        :param endpoint: 'host:port' or '@service-name'
        :return: (host, port) with an int port, or None for an empty endpoint
        :raises ValueError: if the endpoint has no ':port' part or the port is
            not an integer
        """
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                self.log.info(
                    '{}-service-endpoint-found'.format(endpoint), address=ip_port_endpoint)
            except Exception as e:
                self.log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
                self.log.error('committing-suicide')
                # Committing suicide in order to let docker restart ofagent
                self._terminate_self()
                # Don't fall through and try to parse '@name' as host:port
                raise
        if not ip_port_endpoint:
            return None
        # Split on the last ':' only: split(':', 2) may yield three parts and
        # break the two-name unpacking below.
        host, port = ip_port_endpoint.rsplit(':', 1)
        return host, int(port)

    def _reset_grpc_attributes(self):
        """Stop the gRPC client and forget the channel and subscription."""
        self.log.debug('start-reset-grpc-attributes')

        if self.grpc_client is not None:
            self.grpc_client.stop()

        # Dropping our reference lets the channel be garbage collected once
        # nothing else holds it. NOTE(review): the channel is never closed
        # explicitly; consider channel.close() if the grpcio in use has it.
        self.is_alive = False
        self.channel = None
        self.subscription = None
        self.grpc_client = None

        self.log.debug('stop-reset-grpc-attributes')

    def _assign_grpc_attributes(self):
        """Resolve the vcore endpoint and open a new insecure channel to it.

        :raises ValueError: if the vcore endpoint resolves to nothing
        """
        self.log.debug('start-assign-grpc-attributes')

        resolved = self.resolve_endpoint(self.vcore_endpoint)
        if resolved is None:
            # Explicit check rather than assert, so it also holds under -O
            raise ValueError('vcore endpoint {!r} could not be resolved'
                             .format(self.vcore_endpoint))
        host, port = resolved
        self.log.info('resolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)

        # Establish a connection to the vcore GRPC server
        self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
        self.is_alive = True

        self.log.debug('stop-assign-grpc-attributes')

    @inlineCallbacks
    def monitor_vcore_grpc_channel(self):
        """Keep this ofagent subscribed to vcore while the manager runs.

        While subscribed, the subscription is refreshed every
        ``subscription_refresh_interval``. If subscribing fails or vcore
        answers with a different ofagent id, the gRPC state is reset and a
        new connection is tried after ``vcore_retry_interval``.
        """
        self.log.debug('start-monitor-vcore-grpc-channel')

        while self.running:
            try:
                # If a subscription is not yet assigned then establish new GRPC connection
                # ... otherwise keep using existing connection details
                if self.subscription is None:
                    self._assign_grpc_attributes()

                # Send subscription request to register the current ofagent instance
                container_name = self.instance_id
                if self.grpc_client is None:
                    self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout)
                subscription = yield self.grpc_client.subscribe(
                    OfAgentSubscriber(ofagent_id=container_name))

                # If the subscriber id matches the current instance
                # ... then the subscription has succeeded
                if subscription is not None and subscription.ofagent_id == container_name:
                    if self.subscription is None:
                        # Keep details on the current GRPC session and subscription
                        self.log.debug('subscription-with-vcore-successful', subscription=subscription)
                        self.subscription = subscription
                        self.grpc_client.start()

                    # Sleep a bit in between each subscribe
                    yield asleep(self.subscription_refresh_interval)

                    # Move on to next subscribe request
                    continue

                # The subscription did not succeed, reset and move on
                self.log.info('subscription-with-vcore-unavailable', subscription=subscription)

            except _Rendezvous as e:
                self.log.error('subscription-with-vcore-terminated', exception=e, status=e.code())

            except Exception as e:
                self.log.exception('unexpected-subscription-termination-with-vcore', e=e)

            # Reset grpc details
            # The vcore instance is either not available for subscription
            # or a failure occurred with the existing communication.
            self._reset_grpc_attributes()

            # Sleep for a short period and retry
            yield asleep(self.vcore_retry_interval)

        self.log.debug('stop-monitor-vcore-grpc-channel')

    @inlineCallbacks
    def get_list_of_reachable_logical_devices_from_voltha(self):
        """Fetch the reachable logical devices from vcore, retrying on failure.

        Failures are retried every ``vcore_retry_interval``. If vcore is
        UNAVAILABLE or the call exceeds its deadline, the process also sends
        itself SIGTERM so docker can restart ofagent.

        :return: Deferred firing with the device list, or with None if the
            manager stops before a list could be retrieved
        """
        while self.running:
            self.log.debug('retrieve-logical-device-list')
            try:
                devices = yield \
                    self.grpc_client.list_reachable_logical_devices()

                for device in devices:
                    self.log.debug("reachable-logical-device-entry", id=device.id,
                                   datapath_id=device.datapath_id)

                returnValue(devices)

            except _Rendezvous as e:
                status = e.code()
                self.log.error('vcore-communication-failure', exception=e, status=status)
                if status in (StatusCode.UNAVAILABLE, StatusCode.DEADLINE_EXCEEDED):
                    self._terminate_self()

            except Exception as e:
                self.log.exception('logical-devices-retrieval-failure', exception=e)

            self.log.info('reconnect', after_delay=self.vcore_retry_interval)
            yield asleep(self.vcore_retry_interval)

    def refresh_agent_connections(self, devices):
        """
        Based on the new device list, update the following state in the class:
        * agent_map
        * device_id_to_datapath_id_map
        :param devices: full device list freshly received from Voltha
        :return: None
        """
        # Use datapath ids for deciding what's new and what's obsolete
        desired_datapath_ids = set(d.datapath_id for d in devices)
        current_datapath_ids = set(dpid for dpid, _ in self.agent_map)

        # if identical, nothing to do
        if desired_datapath_ids == current_datapath_ids:
            return

        # ... otherwise calculate differences
        to_add = desired_datapath_ids - current_datapath_ids
        to_del = current_datapath_ids - desired_datapath_ids

        # remove what we don't need
        for datapath_id in to_del:
            self.delete_agent(datapath_id)

        # start new agents as needed
        for device in devices:
            if device.datapath_id in to_add:
                self.create_agent(device)

        self.log.debug('updated-agent-list', count=len(self.agent_map))
        self.log.debug('updated-device-id-to-datapath-id-map',
                       map=str(self.device_id_to_datapath_id_map))

    def create_agent(self, device):
        """Start one Agent per controller endpoint for ``device``."""
        datapath_id = device.datapath_id
        device_id = device.id
        for controller_endpoint in self.controller_endpoints:
            agent = Agent(controller_endpoint, datapath_id,
                          device_id, self.grpc_client, self.enable_tls,
                          self.key_file, self.cert_file)
            agent.start()
            self.agent_map[(datapath_id, controller_endpoint)] = agent
            self.device_id_to_datapath_id_map[device_id] = datapath_id

    def delete_agent(self, datapath_id):
        """Stop and forget every Agent serving ``datapath_id``."""
        for controller_endpoint in self.controller_endpoints:
            key = (datapath_id, controller_endpoint)
            agent = self.agent_map[key]
            device_id = agent.get_device_id()
            agent.stop()
            del self.agent_map[key]
            # Every agent of one datapath was created for the same device id,
            # so with several controllers the entry is already gone after the
            # first pass; pop() avoids the KeyError a second `del` raised.
            self.device_id_to_datapath_id_map.pop(device_id, None)

    @inlineCallbacks
    def monitor_logical_devices(self):
        """Poll vcore for logical devices and keep the agents in sync."""
        self.log.debug('start-monitor-logical-devices')

        while self.running:
            self.log.debug('monitoring-logical-devices')

            # should change to a gRPC streaming call
            # see https://jira.opencord.org/browse/CORD-821

            try:
                if self.channel is not None and self.grpc_client is not None and \
                        self.subscription is not None:
                    # get current list from Voltha
                    reachable_devices = yield \
                        self.get_list_of_reachable_logical_devices_from_voltha()

                    # None means the manager stopped while fetching; never
                    # (re)create agents after stop().
                    if reachable_devices is not None and self.running:
                        # update agent list and mapping tables as needed
                        self.refresh_agent_connections(reachable_devices)
                else:
                    self.log.warning('vcore-communication-unavailable')

            except _Rendezvous as e:
                self.log.error('vcore-communication-failure', exception=repr(e), status=e.code())

            except Exception as e:
                self.log.exception('unexpected-vcore-communication-failure', exception=repr(e))

            # wait before next poll -- also after a failure, so a persistent
            # error cannot turn this into a tight retry loop
            yield asleep(self.devices_refresh_interval)

        self.log.debug('stop-monitor-logical-devices')

    def _agents_for_device(self, device_id):
        """Return the agents currently serving ``device_id`` (may be empty)."""
        datapath_id = self.device_id_to_datapath_id_map.get(device_id)
        if datapath_id is None:
            return []
        # An agent may already be gone if delete_agent() is mid-teardown;
        # skip it instead of raising KeyError into the caller.
        candidates = (self.agent_map.get((datapath_id, controller_endpoint))
                      for controller_endpoint in self.controller_endpoints)
        return [agent for agent in candidates if agent is not None]

    def forward_packet_in(self, device_id, ofp_packet_in):
        """Hand a packet-in from ``device_id`` to each of its agents."""
        for agent in self._agents_for_device(device_id):
            agent.forward_packet_in(ofp_packet_in)

    def forward_change_event(self, device_id, event):
        """Hand a change event from ``device_id`` to each of its agents."""
        for agent in self._agents_for_device(device_id):
            agent.forward_change_event(event)