blob: 1aa703159fe43e76c5eb26c9d75767e841daa1b0 [file] [log] [blame]
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
from twisted.internet import reactor
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from common.utils.asleep import asleep
from common.utils.consulhelpers import get_endpoint_from_consul
from structlog import get_logger
import grpc
from ofagent.protos import third_party
from protos import voltha_pb2
from grpc_client import GrpcClient
from agent import Agent
from google.protobuf.empty_pb2 import Empty
log = get_logger()
# _ = third_party
class ConnectionManager(object):
def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoint,
voltha_retry_interval=0.5, devices_refresh_interval=5):
log.info('init-connection-manager')
self.controller_endpoint = controller_endpoint
self.consul_endpoint = consul_endpoint
self.voltha_endpoint = voltha_endpoint
self.channel = None
self.grpc_client = None # single, shared gRPC client to Voltha
self.agent_map = {} # datapath_id -> Agent()
self.device_id_to_datapath_id_map = {}
self.voltha_retry_interval = voltha_retry_interval
self.devices_refresh_interval = devices_refresh_interval
self.running = False
def start(self):
if self.running:
return
log.debug('starting')
self.running = True
# Get voltha grpc endpoint
self.channel = self.get_grpc_channel_with_voltha()
# Create shared gRPC API object
self.grpc_client = GrpcClient(self, self.channel).start()
# Start monitoring logical devices and manage agents accordingly
reactor.callLater(0, self.monitor_logical_devices)
log.info('started')
return self
def stop(self):
log.debug('stopping')
# clean up all controller connections
for agent in self.agent_map.itervalues():
agent.stop()
self.running = False
self.grpc_client.stop()
del self.channel
log.info('stopped')
def resolve_endpoint(self, endpoint):
ip_port_endpoint = endpoint
if endpoint.startswith('@'):
try:
ip_port_endpoint = get_endpoint_from_consul(
self.consul_endpoint, endpoint[1:])
log.info(
'Found endpoint {} service at {}'.format(endpoint,
ip_port_endpoint))
except Exception as e:
log.error('Failure to locate {} service from '
'consul {}:'.format(endpoint, repr(e)))
return
if ip_port_endpoint:
host, port = ip_port_endpoint.split(':', 2)
return host, int(port)
def get_grpc_channel_with_voltha(self):
log.info('Resolving voltha endpoint {} from consul'.format(
self.voltha_endpoint))
host, port = self.resolve_endpoint(self.voltha_endpoint)
assert host is not None
assert port is not None
# Create grpc channel to Voltha
channel = grpc.insecure_channel('{}:{}'.format(host, port))
log.info('Acquired a grpc channel to voltha')
return channel
@inlineCallbacks
def get_list_of_logical_devices_from_voltha(self):
while True:
log.info('Retrieve devices from voltha')
try:
stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
devices = stub.ListLogicalDevices(Empty()).items
for device in devices:
log.info("Devices {} -> {}".format(device.id,
device.datapath_id))
returnValue(devices)
except Exception as e:
log.error('Failure to retrieve devices from '
'voltha: {}'.format(repr(e)))
log.info('reconnect', after_delay=self.voltha_retry_interval)
yield asleep(self.voltha_retry_interval)
def refresh_agent_connections(self, devices):
"""
Based on the new device list, update the following state in the class:
* agent_map
* datapath_map
* device_id_map
:param devices: full device list freshly received from Voltha
:return: None
"""
# Use datapath ids for deciding what's new and what's obsolete
desired_datapath_ids = set(d.datapath_id for d in devices)
current_datapath_ids = set(self.agent_map.iterkeys())
# if identical, nothing to do
if desired_datapath_ids == current_datapath_ids:
return
# ... otherwise calculate differences
to_add = desired_datapath_ids.difference(current_datapath_ids)
to_del = current_datapath_ids.difference(desired_datapath_ids)
# remove what we don't need
for datapath_id in to_del:
self.delete_agent(datapath_id)
# start new agents as needed
for device in devices:
if device.datapath_id in to_add:
self.create_agent(device)
log.debug('updated-agent-list', count=len(self.agent_map))
log.debug('updated-device-id-to-datapath-id-map',
map=str(self.device_id_to_datapath_id_map))
def create_agent(self, device):
datapath_id = device.datapath_id
device_id = device.id
agent = Agent(self.controller_endpoint, datapath_id,
device_id, self.grpc_client)
agent.start()
self.agent_map[datapath_id] = agent
self.device_id_to_datapath_id_map[device_id] = datapath_id
def delete_agent(self, datapath_id):
agent = self.agent_map[datapath_id]
device_id = agent.get_device_id()
agent.stop()
del self.agent_map[datapath_id]
del self.device_id_to_datapath_id_map[device_id]
@inlineCallbacks
def monitor_logical_devices(self):
while True:
# TODO @khen We should switch to a polling mode based on a
# streaming gRPC method
# get current list from Voltha
devices = yield self.get_list_of_logical_devices_from_voltha()
# update agent list and mapping tables as needed
self.refresh_agent_connections(devices)
# wait before next poll
yield asleep(self.devices_refresh_interval)
log.info('Monitor connections')
def forward_packet_in(self, device_id, ofp_packet_in):
datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
if datapath_id:
agent = self.agent_map[datapath_id]
agent.forward_packet_in(ofp_packet_in)
def forward_change_event(self, device_id, event):
datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
if datapath_id:
agent = self.agent_map[datapath_id]
agent.forward_change_event(event)