This commit consists of the following:
1) The kafka messaging proxy in Twisted python for adapters
2) Initial implementation and containerization of ponsim OLT adapter
and ponsim ONU adapter
3) Initial submission of request and response facade in both Twisted
python and Go Language
4) Initial implementation of device management and logical device management
in the Core
5) Update to the log module to allow dynamic setting of log level per
package using the gRPC API
6) Bug fixes and minor changes
Change-Id: Ia8f033da84cfd08275335bae9542802415e7bb0f
diff --git a/adapters/ponsim_onu/VERSION b/adapters/ponsim_onu/VERSION
new file mode 100644
index 0000000..c0ab82c
--- /dev/null
+++ b/adapters/ponsim_onu/VERSION
@@ -0,0 +1 @@
+0.0.1-dev
diff --git a/adapters/ponsim_onu/__init__.py b/adapters/ponsim_onu/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/adapters/ponsim_onu/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/adapters/ponsim_onu/main.py b/adapters/ponsim_onu/main.py
new file mode 100755
index 0000000..63e2bc4
--- /dev/null
+++ b/adapters/ponsim_onu/main.py
@@ -0,0 +1,485 @@
+#!/usr/bin/env python
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Ponsim ONU Adapter main entry point"""
+
+import argparse
+import arrow
+import os
+import time
+
+import yaml
+from simplejson import dumps
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.task import LoopingCall
+from zope.interface import implementer
+from adapters.protos import third_party
+from adapters.common.structlog_setup import setup_logging, update_logging
+from adapters.common.utils.dockerhelpers import get_my_containers_name
+from adapters.common.utils.nethelpers import get_my_primary_local_ipv4, \
+ get_my_primary_interface
+from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+from adapters.common.utils.registry import registry, IComponent
+from packaging.version import Version
+from adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, get_messaging_proxy
+from adapters.ponsim_onu.ponsim_onu import PonSimOnuAdapter
+from adapters.protos.adapter_pb2 import AdapterConfig, Adapter
+from adapters.kafka.adapter_request_facade import AdapterRequestFacade
+from adapters.kafka.core_proxy import CoreProxy
+from adapters.common.utils.deferred_utils import TimeOutError
+from adapters.common.utils.asleep import asleep
+
+_ = third_party
+
+defs = dict(
+ version_file='./VERSION',
+ config=os.environ.get('CONFIG', './ponsim_onu.yml'),
+ container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR', '^.*\.(['
+ '0-9]+)\..*$'),
+ consul=os.environ.get('CONSUL', 'localhost:8500'),
+ name=os.environ.get('NAME', 'ponsim_onu'),
+ vendor=os.environ.get('VENDOR', 'Voltha Project'),
+ device_type=os.environ.get('DEVICE_TYPE', 'ponsim_onu'),
+ accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
+ accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
+ etcd=os.environ.get('ETCD', 'localhost:2379'),
+ core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
+ interface=os.environ.get('INTERFACE', get_my_primary_interface()),
+ instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
+ kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
+ kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
+ backend=os.environ.get('BACKEND', 'none'),
+ retry_interval=os.environ.get('RETRY_INTERVAL', 2),
+ heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
+)
+
+
+def parse_args():
+ parser = argparse.ArgumentParser()
+
+ _help = ('Path to ponsim_onu.yml config file (default: %s). '
+ 'If relative, it is relative to main.py of ponsim adapter.'
+ % defs['config'])
+ parser.add_argument('-c', '--config',
+ dest='config',
+ action='store',
+ default=defs['config'],
+ help=_help)
+
+ _help = 'Regular expression for extracting conatiner number from ' \
+ 'container name (default: %s)' % defs['container_name_regex']
+ parser.add_argument('-X', '--container-number-extractor',
+ dest='container_name_regex',
+ action='store',
+ default=defs['container_name_regex'],
+ help=_help)
+
+ _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
+ parser.add_argument('-C', '--consul',
+ dest='consul',
+ action='store',
+ default=defs['consul'],
+ help=_help)
+
+ _help = 'name of this adapter (default: %s)' % defs['name']
+ parser.add_argument('-na', '--name',
+ dest='name',
+ action='store',
+ default=defs['name'],
+ help=_help)
+
+ _help = 'vendor of this adapter (default: %s)' % defs['vendor']
+ parser.add_argument('-ven', '--vendor',
+ dest='vendor',
+ action='store',
+ default=defs['vendor'],
+ help=_help)
+
+ _help = 'supported device type of this adapter (default: %s)' % defs[
+ 'device_type']
+ parser.add_argument('-dt', '--device_type',
+ dest='device_type',
+ action='store',
+ default=defs['device_type'],
+ help=_help)
+
+ _help = 'specifies whether the device type accepts bulk flow updates ' \
+ 'adapter (default: %s)' % defs['accept_bulk_flow']
+ parser.add_argument('-abf', '--accept_bulk_flow',
+ dest='accept_bulk_flow',
+ action='store',
+ default=defs['accept_bulk_flow'],
+ help=_help)
+
+ _help = 'specifies whether the device type accepts add/remove flow ' \
+ '(default: %s)' % defs['accept_atomic_flow']
+ parser.add_argument('-aaf', '--accept_atomic_flow',
+ dest='accept_atomic_flow',
+ action='store',
+ default=defs['accept_atomic_flow'],
+ help=_help)
+
+ _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
+ parser.add_argument('-e', '--etcd',
+ dest='etcd',
+ action='store',
+ default=defs['etcd'],
+ help=_help)
+
+ _help = ('unique string id of this container instance (default: %s)'
+ % defs['instance_id'])
+ parser.add_argument('-i', '--instance-id',
+ dest='instance_id',
+ action='store',
+ default=defs['instance_id'],
+ help=_help)
+
+ _help = 'ETH interface to recieve (default: %s)' % defs['interface']
+ parser.add_argument('-I', '--interface',
+ dest='interface',
+ action='store',
+ default=defs['interface'],
+ help=_help)
+
+ _help = 'omit startup banner log lines'
+ parser.add_argument('-n', '--no-banner',
+ dest='no_banner',
+ action='store_true',
+ default=False,
+ help=_help)
+
+ _help = 'do not emit periodic heartbeat log messages'
+ parser.add_argument('-N', '--no-heartbeat',
+ dest='no_heartbeat',
+ action='store_true',
+ default=False,
+ help=_help)
+
+ _help = "suppress debug and info logs"
+ parser.add_argument('-q', '--quiet',
+ dest='quiet',
+ action='count',
+ help=_help)
+
+ _help = 'enable verbose logging'
+ parser.add_argument('-v', '--verbose',
+ dest='verbose',
+ action='count',
+ help=_help)
+
+ _help = ('use docker container name as conatiner instance id'
+ ' (overrides -i/--instance-id option)')
+ parser.add_argument('--instance-id-is-container-name',
+ dest='instance_id_is_container_name',
+ action='store_true',
+ default=False,
+ help=_help)
+
+ _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). ('
+ 'If not '
+ 'specified (None), the address from the config file is used'
+ % defs['kafka_adapter'])
+ parser.add_argument('-KA', '--kafka_adapter',
+ dest='kafka_adapter',
+ action='store',
+ default=defs['kafka_adapter'],
+ help=_help)
+
+ _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). ('
+ 'If not '
+ 'specified (None), the address from the config file is used'
+ % defs['kafka_cluster'])
+ parser.add_argument('-KC', '--kafka_cluster',
+ dest='kafka_cluster',
+ action='store',
+ default=defs['kafka_cluster'],
+ help=_help)
+
+ _help = 'backend to use for config persitence'
+ parser.add_argument('-b', '--backend',
+ default=defs['backend'],
+ choices=['none', 'consul', 'etcd'],
+ help=_help)
+
+ _help = 'topic of core on the kafka bus'
+ parser.add_argument('-ct', '--core_topic',
+ dest='core_topic',
+ action='store',
+ default=defs['core_topic'],
+ help=_help)
+
+ args = parser.parse_args()
+
+ # post-processing
+
+ if args.instance_id_is_container_name:
+ args.instance_id = get_my_containers_name()
+
+ return args
+
+
+def load_config(args):
+ path = args.config
+ if path.startswith('.'):
+ dir = os.path.dirname(os.path.abspath(__file__))
+ path = os.path.join(dir, path)
+ path = os.path.abspath(path)
+ with open(path) as fd:
+ config = yaml.load(fd)
+ return config
+
+
+def print_banner(log):
+ log.info(' ____ _ ___ _ _ _ _ ')
+ log.info('| _ \ ___ _ __ ___(_)_ __ ___ / _ \| \ | | | | | ')
+ log.info('| |_) / _ \| \'_ \/ __| | \'_ ` _ \ | | | | \| | | | | ')
+ log.info('| __/ (_) | | | \__ \ | | | | | | | |_| | |\ | |_| | ')
+ log.info('|_| \___/|_| |_|___/_|_| |_| |_| \___/|_| \_|\___/ ')
+ log.info(' _ _ _ ')
+ log.info(' / \ __| | __ _ _ __ | |_ ___ _ __ ')
+ log.info(' / _ \ / _` |/ _` | \'_ \| __/ _ \ \'__| ')
+ log.info(' / ___ \ (_| | (_| | |_) | || __/ | ')
+ log.info('/_/ \_\__,_|\__,_| .__/ \__\___|_| ')
+ log.info(' |_| ')
+ log.info('(to stop: press Ctrl-C)')
+
+
+@implementer(IComponent)
+class Main(object):
+
+ def __init__(self):
+
+ self.args = args = parse_args()
+ self.config = load_config(args)
+
+ verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
+ self.log = setup_logging(self.config.get('logging', {}),
+ args.instance_id,
+ verbosity_adjust=verbosity_adjust)
+ self.log.info('container-number-extractor',
+ regex=args.container_name_regex)
+
+ self.ponsim_olt_adapter_version = self.get_version()
+ self.log.info('Ponsim-ONU-Adapter-Version', version=
+ self.ponsim_olt_adapter_version)
+
+ if not args.no_banner:
+ print_banner(self.log)
+
+ # Create a unique instance id using the passed-in instance id and
+ # UTC timestamp
+ current_time = arrow.utcnow().timestamp
+ self.instance_id = self.args.instance_id + '_' + str(current_time)
+
+ self.core_topic = args.core_topic
+ self.listening_topic = args.name
+ self.startup_components()
+
+ if not args.no_heartbeat:
+ self.start_heartbeat()
+ self.start_kafka_cluster_heartbeat(self.instance_id)
+
+ def get_version(self):
+ path = defs['version_file']
+ if not path.startswith('/'):
+ dir = os.path.dirname(os.path.abspath(__file__))
+ path = os.path.join(dir, path)
+
+ path = os.path.abspath(path)
+ version_file = open(path, 'r')
+ v = version_file.read()
+
+ # Use Version to validate the version string - exception will be raised
+ # if the version is invalid
+ Version(v)
+
+ version_file.close()
+ return v
+
+ def start(self):
+ self.start_reactor() # will not return except Keyboard interrupt
+
+ def stop(self):
+ pass
+
+ def get_args(self):
+ """Allow access to command line args"""
+ return self.args
+
+ def get_config(self):
+ """Allow access to content of config file"""
+ return self.config
+
+ def _get_adapter_config(self):
+ cfg = AdapterConfig()
+ return cfg
+
+ @inlineCallbacks
+ def startup_components(self):
+ try:
+ self.log.info('starting-internal-components',
+ consul=self.args.consul,
+ etcd=self.args.etcd)
+
+ registry.register('main', self)
+
+ # Update the logger to output the vcore id.
+ self.log = update_logging(instance_id=self.instance_id,
+ vcore_id=None)
+
+ yield registry.register(
+ 'kafka_cluster_proxy',
+ KafkaProxy(
+ self.args.consul,
+ self.args.kafka_cluster,
+ config=self.config.get('kafka-cluster-proxy', {})
+ )
+ ).start()
+
+ config = self._get_adapter_config()
+
+ self.core_proxy = CoreProxy(
+ kafka_proxy=None,
+ core_topic=self.core_topic,
+ my_listening_topic=self.listening_topic)
+
+ ponsim_onu_adapter = PonSimOnuAdapter(
+ adapter_agent=self.core_proxy, config=config)
+ ponsim_request_handler = AdapterRequestFacade(
+ adapter=ponsim_onu_adapter)
+
+ yield registry.register(
+ 'kafka_adapter_proxy',
+ IKafkaMessagingProxy(
+ kafka_host_port=self.args.kafka_adapter,
+ # TODO: Add KV Store object reference
+ kv_store=self.args.backend,
+ default_topic=self.args.name,
+ target_cls=ponsim_request_handler
+ )
+ ).start()
+
+ self.core_proxy.kafka_proxy = get_messaging_proxy()
+
+ # retry for ever
+ res = yield self._register_with_core(-1)
+
+ self.log.info('started-internal-services')
+
+ except Exception as e:
+ self.log.exception('Failure-to-start-all-components', e=e)
+
+ @inlineCallbacks
+ def shutdown_components(self):
+ """Execute before the reactor is shut down"""
+ self.log.info('exiting-on-keyboard-interrupt')
+ for component in reversed(registry.iterate()):
+ yield component.stop()
+
+ import threading
+ self.log.info('THREADS:')
+ main_thread = threading.current_thread()
+ for t in threading.enumerate():
+ if t is main_thread:
+ continue
+ if not t.isDaemon():
+ continue
+ self.log.info('joining thread {} {}'.format(
+ t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
+ t.join()
+
+ def start_reactor(self):
+ from twisted.internet import reactor
+ reactor.callWhenRunning(
+ lambda: self.log.info('twisted-reactor-started'))
+ reactor.addSystemEventTrigger('before', 'shutdown',
+ self.shutdown_components)
+ reactor.run()
+
+ @inlineCallbacks
+ def _register_with_core(self, retries):
+ # Send registration to Core with adapter specs
+ adapter = Adapter()
+ adapter.id = self.args.name
+ adapter.vendor = self.args.name
+ adapter.version = self.ponsim_olt_adapter_version
+ while 1:
+ try:
+ resp = yield self.core_proxy.register(adapter)
+ self.log.info('registration-response', response=resp)
+ returnValue(resp)
+ except TimeOutError as e:
+ self.log.warn("timeout-when-registering-with-core", e=e)
+ if retries == 0:
+ self.log.exception("no-more-retries", e=e)
+ raise
+ else:
+ retries = retries if retries < 0 else retries - 1
+ yield asleep(defs['retry_interval'])
+ except Exception as e:
+ self.log.exception("failed-registration", e=e)
+ raise
+
+ def start_heartbeat(self):
+
+ t0 = time.time()
+ t0s = time.ctime(t0)
+
+ def heartbeat():
+ self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
+
+ lc = LoopingCall(heartbeat)
+ lc.start(10)
+
+ # Temporary function to send a heartbeat message to the external kafka
+ # broker
+ def start_kafka_cluster_heartbeat(self, instance_id):
+ # For heartbeat we will send a message to a specific "voltha-heartbeat"
+ # topic. The message is a protocol buf
+ # message
+ message = dict(
+ type='heartbeat',
+ adapter=self.args.name,
+ instance=instance_id,
+ ip=get_my_primary_local_ipv4()
+ )
+ topic = defs['heartbeat_topic']
+
+ def send_msg(start_time):
+ try:
+ kafka_cluster_proxy = get_kafka_proxy()
+ if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
+ # self.log.debug('kafka-proxy-available')
+ message['ts'] = arrow.utcnow().timestamp
+ message['uptime'] = time.time() - start_time
+ # self.log.debug('start-kafka-heartbeat')
+ kafka_cluster_proxy.send_message(topic, dumps(message))
+ else:
+ self.log.error('kafka-proxy-unavailable')
+ except Exception, e:
+ self.log.exception('failed-sending-message-heartbeat', e=e)
+
+ try:
+ t0 = time.time()
+ lc = LoopingCall(send_msg, t0)
+ lc.start(10)
+ except Exception, e:
+ self.log.exception('failed-kafka-heartbeat', e=e)
+
+
+if __name__ == '__main__':
+ Main().start()
diff --git a/adapters/ponsim_onu/ponsim_onu.py b/adapters/ponsim_onu/ponsim_onu.py
new file mode 100644
index 0000000..7e35c7f
--- /dev/null
+++ b/adapters/ponsim_onu/ponsim_onu.py
@@ -0,0 +1,373 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Fully simulated OLT/ONU adapter.
+"""
+
+import sys
+import structlog
+from twisted.internet.defer import DeferredQueue, inlineCallbacks
+from adapters.common.utils.asleep import asleep
+
+from adapters.iadapter import OnuAdapter
+from adapters.protos import third_party
+from adapters.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
+from adapters.protos.device_pb2 import Port
+from adapters.protos.logical_device_pb2 import LogicalPort
+from adapters.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
+ OFPPF_1GB_FD
+from adapters.protos.openflow_13_pb2 import ofp_port
+from adapters.protos.ponsim_pb2 import FlowTable
+from adapters.protos.core_adapter_pb2 import PortCapability
+
+_ = third_party
+log = structlog.get_logger()
+
+
+def mac_str_to_tuple(mac):
+ return tuple(int(d, 16) for d in mac.split(':'))
+
+class PonSimOnuAdapter(OnuAdapter):
+ def __init__(self, adapter_agent, config):
+ # DeviceType of ONU should be same as VENDOR ID of ONU Serial Number as specified by standard
+ # requires for identifying correct adapter or ranged ONU
+ super(PonSimOnuAdapter, self).__init__(adapter_agent=adapter_agent,
+ config=config,
+ device_handler_class=PonSimOnuHandler,
+ name='ponsim_onu',
+ vendor='Voltha project',
+ version='0.4',
+ device_type='ponsim_onu',
+ vendor_id='PSMO',
+ accepts_bulk_flow_update=True,
+ accepts_add_remove_flow_updates=False)
+
+
+class PonSimOnuHandler(object):
+ def __init__(self, adapter, device_id):
+ self.adapter = adapter
+ self.adapter_agent = adapter.adapter_agent
+ self.device_id = device_id
+ self.log = structlog.get_logger(device_id=device_id)
+ self.incoming_messages = DeferredQueue()
+ self.proxy_address = None
+ # reference of uni_port is required when re-enabling the device if
+ # it was disabled previously
+ self.uni_port = None
+ self.pon_port = None
+
+ def receive_message(self, msg):
+ self.incoming_messages.put(msg)
+
+
+ @inlineCallbacks
+ def activate(self, device):
+ self.log.info('activating')
+
+ # TODO: Register for proxy address
+ # # first we verify that we got parent reference and proxy info
+ # assert device.parent_id
+ # assert device.proxy_address.device_id
+ # assert device.proxy_address.channel_id
+ #
+ # # register for proxied messages right away
+ # self.proxy_address = device.proxy_address
+ # self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+
+ # populate device info
+ device.root = False
+ device.vendor = 'ponsim'
+ device.model = 'n/a'
+ device.connect_status = ConnectStatus.REACHABLE
+ yield self.adapter_agent.device_update(device)
+
+ # register physical ports
+ self.uni_port = Port(
+ port_no=2,
+ label='UNI facing Ethernet port',
+ type=Port.ETHERNET_UNI,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE
+ )
+ self.pon_port = Port(
+ port_no=1,
+ label='PON port',
+ type=Port.PON_ONU,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE,
+ peers=[
+ Port.PeerPort(
+ device_id=device.parent_id,
+ port_no=device.parent_port_no
+ )
+ ]
+ )
+ self.adapter_agent.port_created(device.id, self.uni_port)
+ self.adapter_agent.port_created(device.id, self.pon_port)
+
+ yield self.adapter_agent.device_state_update(device.id, oper_status=OperStatus.ACTIVE)
+
+
+ def get_ofp_port_info(self, device, port_no):
+ # Since the adapter created the device port then it has the reference of the port to
+ # return the capability. TODO: Do a lookup on the NNI port number and return the
+ # appropriate attributes
+ self.log.info('get_ofp_port_info', port_no=port_no, device_id=device.id)
+ # port_no = device.proxy_address.channel_id
+ cap = OFPPF_1GB_FD | OFPPF_FIBER
+ return PortCapability(
+ port = LogicalPort (
+ id='uni-{}'.format(port_no),
+ ofp_port=ofp_port(
+ port_no=port_no,
+ hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
+ name='uni-{}'.format(port_no),
+ config=0,
+ state=OFPPS_LIVE,
+ curr=cap,
+ advertised=cap,
+ peer=cap,
+ curr_speed=OFPPF_1GB_FD,
+ max_speed=OFPPF_1GB_FD
+ )
+ )
+ )
+
+ def _get_uni_port(self):
+ ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
+ if ports:
+ # For now, we use on one uni port
+ return ports[0]
+
+ def _get_pon_port(self):
+ ports = self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
+ if ports:
+ # For now, we use on one uni port
+ return ports[0]
+
+ def reconcile(self, device):
+ self.log.info('reconciling-ONU-device-starts')
+
+ # first we verify that we got parent reference and proxy info
+ assert device.parent_id
+ assert device.proxy_address.device_id
+ assert device.proxy_address.channel_id
+
+ # register for proxied messages right away
+ self.proxy_address = device.proxy_address
+ self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+
+ # Set the connection status to REACHABLE
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ # TODO: Verify that the uni, pon and logical ports exists
+
+ # Mark the device as REACHABLE and ACTIVE
+ device = self.adapter_agent.get_device(device.id)
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
+
+ self.log.info('reconciling-ONU-device-ends')
+
+ @inlineCallbacks
+ def update_flow_table(self, flows):
+
+ # we need to proxy through the OLT to get to the ONU
+
+ # reset response queue
+ while self.incoming_messages.pending:
+ yield self.incoming_messages.get()
+
+ msg = FlowTable(
+ port=self.proxy_address.channel_id,
+ flows=flows
+ )
+ self.adapter_agent.send_proxied_message(self.proxy_address, msg)
+
+ yield self.incoming_messages.get()
+
+ def remove_from_flow_table(self, flows):
+ self.log.debug('remove-from-flow-table', flows=flows)
+ # TODO: Update PONSIM code to accept incremental flow changes.
+ # Once completed, the accepts_add_remove_flow_updates for this
+ # device type can be set to True
+
+ def add_to_flow_table(self, flows):
+ self.log.debug('add-to-flow-table', flows=flows)
+ # TODO: Update PONSIM code to accept incremental flow changes
+ # Once completed, the accepts_add_remove_flow_updates for this
+ # device type can be set to True
+
+ @inlineCallbacks
+ def reboot(self):
+ self.log.info('rebooting', device_id=self.device_id)
+
+ # Update the operational status to ACTIVATING and connect status to
+ # UNREACHABLE
+ device = self.adapter_agent.get_device(self.device_id)
+ previous_oper_status = device.oper_status
+ previous_conn_status = device.connect_status
+ device.oper_status = OperStatus.ACTIVATING
+ device.connect_status = ConnectStatus.UNREACHABLE
+ self.adapter_agent.update_device(device)
+
+ # Sleep 10 secs, simulating a reboot
+ # TODO: send alert and clear alert after the reboot
+ yield asleep(10)
+
+ # Change the operational status back to its previous state. With a
+ # real OLT the operational state should be the state the device is
+ # after a reboot.
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
+ device.oper_status = previous_oper_status
+ device.connect_status = previous_conn_status
+ self.adapter_agent.update_device(device)
+ self.log.info('rebooted', device_id=self.device_id)
+
+ def self_test_device(self, device):
+ """
+ This is called to Self a device based on a NBI call.
+ :param device: A Voltha.Device object.
+ :return: Will return result of self test
+ """
+ log.info('self-test-device', device=device.id)
+ raise NotImplementedError()
+
+ def disable(self):
+ self.log.info('disabling', device_id=self.device_id)
+
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
+
+ # Disable all ports on that device
+ self.adapter_agent.disable_all_ports(self.device_id)
+
+ # Update the device operational status to UNKNOWN
+ device.oper_status = OperStatus.UNKNOWN
+ device.connect_status = ConnectStatus.UNREACHABLE
+ self.adapter_agent.update_device(device)
+
+ # Remove the uni logical port from the OLT, if still present
+ parent_device = self.adapter_agent.get_device(device.parent_id)
+ assert parent_device
+ logical_device_id = parent_device.parent_id
+ assert logical_device_id
+ port_no = device.proxy_address.channel_id
+ port_id = 'uni-{}'.format(port_no)
+ try:
+ port = self.adapter_agent.get_logical_port(logical_device_id,
+ port_id)
+ self.adapter_agent.delete_logical_port(logical_device_id, port)
+ except KeyError:
+ self.log.info('logical-port-not-found', device_id=self.device_id,
+ portid=port_id)
+
+ # Remove pon port from parent
+ self.pon_port = self._get_pon_port()
+ self.adapter_agent.delete_port_reference_from_parent(self.device_id,
+ self.pon_port)
+
+ # Just updating the port status may be an option as well
+ # port.ofp_port.config = OFPPC_NO_RECV
+ # yield self.adapter_agent.update_logical_port(logical_device_id,
+ # port)
+ # Unregister for proxied message
+ self.adapter_agent.unregister_for_proxied_messages(
+ device.proxy_address)
+
+ # TODO:
+ # 1) Remove all flows from the device
+ # 2) Remove the device from ponsim
+
+ self.log.info('disabled', device_id=device.id)
+
+ def reenable(self):
+ self.log.info('re-enabling', device_id=self.device_id)
+ try:
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
+
+ # First we verify that we got parent reference and proxy info
+ assert device.parent_id
+ assert device.proxy_address.device_id
+ assert device.proxy_address.channel_id
+
+ # Re-register for proxied messages right away
+ self.proxy_address = device.proxy_address
+ self.adapter_agent.register_for_proxied_messages(
+ device.proxy_address)
+
+ # Re-enable the ports on that device
+ self.adapter_agent.enable_all_ports(self.device_id)
+
+ # Refresh the port reference
+ self.uni_port = self._get_uni_port()
+ self.pon_port = self._get_pon_port()
+
+ # Add the pon port reference to the parent
+ self.adapter_agent.add_port_reference_to_parent(device.id,
+ self.pon_port)
+
+ # Update the connect status to REACHABLE
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ # re-add uni port to logical device
+ parent_device = self.adapter_agent.get_device(device.parent_id)
+ logical_device_id = parent_device.parent_id
+ assert logical_device_id
+ port_no = device.proxy_address.channel_id
+ cap = OFPPF_1GB_FD | OFPPF_FIBER
+ self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
+ id='uni-{}'.format(port_no),
+ ofp_port=ofp_port(
+ port_no=port_no,
+ hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
+ name='uni-{}'.format(port_no),
+ config=0,
+ state=OFPPS_LIVE,
+ curr=cap,
+ advertised=cap,
+ peer=cap,
+ curr_speed=OFPPF_1GB_FD,
+ max_speed=OFPPF_1GB_FD
+ ),
+ device_id=device.id,
+ device_port_no=self.uni_port.port_no
+ ))
+
+ device = self.adapter_agent.get_device(device.id)
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
+
+ self.log.info('re-enabled', device_id=device.id)
+ except Exception, e:
+ self.log.exception('error-reenabling', e=e)
+
+ def delete(self):
+ self.log.info('deleting', device_id=self.device_id)
+
+ # A delete request may be received when an OLT is dsiabled
+
+ # TODO:
+ # 1) Remove all flows from the device
+ # 2) Remove the device from ponsim
+
+ self.log.info('deleted', device_id=self.device_id)
diff --git a/adapters/ponsim_onu/ponsim_onu.yml b/adapters/ponsim_onu/ponsim_onu.yml
new file mode 100644
index 0000000..1afafdf
--- /dev/null
+++ b/adapters/ponsim_onu/ponsim_onu.yml
@@ -0,0 +1,52 @@
+logging:
+ version: 1
+
+ formatters:
+ brief:
+ format: '%(message)s'
+ default:
+ format: '%(asctime)s.%(msecs)03d %(levelname)-8s %(threadName)s %(module)s.%(funcName)s %(message)s'
+ datefmt: '%Y%m%dT%H%M%S'
+
+ handlers:
+ console:
+ class : logging.StreamHandler
+ level: DEBUG
+ formatter: default
+ stream: ext://sys.stdout
+ localRotatingFile:
+ class: logging.handlers.RotatingFileHandler
+ filename: ponsim_onu.log
+ formatter: default
+ maxBytes: 2097152
+ backupCount: 10
+ level: DEBUG
+ null:
+ class: logging.NullHandler
+
+ loggers:
+ amqp:
+ handlers: [null]
+ propagate: False
+ conf:
+ propagate: False
+ '': # root logger
+ handlers: [console, localRotatingFile]
+ level: DEBUG # this can be bumped up/down by -q and -v command line
+ # options
+ propagate: False
+
+
+kafka-cluster-proxy:
+ event_bus_publisher:
+ topic_mappings:
+ 'model-change-events':
+ kafka_topic: 'voltha.events'
+ filters: [null]
+ 'alarms':
+ kafka_topic: 'voltha.alarms'
+ filters: [null]
+ 'kpis':
+ kafka_topic: 'voltha.kpis'
+ filters: [null]
+