blob: 479bf4ec5d61840beaf8cfc2b1ed51b9f183f384 [file] [log] [blame]
Khen Nursimulu68b9be32016-10-25 11:57:04 -04001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Khen Nursimulu68b9be32016-10-25 11:57:04 -04003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import sys
18
19from twisted.internet import reactor
20from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
21
22from common.utils.asleep import asleep
23from common.utils.consulhelpers import get_endpoint_from_consul
24from structlog import get_logger
25import grpc
Zsolt Haraszti1edb8282016-11-08 10:57:19 -080026from ofagent.protos import third_party
Khen Nursimulu68b9be32016-10-25 11:57:04 -040027from protos import voltha_pb2
28from grpc_client import GrpcClient
29
30from agent import Agent
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -080031from google.protobuf.empty_pb2 import Empty
32
Khen Nursimulu68b9be32016-10-25 11:57:04 -040033
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070034log = get_logger()
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -080035# _ = third_party
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070036
Khen Nursimulu68b9be32016-10-25 11:57:04 -040037class ConnectionManager(object):
38
Khen Nursimulu68b9be32016-10-25 11:57:04 -040039 def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoint,
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070040 voltha_retry_interval=0.5, devices_refresh_interval=5):
Khen Nursimulu68b9be32016-10-25 11:57:04 -040041
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070042 log.info('init-connection-manager')
Khen Nursimulu68b9be32016-10-25 11:57:04 -040043 self.controller_endpoint = controller_endpoint
44 self.consul_endpoint = consul_endpoint
45 self.voltha_endpoint = voltha_endpoint
46
47 self.channel = None
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070048 self.grpc_client = None # single, shared gRPC client to Voltha
49
50 self.agent_map = {} # datapath_id -> Agent()
51 self.device_id_to_datapath_id_map = {}
Khen Nursimulu68b9be32016-10-25 11:57:04 -040052
53 self.voltha_retry_interval = voltha_retry_interval
54 self.devices_refresh_interval = devices_refresh_interval
55
56 self.running = False
57
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070058 def start(self):
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070059
Khen Nursimulu68b9be32016-10-25 11:57:04 -040060 if self.running:
61 return
62
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070063 log.debug('starting')
Khen Nursimulu68b9be32016-10-25 11:57:04 -040064
65 self.running = True
66
67 # Get voltha grpc endpoint
68 self.channel = self.get_grpc_channel_with_voltha()
69
Khen Nursimulu68b9be32016-10-25 11:57:04 -040070 # Create shared gRPC API object
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070071 self.grpc_client = GrpcClient(self, self.channel).start()
Khen Nursimulu68b9be32016-10-25 11:57:04 -040072
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070073 # Start monitoring logical devices and manage agents accordingly
74 reactor.callLater(0, self.monitor_logical_devices)
Khen Nursimulu68b9be32016-10-25 11:57:04 -040075
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070076 log.info('started')
77
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070078 return self
Khen Nursimulu68b9be32016-10-25 11:57:04 -040079
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070080 def stop(self):
81 log.debug('stopping')
Khen Nursimulu68b9be32016-10-25 11:57:04 -040082 # clean up all controller connections
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070083 for agent in self.agent_map.itervalues():
84 agent.stop()
Khen Nursimulu68b9be32016-10-25 11:57:04 -040085 self.running = False
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070086 self.grpc_client.stop()
87 del self.channel
88 log.info('stopped')
Khen Nursimulu68b9be32016-10-25 11:57:04 -040089
90 def resolve_endpoint(self, endpoint):
91 ip_port_endpoint = endpoint
92 if endpoint.startswith('@'):
93 try:
94 ip_port_endpoint = get_endpoint_from_consul(
95 self.consul_endpoint, endpoint[1:])
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070096 log.info(
Khen Nursimulu68b9be32016-10-25 11:57:04 -040097 'Found endpoint {} service at {}'.format(endpoint,
98 ip_port_endpoint))
99 except Exception as e:
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700100 log.error('Failure to locate {} service from '
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400101 'consul {}:'.format(endpoint, repr(e)))
102 return
103 if ip_port_endpoint:
104 host, port = ip_port_endpoint.split(':', 2)
105 return host, int(port)
106
107 def get_grpc_channel_with_voltha(self):
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700108 log.info('Resolving voltha endpoint {} from consul'.format(
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400109 self.voltha_endpoint))
110 host, port = self.resolve_endpoint(self.voltha_endpoint)
111 assert host is not None
112 assert port is not None
113 # Create grpc channel to Voltha
114 channel = grpc.insecure_channel('{}:{}'.format(host, port))
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700115 log.info('Acquired a grpc channel to voltha')
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400116 return channel
117
118
119 @inlineCallbacks
120 def get_list_of_logical_devices_from_voltha(self):
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700121
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400122 while True:
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700123 log.info('Retrieve devices from voltha')
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400124 try:
Zsolt Haraszti66862032016-11-28 14:28:39 -0800125 stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -0800126 devices = stub.ListLogicalDevices(Empty()).items
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400127 for device in devices:
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700128 log.info("Devices {} -> {}".format(device.id,
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400129 device.datapath_id))
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700130 returnValue(devices)
131
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400132 except Exception as e:
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700133 log.error('Failure to retrieve devices from '
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400134 'voltha: {}'.format(repr(e)))
135
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700136 log.info('reconnect', after_delay=self.voltha_retry_interval)
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400137 yield asleep(self.voltha_retry_interval)
138
139
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700140 def refresh_agent_connections(self, devices):
141 """
142 Based on the new device list, update the following state in the class:
143 * agent_map
144 * datapath_map
145 * device_id_map
146 :param devices: full device list freshly received from Voltha
147 :return: None
148 """
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400149
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700150 # Use datapath ids for deciding what's new and what's obsolete
151 desired_datapath_ids = set(d.datapath_id for d in devices)
152 current_datapath_ids = set(self.agent_map.iterkeys())
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400153
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700154 # if identical, nothing to do
155 if desired_datapath_ids == current_datapath_ids:
156 return
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400157
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700158 # ... otherwise calculate differences
159 to_add = desired_datapath_ids.difference(current_datapath_ids)
160 to_del = current_datapath_ids.difference(desired_datapath_ids)
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400161
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700162 # remove what we don't need
163 for datapath_id in to_del:
164 self.delete_agent(datapath_id)
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400165
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700166 # start new agents as needed
167 for device in devices:
168 if device.datapath_id in to_add:
169 self.create_agent(device)
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400170
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700171 log.debug('updated-agent-list', count=len(self.agent_map))
172 log.debug('updated-device-id-to-datapath-id-map',
173 map=str(self.device_id_to_datapath_id_map))
174
175 def create_agent(self, device):
176 datapath_id = device.datapath_id
177 device_id = device.id
178 agent = Agent(self.controller_endpoint, datapath_id,
179 device_id, self.grpc_client)
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700180 agent.start()
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700181 self.agent_map[datapath_id] = agent
182 self.device_id_to_datapath_id_map[device_id] = datapath_id
183
184 def delete_agent(self, datapath_id):
185 agent = self.agent_map[datapath_id]
186 device_id = agent.get_device_id()
187 agent.stop()
188 del self.agent_map[datapath_id]
189 del self.device_id_to_datapath_id_map[device_id]
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400190
191 @inlineCallbacks
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700192 def monitor_logical_devices(self):
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400193 while True:
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700194 # TODO @khen We should switch to a polling mode based on a
195 # streaming gRPC method
196
197 # get current list from Voltha
198 devices = yield self.get_list_of_logical_devices_from_voltha()
199
200 # update agent list and mapping tables as needed
201 self.refresh_agent_connections(devices)
202
203 # wait before next poll
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400204 yield asleep(self.devices_refresh_interval)
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700205 log.info('Monitor connections')
Khen Nursimulu68b9be32016-10-25 11:57:04 -0400206
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700207 def forward_packet_in(self, device_id, ofp_packet_in):
208 datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
209 if datapath_id:
210 agent = self.agent_map[datapath_id]
211 agent.forward_packet_in(ofp_packet_in)
Zsolt Haraszti217a12e2016-12-19 16:37:55 -0800212
213 def forward_change_event(self, device_id, event):
214 datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
215 if datapath_id:
216 agent = self.agent_map[datapath_id]
217 agent.forward_change_event(event)