blob: e4d415ddec6a2b99cfad28f3e904d62c27b79922 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
import structlog
import os.path
from twisted.internet import protocol, reactor, ssl
from twisted.internet.defer import Deferred, inlineCallbacks
import loxi.of13 as of13
from pyvoltha.common.utils.asleep import asleep
from of_connection import OpenFlowConnection
from of_protocol_handler import OpenFlowProtocolHandler
log = structlog.get_logger()
class Agent(protocol.ClientFactory):
generation_is_defined = False
cached_generation_id = None
def __init__(self,
connection_manager,
controller_endpoint,
datapath_id,
device_id,
enable_tls=False,
key_file=None,
cert_file=None,
conn_retry_interval=1):
self.controller_endpoint = controller_endpoint
self.datapath_id = datapath_id
self.device_id = device_id
self.connection_manager = connection_manager
self.enable_tls = enable_tls
self.key_file = key_file
self.cert_file = cert_file
self.retry_interval = conn_retry_interval
self.running = False
self.connector = None # will be a Connector instance once connected
self.d_disconnected = None # a deferred to signal reconnect loop when
# TCP connection is lost
self.connected = False
self.exiting = False
self.proto_handler = None
def get_device_id(self):
return self.device_id
def start(self):
log.debug('starting')
if self.running:
return
self.running = True
reactor.callLater(0, self.keep_connected)
log.info('started')
return self
def stop(self):
log.debug('stopping')
self.connected = False
self.exiting = True
self.connector.disconnect()
log.info('stopped')
def resolve_endpoint(self, endpoint):
# enable optional resolution via consul;
# see https://jira.opencord.org/browse/CORD-820
host, port = endpoint.split(':', 2)
return host, int(port)
@inlineCallbacks
def keep_connected(self):
"""Keep reconnecting to the controller"""
while not self.exiting:
host, port = self.resolve_endpoint(self.controller_endpoint)
log.info('connecting', host=host, port=port)
if self.enable_tls:
try:
# Check that key_file and cert_file is provided and
# the files exist
if self.key_file is None or \
self.cert_file is None or \
not os.path.isfile(self.key_file) or \
not os.path.isfile(self.cert_file):
raise Exception('key_file "{}" or cert_file "{}"'
' is not found'.
format(self.key_file, self.cert_file))
with open(self.key_file) as keyFile:
with open(self.cert_file) as certFile:
clientCert = ssl.PrivateCertificate.loadPEM(
keyFile.read() + certFile.read())
ctx = clientCert.options()
self.connector = reactor.connectSSL(host, port, self, ctx)
log.info('tls-enabled')
except Exception as e:
log.exception('failed-to-connect', reason=e)
else:
self.connector = reactor.connectTCP(host, port, self)
log.info('tls-disabled')
self.d_disconnected = Deferred()
yield self.d_disconnected
log.debug('reconnect', after_delay=self.retry_interval)
yield asleep(self.retry_interval)
def enter_disconnected(self, event, reason):
"""Internally signal entering disconnected state"""
self.connected = False
if not self.exiting:
log.error(event, reason=reason)
self.d_disconnected.callback(None)
def enter_connected(self):
"""Handle transitioning from disconnected to connected state"""
log.info('connected')
self.connected = True
self.read_buffer = None
reactor.callLater(0, self.proto_handler.start)
# protocol.ClientFactory methods
def protocol(self):
cxn = OpenFlowConnection(self) # Low level message handler
self.proto_handler = OpenFlowProtocolHandler(
self.datapath_id, self.device_id, self, cxn)
return cxn
def clientConnectionFailed(self, connector, reason):
self.enter_disconnected('connection-failed', reason)
def clientConnectionLost(self, connector, reason):
if not self.exiting:
log.error('client-connection-lost',
reason=reason, connector=connector)
def forward_packet_in(self, ofp_packet_in):
if self.proto_handler is not None:
self.proto_handler.forward_packet_in(ofp_packet_in)
def forward_change_event(self, event):
# assert isinstance(event, ChangeEvent)
log.info('got-change-event', change_event=event)
if event.HasField("port_status"):
if self.proto_handler is not None:
self.proto_handler.forward_port_status(event.port_status)
else:
log.error('unknown-change-event', change_event=event)
if __name__ == '__main__':
"""Run this to test the agent for N concurrent sessions:
python agent [<number-of-desired-instances>]
"""
n = 1 if len(sys.argv) < 2 else int(sys.argv[1])
from utils import mac_str_to_tuple
class MockRpc(object):
@staticmethod
def get_port_list(_):
ports = []
cap = of13.OFPPF_1GB_FD | of13.OFPPF_FIBER
for pno, mac, nam, cur, adv, sup, spe in (
(1, '00:00:00:00:00:01', 'onu1', cap, cap, cap,
of13.OFPPF_1GB_FD),
(2, '00:00:00:00:00:02', 'onu2', cap, cap, cap,
of13.OFPPF_1GB_FD),
(129, '00:00:00:00:00:81', 'olt', cap, cap, cap,
of13.OFPPF_1GB_FD)
):
port = of13.common.port_desc(pno, mac_str_to_tuple(mac), nam,
curr=cur, advertised=adv,
supported=sup,
curr_speed=spe, max_speed=spe)
ports.append(port)
return ports
stub = MockRpc()
agents = [Agent('localhost:6653', 256 + i, stub).start() for i in range(n)]
def shutdown():
[a.stop() for a in agents]
reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
reactor.run()