blob: 0bb2f8909697c3c10f757b2e611a32202d9c1047 [file] [log] [blame]
Khen Nursimulu68b9be32016-10-25 11:57:04 -04001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import sys
18
19from twisted.internet import reactor
20from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
21
22from common.utils.asleep import asleep
23from common.utils.consulhelpers import get_endpoint_from_consul
24from structlog import get_logger
25import grpc
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070026from ofagent.protos import third_party
Khen Nursimulu68b9be32016-10-25 11:57:04 -040027from protos import voltha_pb2
28from grpc_client import GrpcClient
29
30from agent import Agent
31
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070032log = get_logger()
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070033_ = third_party
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070034
Khen Nursimulu68b9be32016-10-25 11:57:04 -040035class ConnectionManager(object):
36
Khen Nursimulu68b9be32016-10-25 11:57:04 -040037 def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoint,
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070038 voltha_retry_interval=0.5, devices_refresh_interval=5):
Khen Nursimulu68b9be32016-10-25 11:57:04 -040039
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070040 log.info('init-connection-manager')
Khen Nursimulu68b9be32016-10-25 11:57:04 -040041 self.controller_endpoint = controller_endpoint
42 self.consul_endpoint = consul_endpoint
43 self.voltha_endpoint = voltha_endpoint
44
45 self.channel = None
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070046 self.grpc_client = None # single, shared gRPC client to Voltha
47
48 self.agent_map = {} # datapath_id -> Agent()
49 self.device_id_to_datapath_id_map = {}
Khen Nursimulu68b9be32016-10-25 11:57:04 -040050
51 self.voltha_retry_interval = voltha_retry_interval
52 self.devices_refresh_interval = devices_refresh_interval
53
54 self.running = False
55
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070056 def start(self):
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070057
Khen Nursimulu68b9be32016-10-25 11:57:04 -040058 if self.running:
59 return
60
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070061 log.debug('starting')
Khen Nursimulu68b9be32016-10-25 11:57:04 -040062
63 self.running = True
64
65 # Get voltha grpc endpoint
66 self.channel = self.get_grpc_channel_with_voltha()
67
Khen Nursimulu68b9be32016-10-25 11:57:04 -040068 # Create shared gRPC API object
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070069 self.grpc_client = GrpcClient(self, self.channel).start()
Khen Nursimulu68b9be32016-10-25 11:57:04 -040070
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070071 # Start monitoring logical devices and manage agents accordingly
72 reactor.callLater(0, self.monitor_logical_devices)
Khen Nursimulu68b9be32016-10-25 11:57:04 -040073
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070074 log.info('started')
75
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070076 return self
Khen Nursimulu68b9be32016-10-25 11:57:04 -040077
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070078 def stop(self):
79 log.debug('stopping')
Khen Nursimulu68b9be32016-10-25 11:57:04 -040080 # clean up all controller connections
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070081 for agent in self.agent_map.itervalues():
82 agent.stop()
Khen Nursimulu68b9be32016-10-25 11:57:04 -040083 self.running = False
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070084 self.grpc_client.stop()
85 del self.channel
86 log.info('stopped')
Khen Nursimulu68b9be32016-10-25 11:57:04 -040087
88 def resolve_endpoint(self, endpoint):
89 ip_port_endpoint = endpoint
90 if endpoint.startswith('@'):
91 try:
92 ip_port_endpoint = get_endpoint_from_consul(
93 self.consul_endpoint, endpoint[1:])
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070094 log.info(
Khen Nursimulu68b9be32016-10-25 11:57:04 -040095 'Found endpoint {} service at {}'.format(endpoint,
96 ip_port_endpoint))
97 except Exception as e:
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070098 log.error('Failure to locate {} service from '
Khen Nursimulu68b9be32016-10-25 11:57:04 -040099 'consul {}:'.format(endpoint, repr(e)))
100 return
101 if ip_port_endpoint:
102 host, port = ip_port_endpoint.split(':', 2)
103 return host, int(port)
104
105 def get_grpc_channel_with_voltha(self):
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700106 log.info('Resolving voltha endpoint {} from consul'.format(
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400107 self.voltha_endpoint))
108 host, port = self.resolve_endpoint(self.voltha_endpoint)
109 assert host is not None
110 assert port is not None
111 # Create grpc channel to Voltha
112 channel = grpc.insecure_channel('{}:{}'.format(host, port))
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700113 log.info('Acquired a grpc channel to voltha')
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400114 return channel
115
116
117 @inlineCallbacks
118 def get_list_of_logical_devices_from_voltha(self):
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700119
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400120 while True:
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700121 log.info('Retrieve devices from voltha')
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400122 try:
123 stub = voltha_pb2.VolthaLogicalLayerStub(self.channel)
124 devices = stub.ListLogicalDevices(
125 voltha_pb2.NullMessage()).items
126 for device in devices:
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700127 log.info("Devices {} -> {}".format(device.id,
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400128 device.datapath_id))
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700129 returnValue(devices)
130
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400131 except Exception as e:
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700132 log.error('Failure to retrieve devices from '
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400133 'voltha: {}'.format(repr(e)))
134
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700135 log.info('reconnect', after_delay=self.voltha_retry_interval)
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400136 yield asleep(self.voltha_retry_interval)
137
138
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700139 def refresh_agent_connections(self, devices):
140 """
141 Based on the new device list, update the following state in the class:
142 * agent_map
143 * datapath_map
144 * device_id_map
145 :param devices: full device list freshly received from Voltha
146 :return: None
147 """
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400148
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700149 # Use datapath ids for deciding what's new and what's obsolete
150 desired_datapath_ids = set(d.datapath_id for d in devices)
151 current_datapath_ids = set(self.agent_map.iterkeys())
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400152
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700153 # if identical, nothing to do
154 if desired_datapath_ids == current_datapath_ids:
155 return
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400156
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700157 # ... otherwise calculate differences
158 to_add = desired_datapath_ids.difference(current_datapath_ids)
159 to_del = current_datapath_ids.difference(desired_datapath_ids)
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400160
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700161 # remove what we don't need
162 for datapath_id in to_del:
163 self.delete_agent(datapath_id)
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400164
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700165 # start new agents as needed
166 for device in devices:
167 if device.datapath_id in to_add:
168 self.create_agent(device)
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400169
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700170 log.debug('updated-agent-list', count=len(self.agent_map))
171 log.debug('updated-device-id-to-datapath-id-map',
172 map=str(self.device_id_to_datapath_id_map))
173
174 def create_agent(self, device):
175 datapath_id = device.datapath_id
176 device_id = device.id
177 agent = Agent(self.controller_endpoint, datapath_id,
178 device_id, self.grpc_client)
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700179 agent.start()
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700180 self.agent_map[datapath_id] = agent
181 self.device_id_to_datapath_id_map[device_id] = datapath_id
182
183 def delete_agent(self, datapath_id):
184 agent = self.agent_map[datapath_id]
185 device_id = agent.get_device_id()
186 agent.stop()
187 del self.agent_map[datapath_id]
188 del self.device_id_to_datapath_id_map[device_id]
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400189
190 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700191 def monitor_logical_devices(self):
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400192 while True:
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700193 # TODO @khen We should switch to a polling mode based on a
194 # streaming gRPC method
195
196 # get current list from Voltha
197 devices = yield self.get_list_of_logical_devices_from_voltha()
198
199 # update agent list and mapping tables as needed
200 self.refresh_agent_connections(devices)
201
202 # wait before next poll
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400203 yield asleep(self.devices_refresh_interval)
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700204 log.info('Monitor connections')
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400205
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700206 def forward_packet_in(self, device_id, ofp_packet_in):
207 datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
208 if datapath_id:
209 agent = self.agent_map[datapath_id]
210 agent.forward_packet_in(ofp_packet_in)