blob: 9a171217531c98307471faae3e2db7900db1f789 [file] [log] [blame]
Khen Nursimulu68b9be32016-10-25 11:57:04 -04001#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
17import sys
18
19from twisted.internet import reactor
20from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
21
22from common.utils.asleep import asleep
23from common.utils.consulhelpers import get_endpoint_from_consul
24from structlog import get_logger
25import grpc
26from protos import voltha_pb2
27from grpc_client import GrpcClient
28
29from agent import Agent
30
class ConnectionManager(object):
    """Keep one OpenFlow ``Agent`` running per logical device known to Voltha.

    The manager opens a gRPC channel to Voltha (resolving the endpoint through
    Consul when it is given as ``'@service-name'``), retrieves the list of
    logical devices, starts an ``Agent`` toward the controller for each
    device, and then periodically re-reads the device list so agents are
    started for new devices and stopped for devices that disappeared.
    """

    log = get_logger()

    def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoint,
                 voltha_retry_interval=0.5, devices_refresh_interval=60):
        """Record the endpoints and timing parameters; nothing is connected yet.

        :param consul_endpoint: passed to ``get_endpoint_from_consul`` when an
            endpoint has to be looked up by service name.
        :param voltha_endpoint: ``'host:port'`` or ``'@service-name'``.
        :param controller_endpoint: handed unchanged to every ``Agent``.
        :param voltha_retry_interval: delay given to ``asleep`` between failed
            attempts to retrieve the device list.
        :param devices_refresh_interval: delay given to ``asleep`` between two
            refreshes of the device list.
        """
        self.log.info('Initializing connection manager')
        self.controller_endpoint = controller_endpoint
        self.consul_endpoint = consul_endpoint
        self.voltha_endpoint = voltha_endpoint

        self.channel = None
        # Devices for which agents are currently set up (None until the
        # first refresh) and the most recently fetched, not yet applied list.
        self.connected_devices = None
        self.unprocessed_devices = None
        # datapath_id -> Agent
        self.agent_map = {}
        self.grpc_client = None
        # datapath_id -> logical device id
        self.device_id_map = None

        self.voltha_retry_interval = voltha_retry_interval
        self.devices_refresh_interval = devices_refresh_interval

        self.running = False

    @inlineCallbacks
    def run(self):
        """Connect to Voltha, launch the agents and start monitoring.

        Does nothing when the manager is already running. Otherwise the
        resulting Deferred fires with this manager once the initial device
        list has been retrieved and the agents have been launched.
        """
        if self.running:
            return

        self.log.info('Running connection manager')

        self.running = True

        # Get voltha grpc endpoint
        self.channel = self.get_grpc_channel_with_voltha()

        # Connect to voltha using grpc and fetch the list of logical devices
        yield self.get_list_of_logical_devices_from_voltha()

        # Create shared gRPC API object
        self.grpc_client = GrpcClient(self.channel, self.device_id_map)

        # Instantiate an OpenFlow agent for each logical device
        self.refresh_openflow_agent_connections()

        reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
        reactor.callLater(0, self.monitor_connections)

        returnValue(self)

    def shutdown(self):
        """Stop every OpenFlow agent and mark the manager as not running."""
        # clean up all controller connections
        # Iterate over the Agent objects themselves: enumerate() on the dict
        # would yield (index, datapath_id) pairs, which have no stop().
        for agent in list(self.agent_map.values()):
            agent.stop()
        self.running = False
        # TODO: close grpc connection to voltha

    def resolve_endpoint(self, endpoint):
        """Turn an endpoint specification into a ``(host, port)`` pair.

        :param endpoint: ``'host:port'``, or ``'@service-name'`` to have the
            address looked up via ``get_endpoint_from_consul``.
        :return: ``(host, int(port))``, or ``None`` when the Consul lookup
            raises or the resulting endpoint is empty.
        """
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                self.log.info(
                    'Found endpoint {} service at {}'.format(
                        endpoint, ip_port_endpoint))
            except Exception as e:
                self.log.error('Failure to locate {} service from '
                               'consul {}:'.format(endpoint, repr(e)))
                return None
        if not ip_port_endpoint:
            return None
        # Split on the last colon only, so exactly two parts are produced
        # for the two-name unpacking below.
        host, port = ip_port_endpoint.rsplit(':', 1)
        return host, int(port)

    def get_grpc_channel_with_voltha(self):
        """Resolve the Voltha endpoint and open an insecure gRPC channel to it.

        :return: the channel created by ``grpc.insecure_channel``.
        :raises ValueError: when the Voltha endpoint cannot be resolved.
        """
        self.log.info('Resolving voltha endpoint {} from consul'.format(
            self.voltha_endpoint))
        resolved = self.resolve_endpoint(self.voltha_endpoint)
        # resolve_endpoint() signals failure with None; check before
        # unpacking so the failure is reported with a meaningful message.
        if resolved is None:
            raise ValueError('Unable to resolve voltha endpoint {}'.format(
                self.voltha_endpoint))
        host, port = resolved
        # Create grpc channel to Voltha
        channel = grpc.insecure_channel('{}:{}'.format(host, port))
        self.log.info('Acquired a grpc channel to voltha')
        return channel

    @inlineCallbacks
    def get_list_of_logical_devices_from_voltha(self):
        """Fetch the logical devices from Voltha, retrying until it succeeds.

        On success the device list is stored in ``unprocessed_devices`` and
        ``device_id_map`` is rebuilt (datapath id -> device id). On any
        exception the error is logged and the call is retried after
        ``voltha_retry_interval``.
        """
        while True:
            self.log.info('Retrieve devices from voltha')
            try:
                stub = voltha_pb2.VolthaLogicalLayerStub(self.channel)
                devices = stub.ListLogicalDevices(
                    voltha_pb2.NullMessage()).items
                for device in devices:
                    self.log.info("Devices {} -> {}".format(
                        device.id, device.datapath_id))
                self.unprocessed_devices = devices
                self.device_id_map = dict(
                    (device.datapath_id, device.id) for device in devices)
                return
            except Exception as e:
                self.log.error('Failure to retrieve devices from '
                               'voltha: {}'.format(repr(e)))

            self.log.info('reconnect', after_delay=self.voltha_retry_interval)
            yield asleep(self.voltha_retry_interval)

    def refresh_openflow_agent_connections(self):
        """Bring the running agents in line with ``unprocessed_devices``.

        Compares the new device list against the previous one. For any new
        device an agent connection is created; for a device that is no longer
        part of the list the agent connection is stopped. When nothing
        changed the method returns without touching any state.
        """
        current_datapath_ids = [
            device.datapath_id for device in self.unprocessed_devices]

        if self.connected_devices is None:
            # If the ofagent has no previous devices then just add them
            datapath_ids_to_add = current_datapath_ids
        else:
            previous_datapath_ids = [
                device.datapath_id for device in self.connected_devices]
            # Sets give constant-time membership tests; the lists keep the
            # order in which the devices were reported.
            previous_lookup = set(previous_datapath_ids)
            current_lookup = set(current_datapath_ids)
            datapath_ids_to_add = [d for d in current_datapath_ids
                                   if d not in previous_lookup]
            datapath_ids_to_remove = [d for d in previous_datapath_ids
                                      if d not in current_lookup]

            # Check for no change
            if not datapath_ids_to_add and not datapath_ids_to_remove:
                self.log.info('No new devices found. No OF agent update '
                              'required')
                return

            self.log.info('Updating OF agent connections.')
            self.log.debug('Current OF agents',
                           datapath_ids=list(self.agent_map))

            # Stop previous agents
            for datapath_id in datapath_ids_to_remove:
                if datapath_id in self.agent_map:
                    self.agent_map[datapath_id].stop()
                    del self.agent_map[datapath_id]
                    self.log.info('Removed OF agent with datapath id '
                                  '{}'.format(datapath_id))

        # Add the new agents
        for datapath_id in datapath_ids_to_add:
            agent = Agent(self.controller_endpoint,
                          datapath_id,
                          self.grpc_client)
            self.agent_map[datapath_id] = agent
            agent.run()
            self.log.info('Launched OF agent with datapath id {}'.format(
                datapath_id))

        # replace the old device list with the new ones
        self.connected_devices = self.unprocessed_devices
        self.unprocessed_devices = None

    @inlineCallbacks
    def monitor_connections(self):
        """Loop forever: wait, re-read the device list, refresh the agents."""
        while True:
            # sleep first
            yield asleep(self.devices_refresh_interval)
            self.log.info('Monitor connections')
            yield self.get_list_of_logical_devices_from_voltha()
            self.refresh_openflow_agent_connections()
196
197# class Model(object):
198# def __init__(self, id, path):
199# self.id=id
200# self.datapath_id=path,
201
202
203# if __name__ == '__main__':
204# conn = ConnectionManager("10.0.2.15:3181", "localhost:50555",
205# "10.100.198.150:6633")
206#
207# conn.connected_devices = None
208# model1 = Model('12311', 'wdadsa1')
209# model2 = Model('12312', 'wdadsa2')
210# model3 = Model('12313', 'wdadsa3')
211# model4 = Model('12314', 'wdadsa4')
212#
213# conn.unprocessed_devices = [model1, model2, model3]
214#
215# conn.refresh_openflow_agent_connections()
216#
217#
218# # val = [device.datapath_id for device in conn.connected_devices]
219# # print val
220# #
221# # for (id,n) in enumerate(val):
222# # print n
223#
224#
225# conn.unprocessed_devices = [model1, model2, model3]
226#
227# conn.refresh_openflow_agent_connections()
228#
229# conn.unprocessed_devices = [model1, model2, model4]
230#
231# conn.refresh_openflow_agent_connections()