| # |
| # Copyright 2016 the original author or authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import sys |
| |
| from twisted.internet import reactor |
| from twisted.internet.defer import Deferred, inlineCallbacks, returnValue |
| |
| from common.utils.asleep import asleep |
| from common.utils.consulhelpers import get_endpoint_from_consul |
| from structlog import get_logger |
| import grpc |
| from ofagent.protos import third_party |
| from protos import voltha_pb2 |
| from grpc_client import GrpcClient |
| |
| from agent import Agent |
| |
| log = get_logger() |
| _ = third_party |
| |
| class ConnectionManager(object): |
| |
| def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoint, |
| voltha_retry_interval=0.5, devices_refresh_interval=5): |
| |
| log.info('init-connection-manager') |
| self.controller_endpoint = controller_endpoint |
| self.consul_endpoint = consul_endpoint |
| self.voltha_endpoint = voltha_endpoint |
| |
| self.channel = None |
| self.grpc_client = None # single, shared gRPC client to Voltha |
| |
| self.agent_map = {} # datapath_id -> Agent() |
| self.device_id_to_datapath_id_map = {} |
| |
| self.voltha_retry_interval = voltha_retry_interval |
| self.devices_refresh_interval = devices_refresh_interval |
| |
| self.running = False |
| |
| def start(self): |
| |
| if self.running: |
| return |
| |
| log.debug('starting') |
| |
| self.running = True |
| |
| # Get voltha grpc endpoint |
| self.channel = self.get_grpc_channel_with_voltha() |
| |
| # Create shared gRPC API object |
| self.grpc_client = GrpcClient(self, self.channel).start() |
| |
| # Start monitoring logical devices and manage agents accordingly |
| reactor.callLater(0, self.monitor_logical_devices) |
| |
| log.info('started') |
| |
| return self |
| |
| def stop(self): |
| log.debug('stopping') |
| # clean up all controller connections |
| for agent in self.agent_map.itervalues(): |
| agent.stop() |
| self.running = False |
| self.grpc_client.stop() |
| del self.channel |
| log.info('stopped') |
| |
| def resolve_endpoint(self, endpoint): |
| ip_port_endpoint = endpoint |
| if endpoint.startswith('@'): |
| try: |
| ip_port_endpoint = get_endpoint_from_consul( |
| self.consul_endpoint, endpoint[1:]) |
| log.info( |
| 'Found endpoint {} service at {}'.format(endpoint, |
| ip_port_endpoint)) |
| except Exception as e: |
| log.error('Failure to locate {} service from ' |
| 'consul {}:'.format(endpoint, repr(e))) |
| return |
| if ip_port_endpoint: |
| host, port = ip_port_endpoint.split(':', 2) |
| return host, int(port) |
| |
| def get_grpc_channel_with_voltha(self): |
| log.info('Resolving voltha endpoint {} from consul'.format( |
| self.voltha_endpoint)) |
| host, port = self.resolve_endpoint(self.voltha_endpoint) |
| assert host is not None |
| assert port is not None |
| # Create grpc channel to Voltha |
| channel = grpc.insecure_channel('{}:{}'.format(host, port)) |
| log.info('Acquired a grpc channel to voltha') |
| return channel |
| |
| |
| @inlineCallbacks |
| def get_list_of_logical_devices_from_voltha(self): |
| |
| while True: |
| log.info('Retrieve devices from voltha') |
| try: |
| stub = voltha_pb2.VolthaLogicalLayerStub(self.channel) |
| devices = stub.ListLogicalDevices( |
| voltha_pb2.NullMessage()).items |
| for device in devices: |
| log.info("Devices {} -> {}".format(device.id, |
| device.datapath_id)) |
| returnValue(devices) |
| |
| except Exception as e: |
| log.error('Failure to retrieve devices from ' |
| 'voltha: {}'.format(repr(e))) |
| |
| log.info('reconnect', after_delay=self.voltha_retry_interval) |
| yield asleep(self.voltha_retry_interval) |
| |
| |
| def refresh_agent_connections(self, devices): |
| """ |
| Based on the new device list, update the following state in the class: |
| * agent_map |
| * datapath_map |
| * device_id_map |
| :param devices: full device list freshly received from Voltha |
| :return: None |
| """ |
| |
| # Use datapath ids for deciding what's new and what's obsolete |
| desired_datapath_ids = set(d.datapath_id for d in devices) |
| current_datapath_ids = set(self.agent_map.iterkeys()) |
| |
| # if identical, nothing to do |
| if desired_datapath_ids == current_datapath_ids: |
| return |
| |
| # ... otherwise calculate differences |
| to_add = desired_datapath_ids.difference(current_datapath_ids) |
| to_del = current_datapath_ids.difference(desired_datapath_ids) |
| |
| # remove what we don't need |
| for datapath_id in to_del: |
| self.delete_agent(datapath_id) |
| |
| # start new agents as needed |
| for device in devices: |
| if device.datapath_id in to_add: |
| self.create_agent(device) |
| |
| log.debug('updated-agent-list', count=len(self.agent_map)) |
| log.debug('updated-device-id-to-datapath-id-map', |
| map=str(self.device_id_to_datapath_id_map)) |
| |
| def create_agent(self, device): |
| datapath_id = device.datapath_id |
| device_id = device.id |
| agent = Agent(self.controller_endpoint, datapath_id, |
| device_id, self.grpc_client) |
| agent.start() |
| self.agent_map[datapath_id] = agent |
| self.device_id_to_datapath_id_map[device_id] = datapath_id |
| |
| def delete_agent(self, datapath_id): |
| agent = self.agent_map[datapath_id] |
| device_id = agent.get_device_id() |
| agent.stop() |
| del self.agent_map[datapath_id] |
| del self.device_id_to_datapath_id_map[device_id] |
| |
| @inlineCallbacks |
| def monitor_logical_devices(self): |
| while True: |
| # TODO @khen We should switch to a polling mode based on a |
| # streaming gRPC method |
| |
| # get current list from Voltha |
| devices = yield self.get_list_of_logical_devices_from_voltha() |
| |
| # update agent list and mapping tables as needed |
| self.refresh_agent_connections(devices) |
| |
| # wait before next poll |
| yield asleep(self.devices_refresh_interval) |
| log.info('Monitor connections') |
| |
| def forward_packet_in(self, device_id, ofp_packet_in): |
| datapath_id = self.device_id_to_datapath_id_map.get(device_id, None) |
| if datapath_id: |
| agent = self.agent_map[datapath_id] |
| agent.forward_packet_in(ofp_packet_in) |