blob: 9a171217531c98307471faae3e2db7900db1f789 [file] [log] [blame]
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
from twisted.internet import reactor
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from common.utils.asleep import asleep
from common.utils.consulhelpers import get_endpoint_from_consul
from structlog import get_logger
import grpc
from protos import voltha_pb2
from grpc_client import GrpcClient
from agent import Agent
class ConnectionManager(object):
    """Keep one OpenFlow agent running per logical device reported by Voltha.

    The manager opens a gRPC channel to Voltha, fetches the list of logical
    devices, starts an ``Agent`` towards the controller for each device, and
    then periodically re-fetches the list to start/stop agents as devices
    appear and disappear.
    """

    log = get_logger()

    def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoint,
                 voltha_retry_interval=0.5, devices_refresh_interval=60):
        """Store the configuration; no connection is made until ``run()``.

        :param consul_endpoint: passed to ``get_endpoint_from_consul`` when
            an endpoint has to be looked up by service name.
        :param voltha_endpoint: ``host:port`` of Voltha, or ``@name`` to have
            the address looked up through consul.
        :param controller_endpoint: handed unchanged to every ``Agent``.
        :param voltha_retry_interval: delay passed to ``asleep`` between
            attempts to list devices (presumably seconds -- confirm against
            ``asleep``).
        :param devices_refresh_interval: delay passed to ``asleep`` between
            periodic device-list refreshes (same unit as above).
        """
        self.log.info('Initializing connection manager')
        self.controller_endpoint = controller_endpoint
        self.consul_endpoint = consul_endpoint
        self.voltha_endpoint = voltha_endpoint

        self.channel = None
        self.grpc_client = None

        # Devices that currently have an agent / devices fetched from Voltha
        # that have not yet been reconciled against the running agents.
        self.connected_devices = None
        self.unprocessed_devices = None

        # datapath_id -> Agent
        self.agent_map = {}
        # datapath_id -> logical device id
        self.device_id_map = None

        self.voltha_retry_interval = voltha_retry_interval
        self.devices_refresh_interval = devices_refresh_interval

        self.running = False

    @inlineCallbacks
    def run(self):
        """Connect to Voltha, launch the agents and start the monitor loop.

        Calling ``run()`` while already running is a no-op. Otherwise the
        returned deferred fires with this manager instance.
        """
        if self.running:
            return

        self.log.info('Running connection manager')
        self.running = True

        # Get voltha grpc endpoint
        self.channel = self.get_grpc_channel_with_voltha()

        # Connect to voltha using grpc and fetch the list of logical devices
        yield self.get_list_of_logical_devices_from_voltha()

        # Create shared gRPC API object
        self.grpc_client = GrpcClient(self.channel, self.device_id_map)

        # Instantiate an OpenFlow agent for each logical device
        self.refresh_openflow_agent_connections()

        reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
        reactor.callLater(0, self.monitor_connections)

        returnValue(self)

    def shutdown(self):
        """Stop every running agent and mark the manager as not running."""
        # Clean up all controller connections. Iterate over the Agent
        # objects themselves (enumerate() on the dict would only yield
        # (index, datapath_id) pairs); snapshot the values so a stop()
        # callback touching agent_map cannot disturb the iteration.
        for agent in list(self.agent_map.values()):
            agent.stop()
        self.running = False
        # TODO: close grpc connection to voltha

    def resolve_endpoint(self, endpoint):
        """Turn an endpoint specification into a ``(host, port)`` tuple.

        An endpoint starting with ``@`` is looked up via
        ``get_endpoint_from_consul`` (with the ``@`` stripped); anything else
        is taken as a literal ``host:port`` string.

        :return: ``(host, int(port))``, or ``None`` when the consul lookup
            fails or the endpoint is empty.
        :raises ValueError: if the resolved string has no ``:`` separator or
            the port is not an integer.
        """
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                self.log.info(
                    'Found endpoint {} service at {}'.format(endpoint,
                                                             ip_port_endpoint))
            except Exception as e:
                self.log.error('Failure to locate {} service from '
                               'consul {}:'.format(endpoint, repr(e)))
                return None

        if not ip_port_endpoint:
            return None

        # Split on the last ':' only, so exactly two parts are produced and
        # the port is always the trailing component.
        host, port = ip_port_endpoint.rsplit(':', 1)
        return host, int(port)

    def get_grpc_channel_with_voltha(self):
        """Resolve the Voltha endpoint and open an insecure gRPC channel.

        :return: the channel returned by ``grpc.insecure_channel``.
        :raises ValueError: if the Voltha endpoint cannot be resolved.
        """
        self.log.info('Resolving voltha endpoint {} from consul'.format(
            self.voltha_endpoint))
        resolved = self.resolve_endpoint(self.voltha_endpoint)
        if resolved is None:
            # resolve_endpoint() signals failure with None; fail with a
            # clear message instead of an opaque tuple-unpacking error.
            raise ValueError('Unable to resolve voltha endpoint {}'.format(
                self.voltha_endpoint))
        host, port = resolved

        # Create grpc channel to Voltha
        channel = grpc.insecure_channel('{}:{}'.format(host, port))
        self.log.info('Acquired a grpc channel to voltha')
        return channel

    @inlineCallbacks
    def get_list_of_logical_devices_from_voltha(self):
        """Fetch the logical devices from Voltha, retrying until it succeeds.

        On success ``unprocessed_devices`` holds the fetched devices and
        ``device_id_map`` maps each ``datapath_id`` to the device ``id``.
        Every failure is logged and retried after ``voltha_retry_interval``.
        """
        while True:
            self.log.info('Retrieve devices from voltha')
            try:
                stub = voltha_pb2.VolthaLogicalLayerStub(self.channel)
                devices = stub.ListLogicalDevices(
                    voltha_pb2.NullMessage()).items
                for device in devices:
                    self.log.info("Devices {} -> {}".format(
                        device.id, device.datapath_id))
                self.unprocessed_devices = devices
                self.device_id_map = dict(
                    (device.datapath_id, device.id) for device in devices)
                return

            except Exception as e:
                # Deliberately broad: any failure here just means "try
                # again later".
                self.log.error('Failure to retrieve devices from '
                               'voltha: {}'.format(repr(e)))

            self.log.info('reconnect', after_delay=self.voltha_retry_interval)
            yield asleep(self.voltha_retry_interval)

    def refresh_openflow_agent_connections(self):
        """Reconcile the running agents with ``unprocessed_devices``.

        Compares the newly fetched device list against the previous one.
        An agent is started for every new datapath id, and the agent of any
        datapath id that is no longer listed is stopped. When nothing
        changed the method returns without touching any state.
        """
        current_ids = [device.datapath_id
                       for device in self.unprocessed_devices]

        if self.connected_devices is None:
            # No previous devices: everything is new, nothing to remove.
            datapath_ids_to_add = current_ids
            datapath_ids_to_remove = []
        else:
            previous_ids = [device.datapath_id
                            for device in self.connected_devices]
            # Sets give O(1) membership tests; the list comprehensions keep
            # the original device ordering.
            known = set(previous_ids)
            present = set(current_ids)
            datapath_ids_to_add = [d for d in current_ids if d not in known]
            datapath_ids_to_remove = [d for d in previous_ids
                                      if d not in present]

        # Check for no change
        if not datapath_ids_to_add and not datapath_ids_to_remove:
            self.log.info('No new devices found. No OF agent update '
                          'required')
            return

        self.log.info('Updating OF agent connections.')
        self.log.debug('agent-map', agent_map=self.agent_map)

        # Stop previous agents
        for datapath_id in datapath_ids_to_remove:
            if datapath_id in self.agent_map:
                self.agent_map[datapath_id].stop()
                del self.agent_map[datapath_id]
                self.log.info('Removed OF agent with datapath id {'
                              '}'.format(datapath_id))

        # Add the new agents
        for datapath_id in datapath_ids_to_add:
            self.agent_map[datapath_id] = Agent(self.controller_endpoint,
                                                datapath_id,
                                                self.grpc_client)
            self.agent_map[datapath_id].run()
            self.log.info('Launched OF agent with datapath id {}'.format(
                datapath_id))

        # replace the old device list with the new ones
        self.connected_devices = self.unprocessed_devices
        self.unprocessed_devices = None

    @inlineCallbacks
    def monitor_connections(self):
        """Periodically re-fetch the device list and reconcile the agents.

        Loops forever: wait ``devices_refresh_interval``, fetch the logical
        devices from Voltha, then refresh the agent connections.
        """
        while True:
            # sleep first
            yield asleep(self.devices_refresh_interval)
            self.log.info('Monitor connections')
            yield self.get_list_of_logical_devices_from_voltha()
            self.refresh_openflow_agent_connections()
# class Model(object):
# def __init__(self, id, path):
# self.id=id
# self.datapath_id=path,
# if __name__ == '__main__':
# conn = ConnectionManager("10.0.2.15:3181", "localhost:50555",
# "10.100.198.150:6633")
#
# conn.connected_devices = None
# model1 = Model('12311', 'wdadsa1')
# model2 = Model('12312', 'wdadsa2')
# model3 = Model('12313', 'wdadsa3')
# model4 = Model('12314', 'wdadsa4')
#
# conn.unprocessed_devices = [model1, model2, model3]
#
# conn.refresh_openflow_agent_connections()
#
#
# # val = [device.datapath_id for device in conn.connected_devices]
# # print val
# #
# # for (id,n) in enumerate(val):
# # print n
#
#
# conn.unprocessed_devices = [model1, model2, model3]
#
# conn.refresh_openflow_agent_connections()
#
# conn.unprocessed_devices = [model1, model2, model4]
#
# conn.refresh_openflow_agent_connections()