Dockerizing ofagent
Change-Id: Ic2ead67cedd74463a72efca8c3f5d74ea2433af8
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
new file mode 100644
index 0000000..9a17121
--- /dev/null
+++ b/ofagent/connection_mgr.py
@@ -0,0 +1,231 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
+
+from common.utils.asleep import asleep
+from common.utils.consulhelpers import get_endpoint_from_consul
+from structlog import get_logger
+import grpc
+from protos import voltha_pb2
+from grpc_client import GrpcClient
+
+from agent import Agent
+
+class ConnectionManager(object):
+
+ log = get_logger()
+
+ def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoint,
+ voltha_retry_interval=0.5, devices_refresh_interval=60):
+
+ self.log.info('Initializing connection manager')
+ self.controller_endpoint = controller_endpoint
+ self.consul_endpoint = consul_endpoint
+ self.voltha_endpoint = voltha_endpoint
+
+ self.channel = None
+ self.connected_devices = None
+ self.unprocessed_devices = None
+ self.agent_map = {}
+ self.grpc_client = None
+ self.device_id_map = None
+
+ self.voltha_retry_interval = voltha_retry_interval
+ self.devices_refresh_interval = devices_refresh_interval
+
+ self.running = False
+
+ @inlineCallbacks
+ def run(self):
+ if self.running:
+ return
+
+ self.log.info('Running connection manager')
+
+ self.running = True
+
+ # Get voltha grpc endpoint
+ self.channel = self.get_grpc_channel_with_voltha()
+
+ # Connect to voltha using grpc and fetch the list of logical devices
+ yield self.get_list_of_logical_devices_from_voltha()
+
+ # Create shared gRPC API object
+ self.grpc_client = GrpcClient(self.channel, self.device_id_map)
+
+ # Instantiate an OpenFlow agent for each logical device
+ self.refresh_openflow_agent_connections()
+
+ reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
+ reactor.callLater(0, self.monitor_connections)
+
+ returnValue(self)
+
+
+ def shutdown(self):
+ # clean up all controller connections
+ for key, value in enumerate(self.agent_map):
+ value.stop()
+ self.running = False
+ # TODO: close grpc connection to voltha
+
+ def resolve_endpoint(self, endpoint):
+ ip_port_endpoint = endpoint
+ if endpoint.startswith('@'):
+ try:
+ ip_port_endpoint = get_endpoint_from_consul(
+ self.consul_endpoint, endpoint[1:])
+ self.log.info(
+ 'Found endpoint {} service at {}'.format(endpoint,
+ ip_port_endpoint))
+ except Exception as e:
+ self.log.error('Failure to locate {} service from '
+ 'consul {}:'.format(endpoint, repr(e)))
+ return
+ if ip_port_endpoint:
+ host, port = ip_port_endpoint.split(':', 2)
+ return host, int(port)
+
+ def get_grpc_channel_with_voltha(self):
+ self.log.info('Resolving voltha endpoint {} from consul'.format(
+ self.voltha_endpoint))
+ host, port = self.resolve_endpoint(self.voltha_endpoint)
+ assert host is not None
+ assert port is not None
+ # Create grpc channel to Voltha
+ channel = grpc.insecure_channel('{}:{}'.format(host, port))
+ self.log.info('Acquired a grpc channel to voltha')
+ return channel
+
+
+ @inlineCallbacks
+ def get_list_of_logical_devices_from_voltha(self):
+ while True:
+ self.log.info('Retrieve devices from voltha')
+ try:
+ stub = voltha_pb2.VolthaLogicalLayerStub(self.channel)
+ devices = stub.ListLogicalDevices(
+ voltha_pb2.NullMessage()).items
+ for device in devices:
+ self.log.info("Devices {} -> {}".format(device.id,
+ device.datapath_id))
+ self.unprocessed_devices = devices
+ self.device_id_map = dict(
+ (device.datapath_id, device.id) for device in devices)
+ return
+ except Exception as e:
+ self.log.error('Failure to retrieve devices from '
+ 'voltha: {}'.format(repr(e)))
+
+ self.log.info('reconnect', after_delay=self.voltha_retry_interval)
+ yield asleep(self.voltha_retry_interval)
+
+
+ def refresh_openflow_agent_connections(self):
+ # Compare the new device list again the previous
+ # For any new device, an agent connection will be created. For
+ # existing device that are no longer part of the list then that
+ # agent connection will be stopped
+
+ # If the ofagent has no previous devices then just add them
+ if self.connected_devices is None:
+ datapath_ids_to_add = [device.datapath_id for device in self.unprocessed_devices]
+ else:
+ previous_datapath_ids = [device.datapath_id for device in self.connected_devices]
+ current_datapath_ids = [device.datapath_id for device in self.unprocessed_devices]
+ datapath_ids_to_add = [d for d in current_datapath_ids if
+ d not in previous_datapath_ids]
+ datapath_ids_to_remove = [d for d in previous_datapath_ids if
+ d not in current_datapath_ids]
+
+ # Check for no change
+ if not datapath_ids_to_add and not datapath_ids_to_remove:
+ self.log.info('No new devices found. No OF agent update '
+ 'required')
+ return
+
+ self.log.info('Updating OF agent connections.')
+ print self.agent_map
+
+ # Stop previous agents
+ for datapath_id in datapath_ids_to_remove:
+ if self.agent_map.has_key(datapath_id):
+ self.agent_map[datapath_id].stop()
+ del self.agent_map[datapath_id]
+ self.log.info('Removed OF agent with datapath id {'
+ '}'.format(datapath_id))
+
+ # Add the new agents
+ for datapath_id in datapath_ids_to_add:
+ self.agent_map[datapath_id] = Agent(self.controller_endpoint,
+ datapath_id,
+ self.grpc_client)
+ self.agent_map[datapath_id].run()
+ self.log.info('Launched OF agent with datapath id {}'.format(
+ datapath_id))
+
+ # replace the old device list with the new ones
+ self.connected_devices = self.unprocessed_devices
+ self.unprocessed_devices = None
+
+ @inlineCallbacks
+ def monitor_connections(self):
+ while True:
+ # sleep first
+ yield asleep(self.devices_refresh_interval)
+ self.log.info('Monitor connections')
+ yield self.get_list_of_logical_devices_from_voltha()
+ self.refresh_openflow_agent_connections()
+
+# class Model(object):
+# def __init__(self, id, path):
+# self.id=id
+# self.datapath_id=path,
+
+
+# if __name__ == '__main__':
+# conn = ConnectionManager("10.0.2.15:3181", "localhost:50555",
+# "10.100.198.150:6633")
+#
+# conn.connected_devices = None
+# model1 = Model('12311', 'wdadsa1')
+# model2 = Model('12312', 'wdadsa2')
+# model3 = Model('12313', 'wdadsa3')
+# model4 = Model('12314', 'wdadsa4')
+#
+# conn.unprocessed_devices = [model1, model2, model3]
+#
+# conn.refresh_openflow_agent_connections()
+#
+#
+# # val = [device.datapath_id for device in conn.connected_devices]
+# # print val
+# #
+# # for (id,n) in enumerate(val):
+# # print n
+#
+#
+# conn.unprocessed_devices = [model1, model2, model3]
+#
+# conn.refresh_openflow_agent_connections()
+#
+# conn.unprocessed_devices = [model1, model2, model4]
+#
+# conn.refresh_openflow_agent_connections()
\ No newline at end of file