blob: cb59d249ea7d83c63e21e476a7ff44bd61b9e36b [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
import structlog
import os.path
from twisted.internet import protocol
from twisted.internet import reactor
from twisted.internet import reactor, ssl
from twisted.internet import reactor
from twisted.internet.defer import Deferred, inlineCallbacks
import loxi.of13 as of13
from common.utils.asleep import asleep
from of_connection import OpenFlowConnection
from of_protocol_handler import OpenFlowProtocolHandler
# from ofagent.protos.openflow_13_pb2 import ChangeEvent
log = structlog.get_logger()
class Agent(protocol.ClientFactory):
generation_is_defined = False
cached_generation_id = None
def __init__(self,
controller_endpoint,
datapath_id,
device_id,
rpc_stub,
enable_tls=False,
key_file=None,
cert_file=None,
conn_retry_interval=1):
self.controller_endpoint = controller_endpoint
self.datapath_id = datapath_id
self.device_id = device_id
self.rpc_stub = rpc_stub
self.enable_tls = enable_tls
self.key_file = key_file
self.cert_file = cert_file
self.retry_interval = conn_retry_interval
self.running = False
self.connector = None # will be a Connector instance once connected
self.d_disconnected = None # a deferred to signal reconnect loop when
# TCP connection is lost
self.connected = False
self.exiting = False
self.proto_handler = None
def get_device_id(self):
return self.device_id
def start(self):
log.debug('starting')
if self.running:
return
self.running = True
reactor.callLater(0, self.keep_connected)
log.info('started')
return self
def stop(self):
log.debug('stopping')
self.connected = False
self.exiting = True
self.connector.disconnect()
log.info('stopped')
def resolve_endpoint(self, endpoint):
# enable optional resolution via consul;
# see https://jira.opencord.org/browse/CORD-820
host, port = endpoint.split(':', 2)
return host, int(port)
@inlineCallbacks
def keep_connected(self):
"""Keep reconnecting to the controller"""
while not self.exiting:
host, port = self.resolve_endpoint(self.controller_endpoint)
log.info('connecting', host=host, port=port)
if self.enable_tls:
try:
# Check that key_file and cert_file is provided and
# the files exist
if self.key_file is None or \
self.cert_file is None or \
not os.path.isfile(self.key_file) or \
not os.path.isfile(self.cert_file):
raise Exception('key_file "{}" or cert_file "{}"'
' is not found'.
format(self.key_file, self.cert_file))
with open(self.key_file) as keyFile:
with open(self.cert_file) as certFile:
clientCert = ssl.PrivateCertificate.loadPEM(
keyFile.read() + certFile.read())
ctx = clientCert.options()
self.connector = reactor.connectSSL(host, port, self, ctx)
log.info('tls-enabled')
except Exception as e:
log.exception('failed-to-connect', reason=e)
else:
self.connector = reactor.connectTCP(host, port, self)
log.info('tls-disabled')
self.d_disconnected = Deferred()
yield self.d_disconnected
log.debug('reconnect', after_delay=self.retry_interval)
yield asleep(self.retry_interval)
def enter_disconnected(self, event, reason):
"""Internally signal entering disconnected state"""
self.connected = False
if not self.exiting:
log.error(event, reason=reason)
self.d_disconnected.callback(None)
def enter_connected(self):
"""Handle transitioning from disconnected to connected state"""
log.info('connected')
self.connected = True
self.read_buffer = None
reactor.callLater(0, self.proto_handler.start)
# protocol.ClientFactory methods
def protocol(self):
cxn = OpenFlowConnection(self) # Low level message handler
self.proto_handler = OpenFlowProtocolHandler(
self.datapath_id, self.device_id, self, cxn, self.rpc_stub)
return cxn
def clientConnectionFailed(self, connector, reason):
self.enter_disconnected('connection-failed', reason)
def clientConnectionLost(self, connector, reason):
if not self.exiting:
log.error('client-connection-lost',
reason=reason, connector=connector)
def forward_packet_in(self, ofp_packet_in):
if self.proto_handler is not None:
self.proto_handler.forward_packet_in(ofp_packet_in)
def forward_change_event(self, event):
# assert isinstance(event, ChangeEvent)
log.info('got-change-event', change_event=event)
if event.HasField("port_status"):
if self.proto_handler is not None:
self.proto_handler.forward_port_status(event.port_status)
else:
log.error('unknown-change-event', change_event=event)
if __name__ == '__main__':
"""Run this to test the agent for N concurrent sessions:
python agent [<number-of-desired-instances>]
"""
n = 1 if len(sys.argv) < 2 else int(sys.argv[1])
from utils import mac_str_to_tuple
class MockRpc(object):
@staticmethod
def get_port_list(_):
ports = []
cap = of13.OFPPF_1GB_FD | of13.OFPPF_FIBER
for pno, mac, nam, cur, adv, sup, spe in (
(1, '00:00:00:00:00:01', 'onu1', cap, cap, cap,
of13.OFPPF_1GB_FD),
(2, '00:00:00:00:00:02', 'onu2', cap, cap, cap,
of13.OFPPF_1GB_FD),
(129, '00:00:00:00:00:81', 'olt', cap, cap, cap,
of13.OFPPF_1GB_FD)
):
port = of13.common.port_desc(pno, mac_str_to_tuple(mac), nam,
curr=cur, advertised=adv,
supported=sup,
curr_speed=spe, max_speed=spe)
ports.append(port)
return ports
stub = MockRpc()
agents = [Agent('localhost:6653', 256 + i, stub).start() for i in range(n)]
def shutdown():
[a.stop() for a in agents]
reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
reactor.run()