blob: 8a4081c43ca43ec2629bbc1beb29228ec45c7d89 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
alshabib06b449c2017-01-15 17:33:16 -060016import os
Khen Nursimulu68b9be32016-10-25 11:57:04 -040017
18import sys
19
20from twisted.internet import reactor
21from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
22
23from common.utils.asleep import asleep
24from common.utils.consulhelpers import get_endpoint_from_consul
25from structlog import get_logger
26import grpc
Rouzbahan Rashidi-Tabrizi34b7dd72017-03-13 17:45:20 -040027from grpc import StatusCode
28from grpc._channel import _Rendezvous
Zsolt Haraszti1edb8282016-11-08 10:57:19 -080029from ofagent.protos import third_party
Khen Nursimulu68b9be32016-10-25 11:57:04 -040030from protos import voltha_pb2
31from grpc_client import GrpcClient
32
33from agent import Agent
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -080034from google.protobuf.empty_pb2 import Empty
35
Khen Nursimulu68b9be32016-10-25 11:57:04 -040036
# Module-level structlog logger used by ConnectionManager below.
log = get_logger()
# NOTE(review): `third_party` is imported at the top of this module but is not
# referenced anywhere else in it; the commented-out assignment below would mark
# that import as intentionally used. Presumably the import is kept only for its
# side effects -- confirm before removing either line.
# _ = third_party
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070039
class ConnectionManager(object):
    """
    Keep OpenFlow agents in sync with the logical devices known to Voltha.

    One Agent is run per (logical device datapath id, controller endpoint)
    pair, and all agents share a single gRPC client to Voltha. The list of
    logical devices is polled every ``devices_refresh_interval`` seconds and
    agents are created / stopped as devices appear / disappear.
    """

    def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoints,
                 voltha_retry_interval=0.5, devices_refresh_interval=5):
        """
        :param consul_endpoint: consul address handed to
            get_endpoint_from_consul when resolving '@<service>' endpoints
        :param voltha_endpoint: 'host:port' of Voltha, or '@<service>' to
            look the address up in consul
        :param controller_endpoints: collection of controller endpoints; it
            is iterated repeatedly, and one Agent per logical device is run
            for each entry
        :param voltha_retry_interval: seconds between failed attempts to
            list logical devices
        :param devices_refresh_interval: seconds between device-list polls
        """
        log.info('init-connection-manager')
        log.info('list-of-controllers', controller_endpoints=controller_endpoints)
        self.controller_endpoints = controller_endpoints
        self.consul_endpoint = consul_endpoint
        self.voltha_endpoint = voltha_endpoint

        self.channel = None
        self.grpc_client = None  # single, shared gRPC client to Voltha

        self.agent_map = {}  # (datapath_id, controller_endpoint) -> Agent()
        self.device_id_to_datapath_id_map = {}

        self.voltha_retry_interval = voltha_retry_interval
        self.devices_refresh_interval = devices_refresh_interval

        self.running = False

    def start(self):
        """
        Connect to Voltha and schedule the logical-device monitor loop.

        Does nothing (besides returning self) if already running.

        :return: self, so construction and start can be chained
        """
        if self.running:
            return self

        log.debug('starting')

        self.running = True

        # Get voltha grpc endpoint
        self.channel = self.get_grpc_channel_with_voltha()

        # Create shared gRPC API object
        self.grpc_client = GrpcClient(self, self.channel).start()

        # Start monitoring logical devices and manage agents accordingly
        reactor.callLater(0, self.monitor_logical_devices)

        log.info('started')

        return self

    def stop(self):
        """
        Stop all agents and the shared gRPC client, and end the poll loops.

        A no-op when not running. The agent and device maps are cleared so
        that a later start() recreates agents instead of keeping stopped ones.
        """
        if not self.running:
            return
        log.debug('stopping')
        # Clear the flag first so the polling loops exit on their next check.
        self.running = False
        # clean up all controller connections
        for agent in list(self.agent_map.values()):
            agent.stop()
        self.agent_map.clear()
        self.device_id_to_datapath_id_map.clear()
        if self.grpc_client is not None:
            self.grpc_client.stop()
        # Drop our channel reference (was `del self.channel`, which made any
        # later attribute access raise AttributeError).
        self.channel = None
        log.info('stopped')

    @staticmethod
    def _commit_suicide():
        """Send SIGTERM to this process (same signal as `kill -15 <pid>`)."""
        import signal
        os.kill(os.getpid(), signal.SIGTERM)

    def resolve_endpoint(self, endpoint):
        """
        Turn an endpoint spec into a (host, port) tuple.

        :param endpoint: 'host:port', or '@<service>' to look the service
            up in consul
        :return: (host, int(port)); None if the endpoint is empty or the
            consul lookup failed (in the latter case SIGTERM has also been
            sent to this process)
        """
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info(
                    'Found endpoint {} service at {}'.format(endpoint,
                                                             ip_port_endpoint))
            except Exception as e:
                log.error('Failure to locate {} service from '
                          'consul: {}'.format(endpoint, repr(e)))
                log.error('Committing suicide...')
                # Committing suicide in order to let docker restart ofagent
                self._commit_suicide()
                # '@<service>' is not an address; do not try to parse it.
                return None
        if ip_port_endpoint:
            # Split on the last ':' only, so the port is always the suffix.
            host, port = ip_port_endpoint.rsplit(':', 1)
            return host, int(port)
        return None

    def get_grpc_channel_with_voltha(self):
        """
        Resolve the Voltha endpoint and open an insecure gRPC channel to it.

        :return: the grpc channel
        :raises ValueError: if the Voltha endpoint cannot be resolved
        """
        log.info('Resolving voltha endpoint {} from consul'.format(
            self.voltha_endpoint))
        resolved = self.resolve_endpoint(self.voltha_endpoint)
        if resolved is None:
            raise ValueError('Cannot resolve voltha endpoint {}'.format(
                self.voltha_endpoint))
        host, port = resolved
        # Create grpc channel to Voltha
        channel = grpc.insecure_channel('{}:{}'.format(host, port))
        log.info('Acquired a grpc channel to voltha')
        return channel

    @inlineCallbacks
    def get_list_of_logical_devices_from_voltha(self):
        """
        Fetch the logical devices from Voltha, retrying until it succeeds.

        Failed attempts are logged and retried every
        ``voltha_retry_interval`` seconds. If Voltha answers UNAVAILABLE the
        process sends itself SIGTERM but keeps retrying until it exits.

        :return: Deferred firing with the logical devices, or with an empty
            list if the manager is stopped while retrying
        """
        while self.running:
            log.info('Retrieve devices from voltha')
            try:
                stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
                # NOTE(review): this is a synchronous (blocking) gRPC call
                # made from a reactor-driven coroutine.
                devices = stub.ListLogicalDevices(Empty()).items
            except _Rendezvous as e:
                log.error('Failure to retrieve devices from '
                          'voltha: {}'.format(repr(e)))
                if e.code() == StatusCode.UNAVAILABLE:
                    self._commit_suicide()
            except Exception as e:
                log.error('Failure to retrieve devices from '
                          'voltha: {}'.format(repr(e)))
            else:
                for device in devices:
                    log.info("Devices {} -> {}".format(device.id,
                                                       device.datapath_id))
                # returnValue works by raising, so keep it outside the try.
                returnValue(devices)

            log.info('reconnect', after_delay=self.voltha_retry_interval)
            yield asleep(self.voltha_retry_interval)

        returnValue([])

    def refresh_agent_connections(self, devices):
        """
        Based on the new device list, update the following state in the class:
        * agent_map
        * device_id_to_datapath_id_map

        Devices are compared by datapath id only: agents are started for new
        datapath ids and stopped for datapath ids that disappeared.

        :param devices: full device list freshly received from Voltha
        :return: None
        """
        # Use datapath ids for deciding what's new and what's obsolete
        desired_datapath_ids = set(d.datapath_id for d in devices)
        current_datapath_ids = set(
            datapath_id for datapath_id, _ in self.agent_map)

        # if identical, nothing to do
        if desired_datapath_ids == current_datapath_ids:
            return

        # ... otherwise calculate differences
        to_add = desired_datapath_ids.difference(current_datapath_ids)
        to_del = current_datapath_ids.difference(desired_datapath_ids)

        # remove what we don't need
        for datapath_id in to_del:
            self.delete_agent(datapath_id)

        # start new agents as needed
        for device in devices:
            if device.datapath_id in to_add:
                self.create_agent(device)

        log.debug('updated-agent-list', count=len(self.agent_map))
        log.debug('updated-device-id-to-datapath-id-map',
                  map=str(self.device_id_to_datapath_id_map))

    def create_agent(self, device):
        """Start one Agent per controller endpoint for the given device."""
        datapath_id = device.datapath_id
        device_id = device.id
        for controller_endpoint in self.controller_endpoints:
            agent = Agent(controller_endpoint, datapath_id,
                          device_id, self.grpc_client)
            agent.start()
            self.agent_map[(datapath_id, controller_endpoint)] = agent
        self.device_id_to_datapath_id_map[device_id] = datapath_id

    def delete_agent(self, datapath_id):
        """Stop and forget every agent (one per controller) of datapath_id."""
        device_id = None
        for controller_endpoint in self.controller_endpoints:
            agent = self.agent_map.pop((datapath_id, controller_endpoint), None)
            if agent is None:
                continue
            device_id = agent.get_device_id()
            agent.stop()
        # All agents of one datapath were created for the same device, so
        # unmap it once (deleting it per controller raised KeyError as soon
        # as more than one controller was configured).
        if device_id is not None:
            self.device_id_to_datapath_id_map.pop(device_id, None)

    @inlineCallbacks
    def monitor_logical_devices(self):
        """Poll Voltha's logical devices and refresh agents until stopped."""
        while self.running:
            # should change to a gRPC streaming call
            # see https://jira.opencord.org/browse/CORD-821

            # get current list from Voltha
            devices = yield self.get_list_of_logical_devices_from_voltha()
            if not self.running:
                break

            # update agent list and mapping tables as needed
            self.refresh_agent_connections(devices)

            # wait before next poll
            yield asleep(self.devices_refresh_interval)
            log.info('Monitor connections')

    def _agents_for_device(self, device_id):
        """Return the agents serving device_id ([] if the device is unknown)."""
        datapath_id = self.device_id_to_datapath_id_map.get(device_id)
        if datapath_id is None:
            return []
        agents = []
        for controller_endpoint in self.controller_endpoints:
            agent = self.agent_map.get((datapath_id, controller_endpoint))
            if agent is not None:
                agents.append(agent)
        return agents

    def forward_packet_in(self, device_id, ofp_packet_in):
        """Hand a packet-in from device_id to each of its agents."""
        for agent in self._agents_for_device(device_id):
            agent.forward_packet_in(ofp_packet_in)

    def forward_change_event(self, device_id, event):
        """Hand a change event from device_id to each of its agents."""
        for agent in self._agents_for_device(device_id):
            agent.forward_change_event(event)