blob: 13ce2a0e6e15edfecdab03880233b63a42ea865f [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import signal
import sys

from twisted.internet import reactor
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue

from common.utils.asleep import asleep
from common.utils.consulhelpers import get_endpoint_from_consul
from structlog import get_logger
import grpc
from grpc import StatusCode
from grpc._channel import _Rendezvous
# Original relative order kept below: third_party is otherwise unused here and
# is presumably imported for its side effects ahead of the generated protos.
from ofagent.protos import third_party
from protos import voltha_pb2
from protos.voltha_pb2 import OfAgentSubscriber
from grpc_client import GrpcClient

from agent import Agent
from google.protobuf.empty_pb2 import Empty
from common.utils.dockerhelpers import get_my_containers_name

Khen Nursimulu68b9be32016-10-25 11:57:04 -040038
# Module-level structured logger (structlog.get_logger) shared by this module.
log = get_logger()
# Disabled reference to the third_party import, which is otherwise unused in
# the visible module; presumably that import is kept for its side effects.
# _ = third_party
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070041
class ConnectionManager(object):
    """Keep OpenFlow agents in sync with the logical devices known to vcore.

    The manager owns a single gRPC channel to vcore, keeps this ofagent
    instance subscribed with vcore, periodically polls the logical device
    list, and runs one ``Agent`` per (datapath_id, controller_endpoint) pair.
    """

    def __init__(self, consul_endpoint, vcore_endpoint, controller_endpoints,
                 instance_id,
                 enable_tls=False, key_file=None, cert_file=None,
                 vcore_retry_interval=0.5, devices_refresh_interval=5,
                 subscription_refresh_interval=5):
        """
        :param consul_endpoint: consul address used to resolve '@service'
            style endpoints (see resolve_endpoint)
        :param vcore_endpoint: 'host:port' of vcore, or '@service-name' to be
            looked up in consul
        :param controller_endpoints: list of controller endpoints; one Agent
            is started per endpoint for every logical device
        :param instance_id: id sent to vcore as the subscription's ofagent_id
        :param enable_tls: passed through to every Agent
        :param key_file: passed through to every Agent
        :param cert_file: passed through to every Agent
        :param vcore_retry_interval: delay passed to asleep() before retrying
            after a vcore failure (presumably seconds)
        :param devices_refresh_interval: delay passed to asleep() between
            logical device polls
        :param subscription_refresh_interval: delay passed to asleep() between
            subscription renewals with vcore
        """
        log.info('init-connection-manager')
        log.info('list-of-controllers', controller_endpoints=controller_endpoints)
        self.controller_endpoints = controller_endpoints
        self.consul_endpoint = consul_endpoint
        self.vcore_endpoint = vcore_endpoint
        self.instance_id = instance_id
        self.enable_tls = enable_tls
        self.key_file = key_file
        self.cert_file = cert_file

        self.channel = None
        self.grpc_client = None  # single, shared gRPC client to vcore
        self.is_alive = False  # True while a vcore channel is assigned

        self.agent_map = {}  # (datapath_id, controller_endpoint) -> Agent()
        self.device_id_to_datapath_id_map = {}

        self.vcore_retry_interval = vcore_retry_interval
        self.devices_refresh_interval = devices_refresh_interval
        self.subscription_refresh_interval = subscription_refresh_interval
        self.subscription = None

        self.running = False

    def start(self):
        """Start the vcore subscription monitor and the logical device monitor.

        Calling start() on a manager that is already running does nothing.

        :return: self
        """
        if self.running:
            return self

        log.debug('starting')

        self.running = True

        # Both monitors are inlineCallbacks coroutines that wait via asleep(),
        # which presumably schedules reactor timers. Twisted's reactor APIs are
        # not thread-safe, so both are started on the reactor thread (after
        # their first wait they would resume on the reactor thread anyway).
        # Start monitoring the vcore grpc channel
        reactor.callLater(0, self.monitor_vcore_grpc_channel)

        # Start monitoring logical devices and manage agents accordingly
        reactor.callLater(0, self.monitor_logical_devices)

        log.info('started')

        return self

    def stop(self):
        """Stop every agent, forget them, and tear down the vcore session."""
        log.debug('stopping')
        # Flag first so that the monitor loops exit on their next iteration.
        self.running = False

        # clean up all controller connections
        for agent in self.agent_map.values():
            agent.stop()
        # Forget the stopped agents so a later start() recreates them.
        self.agent_map.clear()
        self.device_id_to_datapath_id_map.clear()

        self._reset_grpc_attributes()

        log.info('stopped')

    @staticmethod
    def _commit_suicide():
        """Send SIGTERM to this process (same signal as `kill -15 <pid>`)."""
        os.kill(os.getpid(), signal.SIGTERM)

    def resolve_endpoint(self, endpoint):
        """Turn an endpoint spec into a (host, port) tuple.

        ``endpoint`` is either 'host:port' or '@service-name'; the latter is
        looked up in consul through self.consul_endpoint. If that lookup
        fails, the process sends SIGTERM to itself.

        :return: (host, int(port)), or None when the consul lookup failed or
            the resolved endpoint is empty
        :raises ValueError: if the endpoint has no ':' or a non-integer port
        """
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info(
                    '{}-service-endpoint-found'.format(endpoint), address=ip_port_endpoint)
            except Exception as e:
                log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
                log.error('committing-suicide')
                # Committing suicide in order to let docker restart ofagent
                self._commit_suicide()
                # The '@service' spec itself is not an address; don't parse it.
                return None
        if ip_port_endpoint:
            # Only the last ':' separates the port, so hosts containing ':'
            # (e.g. bracketed IPv6 literals) keep their full text.
            host, port = ip_port_endpoint.rsplit(':', 1)
            return host, int(port)
        return None

    def _reset_grpc_attributes(self):
        """Drop the current vcore session: stop the shared GrpcClient and
        forget the channel and subscription."""
        log.debug('start-reset-grpc-attributes')

        if self.grpc_client is not None:
            try:
                self.grpc_client.stop()
            except Exception as e:
                # Keep resetting: a failing stop must not leave stale state
                # behind or kill the monitor loop that called us.
                log.exception('grpc-client-stop-failure', e=e)

        # NOTE(review): Agents created by create_agent() keep the GrpcClient
        # that was current at creation time; they are not re-pointed at the
        # client created after the next successful subscription. Confirm
        # this is handled elsewhere.
        self.is_alive = False
        self.channel = None
        self.subscription = None
        self.grpc_client = None

        log.debug('stop-reset-grpc-attributes')

    def _assign_grpc_attributes(self):
        """Resolve the vcore endpoint and open a new insecure gRPC channel.

        :raises ValueError: if the vcore endpoint cannot be resolved
        """
        log.debug('start-assign-grpc-attributes')

        resolved = self.resolve_endpoint(self.vcore_endpoint)
        # Explicit check instead of assert, which is stripped under -O.
        if resolved is None:
            raise ValueError(
                'cannot resolve vcore endpoint {!r}'.format(self.vcore_endpoint))
        host, port = resolved
        log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)

        # Establish a connection to the vcore GRPC server
        self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
        self.is_alive = True

        log.debug('stop-assign-grpc-attributes')

    @inlineCallbacks
    def monitor_vcore_grpc_channel(self):
        """Keep this ofagent instance subscribed with vcore while running.

        Without a subscription, a new channel is opened first. A Subscribe
        request is then sent. If vcore answers with our instance id, the
        session is kept, the shared GrpcClient is started on the first success,
        and the request is repeated every subscription_refresh_interval. Any
        other outcome resets the session and retries after
        vcore_retry_interval.
        """
        log.debug('start-monitor-vcore-grpc-channel')

        while self.running:
            try:
                # If a subscription is not yet assigned then establish new GRPC connection
                # ... otherwise keep using existing connection details
                if self.subscription is None:
                    self._assign_grpc_attributes()

                # Send subscription request to register the current ofagent instance
                container_name = self.instance_id
                stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
                subscription = stub.Subscribe(OfAgentSubscriber(ofagent_id=container_name))

                # If the subscriber id matches the current instance
                # ... then the subscription has succeeded
                if subscription is not None and subscription.ofagent_id == container_name:
                    if self.subscription is None:
                        # Keep details on the current GRPC session and subscription
                        log.debug('subscription-with-vcore-successful', subscription=subscription)
                        self.subscription = subscription
                        self.grpc_client = GrpcClient(self, self.channel).start()

                    # Sleep a bit in between each subscribe
                    yield asleep(self.subscription_refresh_interval)

                    # Move on to next subscribe request
                    continue

                # The subscription did not succeed, reset and move on
                log.info('subscription-with-vcore-unavailable', subscription=subscription)

            except _Rendezvous as e:
                log.error('subscription-with-vcore-terminated', exception=e, status=e.code())

            except Exception as e:
                log.exception('unexpected-subscription-termination-with-vcore', e=e)

            # Reset grpc details
            # The vcore instance is either not available for subscription
            # or a failure occurred with the existing communication.
            self._reset_grpc_attributes()

            # Sleep for a short period and retry
            yield asleep(self.vcore_retry_interval)

        log.debug('stop-monitor-vcore-grpc-channel')

    @inlineCallbacks
    def get_list_of_logical_devices_from_voltha(self):
        """Fetch the logical device list from vcore, retrying on failure.

        Retries every vcore_retry_interval until ListLogicalDevices succeeds.
        A gRPC UNAVAILABLE status makes the process send SIGTERM to itself.

        :return: Deferred firing with the device items, or with None if the
            manager stops before a call succeeds
        """
        while self.running:
            log.info('retrieve-logical-device-list')
            try:
                stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
                devices = stub.ListLogicalDevices(Empty()).items

            except _Rendezvous as e:
                log.error('vcore-communication-failure', exception=e, status=e.code())
                if e.code() == StatusCode.UNAVAILABLE:
                    self._commit_suicide()

            except Exception as e:
                log.exception('logical-devices-retrieval-failure', exception=e)

            else:
                for device in devices:
                    log.info("logical-device-entry", id=device.id, datapath_id=device.datapath_id)
                returnValue(devices)

            log.info('reconnect', after_delay=self.vcore_retry_interval)
            yield asleep(self.vcore_retry_interval)

    def refresh_agent_connections(self, devices):
        """
        Based on the new device list, update the following state in the class:
        * agent_map
        * device_id_to_datapath_id_map
        Agents are added/removed per datapath id; devices whose datapath id
        already has agents are left untouched.
        :param devices: full device list freshly received from Voltha
        :return: None
        """
        # Use datapath ids for deciding what's new and what's obsolete
        desired_datapath_ids = set(d.datapath_id for d in devices)
        current_datapath_ids = set(
            datapath_id for datapath_id, _ in self.agent_map)

        # if identical, nothing to do
        if desired_datapath_ids == current_datapath_ids:
            return

        # ... otherwise calculate differences
        to_add = desired_datapath_ids.difference(current_datapath_ids)
        to_del = current_datapath_ids.difference(desired_datapath_ids)

        # remove what we don't need
        for datapath_id in to_del:
            self.delete_agent(datapath_id)

        # start new agents as needed
        for device in devices:
            if device.datapath_id in to_add:
                self.create_agent(device)

        log.debug('updated-agent-list', count=len(self.agent_map))
        log.debug('updated-device-id-to-datapath-id-map',
                  map=str(self.device_id_to_datapath_id_map))

    def create_agent(self, device):
        """Start one Agent per controller endpoint for a logical device and
        record its device_id -> datapath_id mapping."""
        datapath_id = device.datapath_id
        device_id = device.id
        for controller_endpoint in self.controller_endpoints:
            agent = Agent(controller_endpoint, datapath_id,
                          device_id, self.grpc_client, self.enable_tls,
                          self.key_file, self.cert_file)
            agent.start()
            self.agent_map[(datapath_id, controller_endpoint)] = agent
            self.device_id_to_datapath_id_map[device_id] = datapath_id

    def delete_agent(self, datapath_id):
        """Stop and forget every controller agent of ``datapath_id``.

        The device-id mapping is removed once per device, not once per
        controller; doing it per controller raised KeyError from the second
        controller on and left the remaining agents running.
        """
        device_ids = set()
        for controller_endpoint in self.controller_endpoints:
            agent = self.agent_map.pop((datapath_id, controller_endpoint), None)
            if agent is None:
                continue
            device_ids.add(agent.get_device_id())
            agent.stop()
        for device_id in device_ids:
            self.device_id_to_datapath_id_map.pop(device_id, None)

    @inlineCallbacks
    def monitor_logical_devices(self):
        """Poll vcore for logical devices and reconcile agents while running.

        Waits devices_refresh_interval between polls whether or not the poll
        succeeded.
        """
        log.debug('start-monitor-logical-devices')

        while self.running:
            log.info('monitoring-logical-devices')

            # should change to a gRPC streaming call
            # see https://jira.opencord.org/browse/CORD-821

            try:
                if self.channel is not None and self.grpc_client is not None:
                    # get current list from Voltha
                    devices = yield self.get_list_of_logical_devices_from_voltha()

                    # None means the manager stopped before a fetch succeeded
                    if devices is not None:
                        # update agent list and mapping tables as needed
                        self.refresh_agent_connections(devices)
                else:
                    log.info('vcore-communication-unavailable')

            except _Rendezvous as e:
                log.error('vcore-communication-failure', exception=repr(e), status=e.code())

            except Exception as e:
                log.exception('unexpected-vcore-communication-failure', exception=repr(e))

            # wait before next poll -- outside the try, so that a failure above
            # cannot skip it and spin this loop without yielding to the reactor
            yield asleep(self.devices_refresh_interval)

        log.debug('stop-monitor-logical-devices')

    def _agents_for_device(self, device_id):
        """Return the agents registered for the datapath of ``device_id``.

        Unknown devices give an empty list; controller endpoints with no agent
        for that datapath are skipped instead of raising KeyError.
        """
        datapath_id = self.device_id_to_datapath_id_map.get(device_id)
        if datapath_id is None:
            return []
        agents = (self.agent_map.get((datapath_id, endpoint))
                  for endpoint in self.controller_endpoints)
        return [agent for agent in agents if agent is not None]

    def forward_packet_in(self, device_id, ofp_packet_in):
        """Hand a packet-in for ``device_id`` to each of its controller agents."""
        for agent in self._agents_for_device(device_id):
            agent.forward_packet_in(ofp_packet_in)

    def forward_change_event(self, device_id, event):
        """Hand a change event for ``device_id`` to each of its controller agents."""
        for agent in self._agents_for_device(device_id):
            agent.forward_change_event(event)