blob: 13fb6de9a5a740ee8550b52a7a21e2978c2e93ec [file] [log] [blame]
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
17import sys
18
19from twisted.internet import reactor
20from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
21
22from common.utils.asleep import asleep
23from common.utils.consulhelpers import get_endpoint_from_consul
24from structlog import get_logger
25import grpc
26from protos import voltha_pb2
27from grpc_client import GrpcClient
28
29from agent import Agent
30
# Module-level structlog logger used by ConnectionManager for all its
# log output.
log = get_logger()
32
33
class ConnectionManager(object):
    """Keep one running Agent per logical device reported by Voltha.

    The manager opens a single gRPC channel to Voltha, wraps it in one
    shared GrpcClient, and then polls Voltha for its list of logical
    devices.  For every device that appears an Agent is created and
    started; for every device that disappears its Agent is stopped and
    forgotten.

    State kept on the instance:
      * agent_map: datapath_id -> Agent
      * device_id_to_datapath_id_map: device id -> datapath_id
    """

    def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoint,
                 voltha_retry_interval=0.5, devices_refresh_interval=5):
        """Store configuration; no connection is made until run().

        :param consul_endpoint: passed to get_endpoint_from_consul() when an
            endpoint has to be looked up by service name
        :param voltha_endpoint: either 'host:port', or '@<service-name>' to
            have the address looked up in consul (see resolve_endpoint)
        :param controller_endpoint: handed unchanged to every Agent
        :param voltha_retry_interval: delay handed to asleep() between failed
            attempts to list devices (presumably seconds -- confirm against
            asleep())
        :param devices_refresh_interval: delay handed to asleep() between two
            polls of the device list (presumably seconds -- confirm)
        """
        log.info('init-connection-manager')
        self.controller_endpoint = controller_endpoint
        self.consul_endpoint = consul_endpoint
        self.voltha_endpoint = voltha_endpoint

        self.channel = None
        self.grpc_client = None  # single, shared gRPC client to Voltha

        self.agent_map = {}  # datapath_id -> Agent()
        self.device_id_to_datapath_id_map = {}

        self.voltha_retry_interval = voltha_retry_interval
        self.devices_refresh_interval = devices_refresh_interval

        self.running = False

    def run(self):
        """Connect to Voltha and schedule the device-monitoring loop.

        Calling run() on an already running manager is a no-op.

        :return: self, so the call can be chained after construction
        :raises RuntimeError: if the Voltha endpoint cannot be resolved
        """
        if self.running:
            # Return self here as well so the result does not depend on
            # whether the manager happened to be running already.
            return self

        log.info('run-connection-manager')

        self.running = True
        try:
            # Get voltha grpc endpoint
            self.channel = self.get_grpc_channel_with_voltha()

            # Create shared gRPC API object
            self.grpc_client = GrpcClient(self, self.channel)
        except Exception:
            # Do not stay flagged as running after a failed start, otherwise
            # every later run() would silently do nothing.
            self.running = False
            raise

        # Start monitoring logical devices and manage agents accordingly
        reactor.callLater(0, self.monitor_logical_devices)

        return self

    def shutdown(self):
        """Stop every known agent and mark the manager as not running."""
        # Clean up all controller connections.  Iterate over the agents
        # themselves (the dict values), not over the datapath id keys; a
        # snapshot list keeps the loop safe if stop() touches the map.
        for agent in list(self.agent_map.values()):
            agent.stop()
        self.running = False
        # TODO: close grpc connection to voltha
        # NOTE(review): monitor_logical_devices() does not look at
        # self.running, so the polling loop keeps going after shutdown.

    def resolve_endpoint(self, endpoint):
        """Turn an endpoint description into a (host, port) pair.

        :param endpoint: 'host:port', or '@<service-name>' to look the
            address up through consul
        :return: (host, port) with port converted to int, or None when the
            consul lookup raised or no address was obtained
        :raises ValueError: if the address does not split into exactly a
            host and a numeric port
        """
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info(
                    'Found endpoint {} service at {}'.format(
                        endpoint, ip_port_endpoint))
            except Exception as e:
                log.error('Failure to locate {} service from '
                          'consul {}:'.format(endpoint, repr(e)))
                return None
        if ip_port_endpoint:
            # With maxsplit=2 an address holding two or more colons yields
            # three parts, so the unpacking below rejects it with ValueError.
            host, port = ip_port_endpoint.split(':', 2)
            return host, int(port)
        return None

    def get_grpc_channel_with_voltha(self):
        """Resolve the Voltha endpoint and open an insecure gRPC channel.

        :return: the channel created by grpc.insecure_channel()
        :raises RuntimeError: if the endpoint could not be resolved
        """
        log.info('Resolving voltha endpoint {} from consul'.format(
            self.voltha_endpoint))
        resolved = self.resolve_endpoint(self.voltha_endpoint)
        if resolved is None:
            # resolve_endpoint() reports failure by returning None; fail
            # with a clear message instead of an unpacking TypeError.
            raise RuntimeError(
                'Unable to resolve voltha endpoint {}'.format(
                    self.voltha_endpoint))
        host, port = resolved
        # Create grpc channel to Voltha
        channel = grpc.insecure_channel('{}:{}'.format(host, port))
        log.info('Acquired a grpc channel to voltha')
        return channel

    @inlineCallbacks
    def get_list_of_logical_devices_from_voltha(self):
        """Fetch the logical device list, retrying until it succeeds.

        Any exception raised while querying Voltha is logged, then the call
        is repeated after voltha_retry_interval.

        :return: (via Deferred) the 'items' of the ListLogicalDevices reply
        """
        while True:
            log.info('Retrieve devices from voltha')
            try:
                stub = voltha_pb2.VolthaLogicalLayerStub(self.channel)
                devices = stub.ListLogicalDevices(
                    voltha_pb2.NullMessage()).items
                for device in devices:
                    log.info("Devices {} -> {}".format(device.id,
                                                       device.datapath_id))
            except Exception as e:
                log.error('Failure to retrieve devices from '
                          'voltha: {}'.format(repr(e)))
            else:
                # Success: hand the list back to the caller and leave.
                returnValue(devices)

            log.info('reconnect', after_delay=self.voltha_retry_interval)
            yield asleep(self.voltha_retry_interval)

    def refresh_agent_connections(self, devices):
        """
        Based on the new device list, update the following state in the class:
        * agent_map
        * device_id_to_datapath_id_map
        :param devices: full device list freshly received from Voltha
        :return: None
        """

        # Use datapath ids for deciding what's new and what's obsolete
        desired_datapath_ids = set(d.datapath_id for d in devices)
        current_datapath_ids = set(self.agent_map)

        # if identical, nothing to do
        if desired_datapath_ids == current_datapath_ids:
            return

        # ... otherwise calculate differences
        to_add = desired_datapath_ids.difference(current_datapath_ids)
        to_del = current_datapath_ids.difference(desired_datapath_ids)

        # remove what we don't need
        for datapath_id in to_del:
            self.delete_agent(datapath_id)

        # start new agents as needed
        for device in devices:
            if device.datapath_id in to_add:
                self.create_agent(device)

        log.debug('updated-agent-list', count=len(self.agent_map))
        log.debug('updated-device-id-to-datapath-id-map',
                  map=str(self.device_id_to_datapath_id_map))

    def create_agent(self, device):
        """Create, start and register an Agent for the given device.

        :param device: object exposing 'id' and 'datapath_id'
        """
        datapath_id = device.datapath_id
        device_id = device.id
        if datapath_id in self.agent_map:
            # An agent is already registered under this datapath id (for
            # example the device list carried the id twice).  Stop and
            # unregister it first so it is not left running unreferenced.
            self.delete_agent(datapath_id)
        agent = Agent(self.controller_endpoint, datapath_id,
                      device_id, self.grpc_client)
        agent.run()
        self.agent_map[datapath_id] = agent
        self.device_id_to_datapath_id_map[device_id] = datapath_id

    def delete_agent(self, datapath_id):
        """Stop the agent registered for datapath_id and drop its entries.

        :param datapath_id: key of the agent in agent_map
        :raises KeyError: if no agent is registered for datapath_id
        """
        agent = self.agent_map[datapath_id]
        device_id = agent.get_device_id()
        agent.stop()
        del self.agent_map[datapath_id]
        # pop() with a default: the reverse mapping may already be gone,
        # which must not prevent the agent from being removed.
        self.device_id_to_datapath_id_map.pop(device_id, None)

    @inlineCallbacks
    def monitor_logical_devices(self):
        """Poll Voltha for logical devices and keep the agents in sync.

        Loops forever: fetch the list, reconcile the agents, sleep for
        devices_refresh_interval, repeat.
        """
        while True:
            # TODO @khen We should switch to a polling mode based on a
            # streaming gRPC method

            # get current list from Voltha
            devices = yield self.get_list_of_logical_devices_from_voltha()

            # update agent list and mapping tables as needed
            self.refresh_agent_connections(devices)

            # wait before next poll
            yield asleep(self.devices_refresh_interval)
            log.info('Monitor connections')

    def forward_packet_in(self, device_id, ofp_packet_in):
        """Pass a packet-in on to the agent serving the given device.

        Nothing happens when the device id is unknown or no agent is
        registered for it.

        :param device_id: device id used as key in
            device_id_to_datapath_id_map
        :param ofp_packet_in: message handed to Agent.forward_packet_in()
        """
        datapath_id = self.device_id_to_datapath_id_map.get(device_id)
        # Compare against None: a datapath id of 0 is falsy but is still a
        # usable key once it has been registered in the map.
        if datapath_id is None:
            return
        agent = self.agent_map.get(datapath_id)
        if agent is not None:
            agent.forward_packet_in(ofp_packet_in)