VOL-2059 move ponsim-onu adapter to its own repo
Based on voltha-go commit 4f48884d490a3e6627687604ffdf885792326521
Change-Id: Idb23c4183888ed6bdc9ab24b109f505395e00821
diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..60f17bc
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,2 @@
+venv
+.tox
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..07bffa2
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,15 @@
+# testing related
+.coverage
+.tox/
+coverage.xml
+nose-results.xml
+test/unit/tmp
+
+# python distribution related
+dist
+*.egg-info
+
+# ide workspace
+.idea
+
+*.pyc
diff --git a/.gitreview b/.gitreview
new file mode 100644
index 0000000..4575583
--- /dev/null
+++ b/.gitreview
@@ -0,0 +1,5 @@
+[gerrit]
+host=gerrit.opencord.org
+port=29418
+project=voltha-ponsimonu-adapter
+defaultremote=origin
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..37bb9b6
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,146 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# set default shell
+SHELL = bash -e -o pipefail
+
+# Variables
+VERSION ?= $(shell cat ./VERSION)
+
+DOCKER_LABEL_VCS_DIRTY = false
+ifneq ($(shell git ls-files --others --modified --exclude-standard 2>/dev/null | wc -l | sed -e 's/ //g'),0)
+ DOCKER_LABEL_VCS_DIRTY = true
+endif
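+# A non-empty list of untracked or modified files marks the tree as dirty;
+# DOCKER_TAG below then appends a "-dirty" suffix to the image tag
+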
+## Docker related
+DOCKER_EXTRA_ARGS ?=
+DOCKER_REGISTRY ?=
+DOCKER_REPOSITORY ?=
+DOCKER_TAG ?= ${VERSION}$(shell [[ ${DOCKER_LABEL_VCS_DIRTY} == "true" ]] && echo "-dirty" || true)
+PONSIMONU_IMAGENAME := ${DOCKER_REGISTRY}${DOCKER_REPOSITORY}voltha-adapter-ponsim-onu
+
+## Docker labels. Only set ref and commit date if committed
+DOCKER_LABEL_VCS_URL ?= $(shell git remote get-url $(shell git remote))
+DOCKER_LABEL_VCS_REF = $(shell git rev-parse HEAD)
+DOCKER_LABEL_BUILD_DATE ?= $(shell date -u "+%Y-%m-%dT%H:%M:%SZ")
+DOCKER_LABEL_COMMIT_DATE = $(shell git show -s --format=%cd --date=iso-strict HEAD)
+
+DOCKER_BUILD_ARGS ?= \
+ ${DOCKER_EXTRA_ARGS} \
+ --build-arg org_label_schema_version="${VERSION}" \
+ --build-arg org_label_schema_vcs_url="${DOCKER_LABEL_VCS_URL}" \
+ --build-arg org_label_schema_vcs_ref="${DOCKER_LABEL_VCS_REF}" \
+ --build-arg org_label_schema_build_date="${DOCKER_LABEL_BUILD_DATE}" \
+ --build-arg org_opencord_vcs_commit_date="${DOCKER_LABEL_COMMIT_DATE}" \
+ --build-arg org_opencord_vcs_dirty="${DOCKER_LABEL_VCS_DIRTY}"
+
+DOCKER_BUILD_ARGS_LOCAL ?= ${DOCKER_BUILD_ARGS} \
+ --build-arg LOCAL_PYVOLTHA=${LOCAL_PYVOLTHA} \
+ --build-arg LOCAL_PROTOS=${LOCAL_PROTOS}
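+
+# Example: build against locally built pyvoltha and voltha-protos packages
+# (assumes sibling checkouts with sdist tarballs in their dist/ directories):
+#   LOCAL_PYVOLTHA=true LOCAL_PROTOS=true make build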
+
+.PHONY: local-protos local-pyvoltha
+
+# This should be the first and default target in this Makefile
+help:
+ @echo "Usage: make [<target>]"
+ @echo "where available targets are:"
+ @echo
+ @echo "build : Build the docker images."
+	@echo " - If this is the first time you are building, use the 'make build' target."
+ @echo "adapter_ponsim_onu : Build the ponsim onu adapter docker image"
+ @echo "venv : Build local Python virtualenv"
+ @echo "clean : Remove files created by the build and tests"
+ @echo "distclean : Remove venv directory"
+ @echo "docker-push : Push the docker images to an external repository"
+ @echo "lint-dockerfile : Perform static analysis on Dockerfiles"
+ @echo "lint : Shorthand for lint-style & lint-sanity"
+	@echo "test : Run unit tests with tox"
+ @echo
+
+## Local Development Helpers
+local-protos:
+ @mkdir -p local_imports
+ifdef LOCAL_PROTOS
+ rm -rf local_imports/voltha-protos
+ mkdir -p local_imports/voltha-protos/dist
+ cp ../voltha-protos/dist/*.tar.gz local_imports/voltha-protos/dist/
+endif
+
+local-pyvoltha:
+ @mkdir -p local_imports
+ifdef LOCAL_PYVOLTHA
+ rm -rf local_imports/pyvoltha
+ mkdir -p local_imports/pyvoltha/dist
+ cp ../pyvoltha/dist/*.tar.gz local_imports/pyvoltha/dist/
+endif
+
+## Python venv dev environment
+
+VENVDIR := venv
+
+venv: distclean local-protos local-pyvoltha
+ virtualenv ${VENVDIR};\
+ source ./${VENVDIR}/bin/activate ; set -u ;\
+ rm -f ${VENVDIR}/local/bin ${VENVDIR}/local/lib ${VENVDIR}/local/include ;\
+ pip install -r requirements.txt
+ifdef LOCAL_PYVOLTHA
+ source ./${VENVDIR}/bin/activate ; set -u ;\
+ pip install local_imports/pyvoltha/dist/*.tar.gz
+endif
+ifdef LOCAL_PROTOS
+ source ./${VENVDIR}/bin/activate ; set -u ;\
+ pip install local_imports/voltha-protos/dist/*.tar.gz
+endif
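+
+# Once created, activate the environment with: source ./venv/bin/activate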
+
+## Docker targets
+
+build: docker-build
+
+docker-build: adapter_ponsim_onu
+
+adapter_ponsim_onu: local-protos local-pyvoltha
+ docker build $(DOCKER_BUILD_ARGS_LOCAL) -t ${PONSIMONU_IMAGENAME}:${DOCKER_TAG} -t ${PONSIMONU_IMAGENAME}:latest -f docker/Dockerfile.adapter_ponsim_onu .
+
+docker-push:
+ docker push ${PONSIMONU_IMAGENAME}:${DOCKER_TAG}
+
+## lint and unit tests
+
+PATH:=$(GOPATH)/bin:$(PATH)
+HADOLINT=$(shell PATH=$(GOPATH)/bin:$(PATH) which hadolint)
+lint-dockerfile:
+ifeq (,$(shell PATH=$(GOPATH)/bin:$(PATH) which hadolint))
+ mkdir -p $(GOPATH)/bin
+ curl -o $(GOPATH)/bin/hadolint -sNSL https://github.com/hadolint/hadolint/releases/download/v1.17.1/hadolint-$(shell uname -s)-$(shell uname -m)
+ chmod 755 $(GOPATH)/bin/hadolint
+endif
+ @echo "Running Dockerfile lint check ..."
+ @hadolint $$(find . -name "Dockerfile.*")
+ @echo "Dockerfile lint check OK"
+
+lint: lint-dockerfile
+
+test:
+ @ echo "Executing unit tests w/tox"
+ tox
+
+clean:
+ rm -rf local_imports
+ find . -name '*.pyc' | xargs rm -f
+
+distclean: clean
+ rm -rf ${VENVDIR}
+
+# end file
diff --git a/VERSION b/VERSION
new file mode 100644
index 0000000..84e6d84
--- /dev/null
+++ b/VERSION
@@ -0,0 +1 @@
+2.2.1-dev
diff --git a/docker/Dockerfile.adapter_ponsim_onu b/docker/Dockerfile.adapter_ponsim_onu
new file mode 100644
index 0000000..1479b1f
--- /dev/null
+++ b/docker/Dockerfile.adapter_ponsim_onu
@@ -0,0 +1,80 @@
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+FROM ubuntu:16.04
+
+# Install the required base packages
+RUN apt-get update && \
+ apt-get install -y --no-install-recommends \
+ ca-certificates=20170717~16.04.2 \
+ python=2.7.12-1~16.04 \
+ openssl=1.0.2g-1ubuntu4.15 \
+ iproute2=4.3.0-1ubuntu3.16.04.5 \
+ libpcap-dev=1.7.4-2 \
+ wget=1.17.1-1ubuntu1.5 \
+ build-essential=12.1ubuntu2 \
+ git=1:2.7.4-0ubuntu1.6 \
+ binutils=2.26.1-1ubuntu1~16.04.8 \
+ python-dev=2.7.12-1~16.04 \
+ libffi-dev=3.2.1-4 \
+ libssl-dev=1.0.2g-1ubuntu4.15 \
+ && apt-get clean && rm -rf /var/lib/apt/lists/*
+
+# Install current version of pip rather than outdated pip from apt
+RUN wget -O /tmp/get-pip.py https://bootstrap.pypa.io/get-pip.py
+RUN python /tmp/get-pip.py
+
+# Install adapter requirements.
+COPY requirements.txt /tmp/requirements.txt
+RUN pip install -r /tmp/requirements.txt
+
+ARG LOCAL_PYVOLTHA
+ARG LOCAL_PROTOS
+COPY local_imports/ /local_imports/
+RUN if [ -n "$LOCAL_PYVOLTHA" ] ; then \
+ PYVOLTHA_PATH=$(ls /local_imports/pyvoltha/dist/) ; \
+ printf "/local_imports/pyvoltha/dist/%s\npyvoltha" "$PYVOLTHA_PATH" > pyvoltha-install.txt ; \
+ pip install -r pyvoltha-install.txt ; \
+    fi
+
+RUN if [ -n "$LOCAL_PROTOS" ] ; then \
+ PROTOS_PATH=$(ls /local_imports/voltha-protos/dist/) ; \
+ printf "/local_imports/voltha-protos/dist/%s\nvoltha-protos" "$PROTOS_PATH" > protos-install.txt ; \
+ pip install -r protos-install.txt ; \
+ fi
+
+# Bundle app source
+RUN mkdir /voltha && touch /voltha/__init__.py
+ENV PYTHONPATH=/voltha
+COPY ponsim_onu /voltha/python/adapters/ponsim_onu
+COPY VERSION /voltha/python/adapters/ponsim_onu
+RUN touch /voltha/python/__init__.py
+RUN touch /voltha/python/adapters/__init__.py
+
+# Default entry point
+CMD ["python", "/voltha/python/adapters/ponsim_onu/main.py"]
+
+# Label image
+ARG org_label_schema_version=unknown
+ARG org_label_schema_vcs_url=unknown
+ARG org_label_schema_vcs_ref=unknown
+ARG org_label_schema_build_date=unknown
+ARG org_opencord_vcs_commit_date=unknown
+
+LABEL org.label-schema.schema-version=1.0 \
+ org.label-schema.name=voltha-adapter-ponsim-onu \
+ org.label-schema.version=$org_label_schema_version \
+ org.label-schema.vcs-url=$org_label_schema_vcs_url \
+ org.label-schema.vcs-ref=$org_label_schema_vcs_ref \
+ org.label-schema.build-date=$org_label_schema_build_date \
+ org.opencord.vcs-commit-date=$org_opencord_vcs_commit_date
diff --git a/ponsim_onu/__init__.py b/ponsim_onu/__init__.py
new file mode 100644
index 0000000..4a82628
--- /dev/null
+++ b/ponsim_onu/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/ponsim_onu/main.py b/ponsim_onu/main.py
new file mode 100755
index 0000000..fee9cdd
--- /dev/null
+++ b/ponsim_onu/main.py
@@ -0,0 +1,494 @@
+#!/usr/bin/env python
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Ponsim ONU Adapter main entry point"""
+
+import argparse
+import os
+import time
+
+import arrow
+import yaml
+from packaging.version import Version
+from simplejson import dumps
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.task import LoopingCall
+from zope.interface import implementer
+
+from pyvoltha.common.structlog_setup import setup_logging, update_logging
+from pyvoltha.common.utils.asleep import asleep
+from pyvoltha.common.utils.deferred_utils import TimeOutError
+from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
+from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
+ get_my_primary_interface
+from pyvoltha.common.utils.registry import registry, IComponent
+from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
+from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
+from pyvoltha.adapters.kafka.core_proxy import CoreProxy
+from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
+ get_messaging_proxy
+from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+from ponsim_onu import PonSimOnuAdapter
+from voltha_protos.adapter_pb2 import AdapterConfig
+
+defs = dict(
+ version_file='./VERSION',
+ config=os.environ.get('CONFIG', './ponsim_onu.yml'),
+    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
+                                        r'^.*\.([0-9]+)\..*$'),
+ consul=os.environ.get('CONSUL', 'localhost:8500'),
+ name=os.environ.get('NAME', 'ponsim_onu'),
+ vendor=os.environ.get('VENDOR', 'Voltha Project'),
+ device_type=os.environ.get('DEVICE_TYPE', 'ponsim_onu'),
+ accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
+ accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
+ etcd=os.environ.get('ETCD', 'localhost:2379'),
+ core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
+ interface=os.environ.get('INTERFACE', get_my_primary_interface()),
+ instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
+ kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
+ kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
+ backend=os.environ.get('BACKEND', 'none'),
+ retry_interval=os.environ.get('RETRY_INTERVAL', 2),
+ heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
+)
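+# Each of the defaults above can be overridden through its corresponding
+# environment variable; the command-line options defined in parse_args()
+# take precedence over both.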
+
+
+def parse_args():
+ parser = argparse.ArgumentParser()
+
+ _help = ('Path to ponsim_onu.yml config file (default: %s). '
+             'If relative, it is relative to main.py of the ponsim adapter.'
+ % defs['config'])
+ parser.add_argument('-c', '--config',
+ dest='config',
+ action='store',
+ default=defs['config'],
+ help=_help)
+
+    _help = 'Regular expression for extracting container number from ' \
+ 'container name (default: %s)' % defs['container_name_regex']
+ parser.add_argument('-X', '--container-number-extractor',
+ dest='container_name_regex',
+ action='store',
+ default=defs['container_name_regex'],
+ help=_help)
+
+ _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
+ parser.add_argument('-C', '--consul',
+ dest='consul',
+ action='store',
+ default=defs['consul'],
+ help=_help)
+
+ _help = 'name of this adapter (default: %s)' % defs['name']
+ parser.add_argument('-na', '--name',
+ dest='name',
+ action='store',
+ default=defs['name'],
+ help=_help)
+
+ _help = 'vendor of this adapter (default: %s)' % defs['vendor']
+ parser.add_argument('-ven', '--vendor',
+ dest='vendor',
+ action='store',
+ default=defs['vendor'],
+ help=_help)
+
+ _help = 'supported device type of this adapter (default: %s)' % defs[
+ 'device_type']
+ parser.add_argument('-dt', '--device_type',
+ dest='device_type',
+ action='store',
+ default=defs['device_type'],
+ help=_help)
+
+ _help = 'specifies whether the device type accepts bulk flow updates ' \
+            '(default: %s)' % defs['accept_bulk_flow']
+ parser.add_argument('-abf', '--accept_bulk_flow',
+ dest='accept_bulk_flow',
+ action='store',
+ default=defs['accept_bulk_flow'],
+ help=_help)
+
+ _help = 'specifies whether the device type accepts add/remove flow ' \
+            'updates (default: %s)' % defs['accept_atomic_flow']
+ parser.add_argument('-aaf', '--accept_atomic_flow',
+ dest='accept_atomic_flow',
+ action='store',
+ default=defs['accept_atomic_flow'],
+ help=_help)
+
+ _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
+ parser.add_argument('-e', '--etcd',
+ dest='etcd',
+ action='store',
+ default=defs['etcd'],
+ help=_help)
+
+ _help = ('unique string id of this container instance (default: %s)'
+ % defs['instance_id'])
+ parser.add_argument('-i', '--instance-id',
+ dest='instance_id',
+ action='store',
+ default=defs['instance_id'],
+ help=_help)
+
+    _help = 'ETH interface to receive on (default: %s)' % defs['interface']
+ parser.add_argument('-I', '--interface',
+ dest='interface',
+ action='store',
+ default=defs['interface'],
+ help=_help)
+
+ _help = 'omit startup banner log lines'
+ parser.add_argument('-n', '--no-banner',
+ dest='no_banner',
+ action='store_true',
+ default=False,
+ help=_help)
+
+ _help = 'do not emit periodic heartbeat log messages'
+ parser.add_argument('-N', '--no-heartbeat',
+ dest='no_heartbeat',
+ action='store_true',
+ default=False,
+ help=_help)
+
+ _help = "suppress debug and info logs"
+ parser.add_argument('-q', '--quiet',
+ dest='quiet',
+ action='count',
+ help=_help)
+
+ _help = 'enable verbose logging'
+ parser.add_argument('-v', '--verbose',
+ dest='verbose',
+ action='count',
+ help=_help)
+
+    _help = ('use docker container name as container instance id'
+ ' (overrides -i/--instance-id option)')
+ parser.add_argument('--instance-id-is-container-name',
+ dest='instance_id_is_container_name',
+ action='store_true',
+ default=False,
+ help=_help)
+
+    _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). '
+             'If not specified, the address from the config file is used'
+             % defs['kafka_adapter'])
+ parser.add_argument('-KA', '--kafka_adapter',
+ dest='kafka_adapter',
+ action='store',
+ default=defs['kafka_adapter'],
+ help=_help)
+
+    _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). '
+             'If not specified, the address from the config file is used'
+             % defs['kafka_cluster'])
+ parser.add_argument('-KC', '--kafka_cluster',
+ dest='kafka_cluster',
+ action='store',
+ default=defs['kafka_cluster'],
+ help=_help)
+
+    _help = 'backend to use for config persistence'
+ parser.add_argument('-b', '--backend',
+ default=defs['backend'],
+ choices=['none', 'consul', 'etcd'],
+ help=_help)
+
+ _help = 'topic of core on the kafka bus'
+ parser.add_argument('-ct', '--core_topic',
+ dest='core_topic',
+ action='store',
+ default=defs['core_topic'],
+ help=_help)
+
+ args = parser.parse_args()
+
+ # post-processing
+
+ if args.instance_id_is_container_name:
+ args.instance_id = get_my_containers_name()
+
+ return args
+
+
+def load_config(args):
+ path = args.config
+ if path.startswith('.'):
+ dir = os.path.dirname(os.path.abspath(__file__))
+ path = os.path.join(dir, path)
+ path = os.path.abspath(path)
+ with open(path) as fd:
+ config = yaml.load(fd)
+ return config
+
+
+def print_banner(log):
+ log.info(' ____ _ ___ _ _ _ _ ')
+ log.info('| _ \ ___ _ __ ___(_)_ __ ___ / _ \| \ | | | | | ')
+ log.info('| |_) / _ \| \'_ \/ __| | \'_ ` _ \ | | | | \| | | | | ')
+ log.info('| __/ (_) | | | \__ \ | | | | | | | |_| | |\ | |_| | ')
+ log.info('|_| \___/|_| |_|___/_|_| |_| |_| \___/|_| \_|\___/ ')
+ log.info(' _ _ _ ')
+ log.info(' / \ __| | __ _ _ __ | |_ ___ _ __ ')
+ log.info(' / _ \ / _` |/ _` | \'_ \| __/ _ \ \'__| ')
+ log.info(' / ___ \ (_| | (_| | |_) | || __/ | ')
+ log.info('/_/ \_\__,_|\__,_| .__/ \__\___|_| ')
+ log.info(' |_| ')
+ log.info('(to stop: press Ctrl-C)')
+
+
+@implementer(IComponent)
+class Main(object):
+
+ def __init__(self):
+
+ self.args = args = parse_args()
+ self.config = load_config(args)
+
+ verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
+ self.log = setup_logging(self.config.get('logging', {}),
+ args.instance_id,
+ verbosity_adjust=verbosity_adjust)
+ self.log.info('container-number-extractor',
+ regex=args.container_name_regex)
+
+        self.ponsim_onu_adapter_version = self.get_version()
+        self.log.info('Ponsim-ONU-Adapter-Version',
+                      version=self.ponsim_onu_adapter_version)
+
+ if not args.no_banner:
+ print_banner(self.log)
+
+ self.adapter = None
+ # Create a unique instance id using the passed-in instance id and
+ # UTC timestamp
+ current_time = arrow.utcnow().timestamp
+ self.instance_id = self.args.instance_id + '_' + str(current_time)
+
+ self.core_topic = args.core_topic
+ self.listening_topic = args.name
+ self.startup_components()
+
+ if not args.no_heartbeat:
+ self.start_heartbeat()
+ self.start_kafka_cluster_heartbeat(self.instance_id)
+
+ def get_version(self):
+ path = defs['version_file']
+ if not path.startswith('/'):
+ dir = os.path.dirname(os.path.abspath(__file__))
+ path = os.path.join(dir, path)
+
+ path = os.path.abspath(path)
+        with open(path, 'r') as version_file:
+            v = version_file.read()
+
+        # Use Version to validate the version string - an exception will be
+        # raised if the version is invalid
+        Version(v)
+        return v
+
+ def start(self):
+        self.start_reactor()  # will not return except on keyboard interrupt
+
+ def stop(self):
+ pass
+
+ def get_args(self):
+ """Allow access to command line args"""
+ return self.args
+
+ def get_config(self):
+ """Allow access to content of config file"""
+ return self.config
+
+ def _get_adapter_config(self):
+ cfg = AdapterConfig()
+ return cfg
+
+ @inlineCallbacks
+ def startup_components(self):
+ try:
+ self.log.info('starting-internal-components',
+ consul=self.args.consul,
+ etcd=self.args.etcd)
+
+ registry.register('main', self)
+
+ # Update the logger to output the vcore id.
+ self.log = update_logging(instance_id=self.instance_id,
+ vcore_id=None)
+
+ yield registry.register(
+ 'kafka_cluster_proxy',
+ KafkaProxy(
+ self.args.consul,
+ self.args.kafka_cluster,
+ config=self.config.get('kafka-cluster-proxy', {})
+ )
+ ).start()
+
+ config = self._get_adapter_config()
+
+ self.core_proxy = CoreProxy(
+ kafka_proxy=None,
+ default_core_topic=self.core_topic,
+ my_listening_topic=self.listening_topic)
+
+ self.adapter_proxy = AdapterProxy(
+ kafka_proxy=None,
+ core_topic=self.core_topic,
+ my_listening_topic=self.listening_topic)
+
+ self.adapter = PonSimOnuAdapter(
+ core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
+ config=config)
+ ponsim_request_handler = AdapterRequestFacade(
+ adapter=self.adapter, core_proxy=self.core_proxy)
+
+ yield registry.register(
+ 'kafka_adapter_proxy',
+ IKafkaMessagingProxy(
+ kafka_host_port=self.args.kafka_adapter,
+ # TODO: Add KV Store object reference
+ kv_store=self.args.backend,
+ default_topic=self.args.name,
+ group_id_prefix=self.args.instance_id,
+ target_cls=ponsim_request_handler
+ )
+ ).start()
+
+ self.core_proxy.kafka_proxy = get_messaging_proxy()
+ self.adapter_proxy.kafka_proxy = get_messaging_proxy()
+
+            # retry forever
+ res = yield self._register_with_core(-1)
+
+ self.log.info('started-internal-services')
+
+ except Exception as e:
+ self.log.exception('Failure-to-start-all-components', e=e)
+
+ @inlineCallbacks
+ def shutdown_components(self):
+ """Execute before the reactor is shut down"""
+ self.log.info('exiting-on-keyboard-interrupt')
+ for component in reversed(registry.iterate()):
+ yield component.stop()
+
+ import threading
+ self.log.info('THREADS:')
+ main_thread = threading.current_thread()
+ for t in threading.enumerate():
+ if t is main_thread:
+ continue
+ if not t.isDaemon():
+ continue
+ self.log.info('joining thread {} {}'.format(
+ t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
+ t.join()
+
+ def start_reactor(self):
+ from twisted.internet import reactor
+ reactor.callWhenRunning(
+ lambda: self.log.info('twisted-reactor-started'))
+ reactor.addSystemEventTrigger('before', 'shutdown',
+ self.shutdown_components)
+ reactor.run()
+
+ @inlineCallbacks
+ def _register_with_core(self, retries):
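+        # A negative retries value means retry forever; retries == 0 fails
+        # on the first timeout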
+        while True:
+ try:
+ resp = yield self.core_proxy.register(
+ self.adapter.adapter_descriptor(),
+ self.adapter.device_types())
+ if resp:
+ self.log.info('registered-with-core',
+ coreId=resp.instance_id)
+
+ returnValue(resp)
+ except TimeOutError as e:
+ self.log.warn("timeout-when-registering-with-core", e=e)
+ if retries == 0:
+ self.log.exception("no-more-retries", e=e)
+ raise
+ else:
+ retries = retries if retries < 0 else retries - 1
+ yield asleep(defs['retry_interval'])
+ except Exception as e:
+ self.log.exception("failed-registration", e=e)
+ raise
+
+ def start_heartbeat(self):
+
+ t0 = time.time()
+ t0s = time.ctime(t0)
+
+ def heartbeat():
+ self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
+
+ lc = LoopingCall(heartbeat)
+ lc.start(10)
+
+ # Temporary function to send a heartbeat message to the external kafka
+ # broker
+ def start_kafka_cluster_heartbeat(self, instance_id):
+        # For the heartbeat we send a message to the configured heartbeat
+        # topic (default: adapters.heartbeat); the message is a
+        # JSON-serialized dict
+ message = dict(
+ type='heartbeat',
+ adapter=self.args.name,
+ instance=instance_id,
+ ip=get_my_primary_local_ipv4()
+ )
+ topic = defs['heartbeat_topic']
+
+ def send_msg(start_time):
+ try:
+ kafka_cluster_proxy = get_kafka_proxy()
+ if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
+ # self.log.debug('kafka-proxy-available')
+ message['ts'] = arrow.utcnow().timestamp
+ message['uptime'] = time.time() - start_time
+ # self.log.debug('start-kafka-heartbeat')
+ kafka_cluster_proxy.send_message(topic, dumps(message))
+ else:
+ self.log.error('kafka-proxy-unavailable')
+            except Exception as e:
+ self.log.exception('failed-sending-message-heartbeat', e=e)
+
+ try:
+ t0 = time.time()
+ lc = LoopingCall(send_msg, t0)
+ lc.start(10)
+        except Exception as e:
+ self.log.exception('failed-kafka-heartbeat', e=e)
+
+
+if __name__ == '__main__':
+ Main().start()
diff --git a/ponsim_onu/ponsim_onu.py b/ponsim_onu/ponsim_onu.py
new file mode 100644
index 0000000..0987147
--- /dev/null
+++ b/ponsim_onu/ponsim_onu.py
@@ -0,0 +1,514 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Represents an ONU device
+"""
+
+from uuid import uuid4
+
+import arrow
+import structlog
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
+from twisted.internet.defer import DeferredQueue, inlineCallbacks, \
+ returnValue, Deferred
+from twisted.internet.task import LoopingCall
+
+from pyvoltha.common.utils.asleep import asleep
+from pyvoltha.adapters.iadapter import OnuAdapter
+from pyvoltha.adapters.kafka.kafka_proxy import get_kafka_proxy
+from voltha_protos.common_pb2 import OperStatus, ConnectStatus, AdminState
+from voltha_protos.inter_container_pb2 import PortCapability, \
+ InterAdapterMessageType, InterAdapterResponseBody
+from voltha_protos.device_pb2 import Port, PmConfig, PmConfigs
+from voltha_protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
+from voltha_protos.logical_device_pb2 import LogicalPort
+from voltha_protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
+ OFPPF_1GB_FD
+from voltha_protos.openflow_13_pb2 import ofp_port
+from voltha_protos.ponsim_pb2 import FlowTable, PonSimMetricsRequest, PonSimMetrics
+
+log = structlog.get_logger()
+
+
+def mac_str_to_tuple(mac):
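+    # e.g. "01:02:03:04:05:06" -> (1, 2, 3, 4, 5, 6)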
+ return tuple(int(d, 16) for d in mac.split(':'))
+
+
+class AdapterPmMetrics:
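+    """
+    Helper holding the PM (performance monitoring) configuration for the
+    ONU's PON and UNI ports; it also drives the periodic collection loop.
+    """
+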
+ def __init__(self, device):
+ self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
+ 'tx_256_511_pkts', 'tx_512_1023_pkts',
+ 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
+ 'rx_64_pkts', 'rx_65_127_pkts',
+ 'rx_128_255_pkts', 'rx_256_511_pkts',
+ 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
+ 'rx_1519_9k_pkts'}
+ self.device = device
+ self.id = device.id
+ self.name = 'ponsim_onu'
+ self.default_freq = 150
+ self.grouped = False
+ self.freq_override = False
+ self.pm_metrics = None
+ self.pon_metrics_config = dict()
+ self.uni_metrics_config = dict()
+ self.lc = None
+ for m in self.pm_names:
+ self.pon_metrics_config[m] = PmConfig(name=m,
+ type=PmConfig.COUNTER,
+ enabled=True)
+ self.uni_metrics_config[m] = PmConfig(name=m,
+ type=PmConfig.COUNTER,
+ enabled=True)
+
+ def update(self, pm_config):
+ if self.default_freq != pm_config.default_freq:
+ # Update the callback to the new frequency.
+ self.default_freq = pm_config.default_freq
+ self.lc.stop()
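+            # default_freq appears to be expressed in 100ms ticks, so
+            # dividing by 10 gives the collection interval in seconds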
+ self.lc.start(interval=self.default_freq / 10)
+ for m in pm_config.metrics:
+ self.pon_metrics_config[m.name].enabled = m.enabled
+ self.uni_metrics_config[m.name].enabled = m.enabled
+
+ def make_proto(self):
+ pm_config = PmConfigs(
+ id=self.id,
+ default_freq=self.default_freq,
+ grouped=False,
+ freq_override=False)
+ for m in sorted(self.pon_metrics_config):
+            pm = self.pon_metrics_config[m]  # Either will do; they're the same
+ pm_config.metrics.extend([PmConfig(name=pm.name,
+ type=pm.type,
+ enabled=pm.enabled)])
+ return pm_config
+
+ def extract_metrics(self, stats):
+ rtrn_port_metrics = dict()
+ rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
+ rtrn_port_metrics['uni'] = self.extract_uni_metrics(stats)
+ return rtrn_port_metrics
+
+ def extract_pon_metrics(self, stats):
+ rtrn_pon_metrics = dict()
+ for m in stats.metrics:
+ if m.port_name == "pon":
+ for p in m.packets:
+ if self.pon_metrics_config[p.name].enabled:
+ rtrn_pon_metrics[p.name] = p.value
+ return rtrn_pon_metrics
+
+ def extract_uni_metrics(self, stats):
+ rtrn_pon_metrics = dict()
+ for m in stats.metrics:
+ if m.port_name == "uni":
+ for p in m.packets:
+ if self.pon_metrics_config[p.name].enabled:
+ rtrn_pon_metrics[p.name] = p.value
+ return rtrn_pon_metrics
+
+ def start_collector(self, callback):
+ log.info("starting-pm-collection", device_name=self.name,
+ device_id=self.device.id)
+ prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
+ self.lc = LoopingCall(callback, self.device.id, prefix)
+ self.lc.start(interval=self.default_freq / 10)
+
+ def stop_collector(self):
+ log.info("stopping-pm-collection", device_name=self.name,
+ device_id=self.device.id)
+ self.lc.stop()
+
+
+class PonSimOnuAdapter(OnuAdapter):
+ def __init__(self, core_proxy, adapter_proxy, config):
+        # The DeviceType of the ONU should be the same as the vendor ID of
+        # the ONU serial number, as specified by the standard; this is
+        # required to identify the correct adapter for a ranged ONU
+ super(PonSimOnuAdapter, self).__init__(core_proxy=core_proxy,
+ adapter_proxy=adapter_proxy,
+ config=config,
+ device_handler_class=PonSimOnuHandler,
+ name='ponsim_onu',
+ vendor='Voltha project',
+ version='0.4',
+ device_type='ponsim_onu',
+ vendor_id='PSMO',
+ accepts_bulk_flow_update=True,
+ accepts_add_remove_flow_updates=False)
+
+
+class PonSimOnuHandler(object):
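+    # Handler for a single PONSIM ONU device. All communication with the
+    # (simulated) ONU is proxied through the parent OLT adapter via
+    # inter-adapter messages.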
+ def __init__(self, adapter, device_id):
+ self.adapter = adapter
+ self.core_proxy = adapter.core_proxy
+ self.adapter_proxy = adapter.adapter_proxy
+ self.device_id = device_id
+ self.device_parent_id = None
+ self.log = structlog.get_logger(device_id=device_id)
+ self.incoming_messages = DeferredQueue()
+ self.inter_adapter_message_deferred_map = {}
+ self.proxy_address = None
+ # reference of uni_port is required when re-enabling the device if
+ # it was disabled previously
+ self.uni_port = None
+ self.pon_port = None
+
+ def _to_string(self, unicode_str):
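+        # Python 2 note: protobuf string fields typically arrive as unicode;
+        # normalize them to plain str so they can be used reliably as map keys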
+ if unicode_str is not None:
+            if isinstance(unicode_str, unicode):
+ return unicode_str.encode('ascii', 'ignore')
+ else:
+ return unicode_str
+ else:
+ return ""
+
+ def receive_message(self, msg):
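+        # Correlate the reply with its pending request: the message header
+        # carries the transaction id that was used as the key into
+        # inter_adapter_message_deferred_map when the request was sent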
+ trns_id = self._to_string(msg.header.id)
+ if trns_id in self.inter_adapter_message_deferred_map:
+ self.inter_adapter_message_deferred_map[trns_id].callback(msg)
+ # self.incoming_messages.put(msg)
+
+ @inlineCallbacks
+ def activate(self, device):
+ self.log.info('activating')
+
+ self.device_parent_id = device.parent_id
+ self.proxy_address = device.proxy_address
+
+ # populate device info
+ device.root = False
+ device.vendor = 'ponsim'
+ device.model = 'n/a'
+ device.serial_number = device.serial_number
+ device.mac_address = "AA:BB:CC:DD:E0:00"
+ yield self.core_proxy.device_update(device)
+
+ # Now set the initial PM configuration for this device
+ self.pm_metrics = AdapterPmMetrics(device)
+ pm_config = self.pm_metrics.make_proto()
+ log.info("initial-pm-config", pm_config=pm_config)
+ yield self.core_proxy.device_pm_config_update(pm_config, init=True)
+
+        # Use the channel id assigned to us by the parent device as the port number
+ uni_port = 2
+ if device.proxy_address is not None:
+ if device.proxy_address.channel_id != 0:
+ uni_port = device.proxy_address.channel_id
+
+ # register physical ports
+ self.uni_port = Port(
+ port_no=uni_port,
+ label="uni-" + str(uni_port),
+ type=Port.ETHERNET_UNI,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE
+ )
+ self.pon_port = Port(
+ port_no=1,
+ label='pon-1',
+ type=Port.PON_ONU,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE,
+ peers=[
+ Port.PeerPort(
+ device_id=device.parent_id,
+ port_no=device.parent_port_no
+ )
+ ]
+ )
+ yield self.core_proxy.port_created(device.id, self.uni_port)
+ yield self.core_proxy.port_created(device.id, self.pon_port)
+
+ yield self.core_proxy.device_state_update(device.id,
+ connect_status=ConnectStatus.REACHABLE,
+ oper_status=OperStatus.ACTIVE)
+
+        # Start collecting stats from the device
+ self.start_kpi_collection(device.id)
+
+ # TODO: Return only port specific info
+ def get_ofp_port_info(self, device, port_no):
+        # Since the adapter created the device port, it holds the reference
+        # needed to return the port capability.
+        # TODO: Do a lookup on the UNI port number and return the
+        # appropriate attributes
+ self.log.info('get_ofp_port_info', port_no=port_no, device_id=device.id)
+ cap = OFPPF_1GB_FD | OFPPF_FIBER
+ return PortCapability(
+ port=LogicalPort(
+ ofp_port=ofp_port(
+ hw_addr=mac_str_to_tuple('AA:BB:CC:DD:E0:%02x' % port_no),
+ config=0,
+ state=OFPPS_LIVE,
+ curr=cap,
+ advertised=cap,
+ peer=cap,
+ curr_speed=OFPPF_1GB_FD,
+ max_speed=OFPPF_1GB_FD
+ ),
+ device_id=device.id,
+ device_port_no=port_no
+ )
+ )
+
+ @inlineCallbacks
+ def _get_uni_port(self):
+ ports = yield self.core_proxy.get_ports(self.device_id,
+ Port.ETHERNET_UNI)
+ returnValue(ports)
+
+ @inlineCallbacks
+ def _get_pon_port(self):
+ ports = yield self.core_proxy.get_ports(self.device_id, Port.PON_ONU)
+ returnValue(ports)
+
+ def reconcile(self, device):
+ self.log.info('reconciling-ONU-device-starts')
+ # TODO: complete code
+
+ @inlineCallbacks
+ def update_flow_table(self, flows):
+ trnsId = None
+ try:
+ self.log.info('update_flow_table', flows=flows)
+ # we need to proxy through the OLT to get to the ONU
+
+ fb = FlowTable(
+ port=self.proxy_address.channel_id,
+ flows=flows
+ )
+
+ # Create a deferred to wait for the result as well as a transid
+ wait_for_result = Deferred()
+ trnsId = uuid4().hex
+ self.inter_adapter_message_deferred_map[
+ self._to_string(trnsId)] = wait_for_result
+
+            # Send the request via the proxy and wait for an ACK
+ yield self.adapter_proxy.send_inter_adapter_message(
+ msg=fb,
+ type=InterAdapterMessageType.FLOW_REQUEST,
+ from_adapter=self.adapter.name,
+ to_adapter=self.proxy_address.device_type,
+ to_device_id=self.device_id,
+ proxy_device_id=self.proxy_address.device_id,
+ message_id=trnsId
+ )
+ # Wait for the full response from the proxied adapter
+ res = yield wait_for_result
+ if res.header.type == InterAdapterMessageType.FLOW_RESPONSE:
+ body = InterAdapterResponseBody()
+ res.body.Unpack(body)
+ self.log.info('response-received', result=body.status)
+ except Exception as e:
+ self.log.exception("update-flow-error", e=e)
+ finally:
+ if trnsId in self.inter_adapter_message_deferred_map:
+ del self.inter_adapter_message_deferred_map[trnsId]
+
+ def process_inter_adapter_message(self, msg):
+ # We expect only responses on the ONU side
+ self.log.info('process-inter-adapter-message', msg=msg)
+ self.receive_message(msg)
+
+ def remove_from_flow_table(self, flows):
+ self.log.debug('remove-from-flow-table', flows=flows)
+ # TODO: Update PONSIM code to accept incremental flow changes.
+ # Once completed, the accepts_add_remove_flow_updates for this
+ # device type can be set to True
+
+ def add_to_flow_table(self, flows):
+ self.log.debug('add-to-flow-table', flows=flows)
+ # TODO: Update PONSIM code to accept incremental flow changes
+ # Once completed, the accepts_add_remove_flow_updates for this
+ # device type can be set to True
+
+ @inlineCallbacks
+ def reboot(self):
+ self.log.info('rebooting', device_id=self.device_id)
+
+ # Update the connect status to UNREACHABLE
+ yield self.core_proxy.device_state_update(self.device_id,
+ connect_status=ConnectStatus.UNREACHABLE)
+
+ # Sleep 10 secs, simulating a reboot
+ # TODO: send alert and clear alert after the reboot
+ yield asleep(10)
+
+ # Change the connection status back to REACHABLE. With a
+ # real ONU the connection state must be the actual state
+ yield self.core_proxy.device_state_update(self.device_id,
+ connect_status=ConnectStatus.REACHABLE)
+
+ self.log.info('rebooted', device_id=self.device_id)
+
+ def self_test_device(self, device):
+ """
+        This is called to self-test a device based on an NBI call.
+        :param device: A Voltha.Device object.
+        :return: The result of the self test
+ """
+ log.info('self-test-device', device=device.id)
+ raise NotImplementedError()
+
+ @inlineCallbacks
+ def disable(self):
+ self.log.info('disabling', device_id=self.device_id)
+
+ # Update the device operational status to UNKNOWN
+ yield self.core_proxy.device_state_update(self.device_id,
+ oper_status=OperStatus.UNKNOWN,
+ connect_status=ConnectStatus.UNREACHABLE)
+
+ self.stop_kpi_collection()
+
+ # TODO:
+ # 1) Remove all flows from the device
+ # 2) Remove the device from ponsim
+ self.log.info('disabled', device_id=self.device_id)
+
+ @inlineCallbacks
+ def reenable(self):
+ self.log.info('re-enabling', device_id=self.device_id)
+ try:
+
+ # Refresh the port reference - we only use one port for now
+ ports = yield self._get_uni_port()
+ self.log.info('re-enabling-uni-ports', ports=ports)
+ if ports.items:
+ self.uni_port = ports.items[0]
+
+ ports = yield self._get_pon_port()
+ self.log.info('re-enabling-pon-ports', ports=ports)
+ if ports.items:
+ self.pon_port = ports.items[0]
+
+ # Update the state of the UNI port
+ yield self.core_proxy.port_state_update(self.device_id,
+ port_type=Port.ETHERNET_UNI,
+ port_no=self.uni_port.port_no,
+ oper_status=OperStatus.ACTIVE)
+
+ # Update the state of the PON port
+ yield self.core_proxy.port_state_update(self.device_id,
+ port_type=Port.PON_ONU,
+ port_no=self.pon_port.port_no,
+ oper_status=OperStatus.ACTIVE)
+
+ yield self.core_proxy.device_state_update(self.device_id,
+ oper_status=OperStatus.ACTIVE,
+ connect_status=ConnectStatus.REACHABLE)
+
+ self.start_kpi_collection(self.device_id)
+
+ self.log.info('re-enabled', device_id=self.device_id)
+        except Exception as e:
+ self.log.exception('error-reenabling', e=e)
+
+ def delete(self):
+ self.log.info('deleting', device_id=self.device_id)
+
+ # TODO:
+ # 1) Remove all flows from the device
+ # 2) Remove the device from ponsim
+
+ self.log.info('deleted', device_id=self.device_id)
+
+ def start_kpi_collection(self, device_id):
+ kafka_cluster_proxy = get_kafka_proxy()
+
+ @inlineCallbacks
+ def _collect(device_id, prefix):
+ try:
+ self.log.debug("pm-collection-interval")
+                # Proxy a message to ponsim_olt. The OLT will then query
+                # the ONU for statistics. The reply arrives proxied back
+                # to us in self.receive_message().
+ msg = PonSimMetricsRequest(port=self.proxy_address.channel_id)
+
+ # Create a deferred to wait for the result as well as a transid
+ wait_for_result = Deferred()
+ trnsId = uuid4().hex
+ self.inter_adapter_message_deferred_map[
+ self._to_string(trnsId)] = wait_for_result
+
+                # Send the request via the proxy and wait for an ACK
+ yield self.adapter_proxy.send_inter_adapter_message(
+ msg=msg,
+ type=InterAdapterMessageType.METRICS_REQUEST,
+ from_adapter=self.adapter.name,
+ to_adapter=self.proxy_address.device_type,
+ to_device_id=self.device_id,
+ proxy_device_id=self.proxy_address.device_id,
+ message_id=trnsId
+ )
+ # Wait for the full response from the proxied adapter
+ res = yield wait_for_result
+ # Remove the transaction from the transaction map
+ del self.inter_adapter_message_deferred_map[self._to_string(trnsId)]
+
+ # Message is a reply to an ONU statistics request. Push it out to
+ # Kafka via adapter.submit_kpis().
+ if res.header.type == InterAdapterMessageType.METRICS_RESPONSE:
+ msg = InterAdapterResponseBody()
+ res.body.Unpack(msg)
+ self.log.debug('metrics-response-received', result=msg.status)
+ if self.pm_metrics:
+                        self.log.debug('handling-incoming-onu-metrics')
+ response = PonSimMetrics()
+ msg.body.Unpack(response)
+ port_metrics = self.pm_metrics.extract_metrics(response)
+ try:
+ ts = arrow.utcnow().timestamp
+ kpi_event = KpiEvent(
+ type=KpiEventType.slice,
+ ts=ts,
+ prefixes={
+                                # ONU UNI port
+ prefix + '.uni': MetricValuePairs(
+ metrics=port_metrics['uni']),
+                                # ONU PON port
+ prefix + '.pon': MetricValuePairs(
+ metrics=port_metrics['pon'])
+ }
+ )
+
+ self.log.debug(
+                            'submitting-kpi-for-incoming-onu-metrics')
+
+                        # Submit the KPI event directly to the kafka bus
+ if kafka_cluster_proxy:
+ if isinstance(kpi_event, Message):
+ kpi_event = dumps(
+ MessageToDict(kpi_event, True, True))
+ kafka_cluster_proxy.send_message("voltha.kpis",
+ kpi_event)
+
+ except Exception as e:
+ log.exception('failed-to-submit-kpis', e=e)
+ except Exception as e:
+ log.exception('failed-to-collect-metrics', e=e)
+
+ self.pm_metrics.start_collector(_collect)
+
+ def stop_kpi_collection(self):
+ self.pm_metrics.stop_collector()
diff --git a/ponsim_onu/ponsim_onu.yml b/ponsim_onu/ponsim_onu.yml
new file mode 100644
index 0000000..aa9d43c
--- /dev/null
+++ b/ponsim_onu/ponsim_onu.yml
@@ -0,0 +1,67 @@
+---
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+logging:
+ version: 1
+
+ formatters:
+ brief:
+ format: '%(message)s'
+ default:
+ format: '%(asctime)s.%(msecs)03d %(levelname)-8s %(threadName)s %(module)s.%(funcName)s %(message)s'
+ datefmt: '%Y%m%dT%H%M%S'
+
+ handlers:
+ console:
+ class : logging.StreamHandler
+ level: DEBUG
+ formatter: default
+ stream: ext://sys.stdout
+ localRotatingFile:
+ class: logging.handlers.RotatingFileHandler
+ filename: ponsim_onu.log
+ formatter: default
+ maxBytes: 2097152
+ backupCount: 10
+ level: DEBUG
+ null:
+ class: logging.NullHandler
+
+ loggers:
+ amqp:
+ handlers: [null]
+ propagate: False
+ conf:
+ propagate: False
+ '': # root logger
+ handlers: [console, localRotatingFile]
+ level: DEBUG # this can be bumped up/down by -q and -v command line
+ # options
+ propagate: False
+
+
+kafka-cluster-proxy:
+ event_bus_publisher:
+ topic_mappings:
+ 'model-change-events':
+ kafka_topic: 'voltha.events'
+ filters: [null]
+ 'alarms':
+ kafka_topic: 'voltha.alarms'
+ filters: [null]
+ 'kpis':
+ kafka_topic: 'voltha.kpis'
+ filters: [null]
+
diff --git a/ponsim_onu/test_ponsim_onu.py b/ponsim_onu/test_ponsim_onu.py
new file mode 100644
index 0000000..1b23cba
--- /dev/null
+++ b/ponsim_onu/test_ponsim_onu.py
@@ -0,0 +1,30 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import ponsim_onu
+
+
+class TestPonsimOnu(unittest.TestCase):
+ def test_mac_str_to_tuple(self):
+ result = ponsim_onu.mac_str_to_tuple("01:02:03:04:05:06")
+ self.assertEqual(result, (1, 2, 3, 4, 5, 6))
+
+
+def main():
+ unittest.main()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..b82f310
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,63 @@
+argparse==1.2.1
+arrow==0.10.0
+bitstring==3.1.5
+cmd2==0.7.0
+colorama==0.3.9
+confluent-kafka==0.11.5
+cython==0.24.1
+decorator==4.1.2
+docker-py==1.10.6
+fluent-logger==0.6.0
+grpc==0.3.post19
+grpcio==1.16.0
+grpcio-tools==1.16.0
+hash_ring==1.3.1
+hexdump==3.3
+jinja2==2.8
+jsonpatch==1.16
+kafka_python==1.3.5
+kafkaloghandler==0.9.0
+klein==17.10.0
+kubernetes==5.0.0
+netaddr==0.7.19
+networkx==2.0
+netifaces==0.10.6
+pcapy==0.11.1
+pep8==1.7.1
+pep8-naming>=0.3.3
+protobuf==3.6.1
+protobuf-to-dict==0.1.0
+pyflakes==2.1.0
+pylint==1.9.4
+pyOpenSSL==17.3.0
+PyYAML==3.12
+requests==2.18.4
+scapy==2.3.3
+service-identity==17.0.0
+simplejson==3.12.0
+jsonschema==2.6.0
+six==1.12.0
+structlog==17.2.0
+termcolor==1.1.0
+transitions==0.6.4
+treq==17.8.0
+Twisted==18.7.0
+txaioetcd==0.3.0
+urllib3==1.22
+pyang==1.7.3
+lxml==3.6.4
+nosexcover==1.0.11
+zmq==0.0.0
+pyzmq==16.0.3
+txZMQ==0.8.0
+ncclient==0.5.3
+xmltodict==0.11.0
+dicttoxml==1.7.4
+etcd3==0.7.0
+pyparsing==2.2.0
+packaging==17.1
+pexpect==4.6.0
+python-consul==0.6.2
+afkak==3.0.0.dev20181106
+voltha-protos==1.0.0
+pyvoltha==0.2.2
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..f7f2ca1
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,35 @@
+; Copyright 2017 the original author or authors.
+;
+; Licensed under the Apache License, Version 2.0 (the "License");
+; you may not use this file except in compliance with the License.
+; You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+
+[tox]
+envlist = py27
+skipsdist = True
+
+[testenv]
+commands =
+ nosetests -c tox.ini
+
+deps =
+ nose
+ mock
+ coverage
+ -rrequirements.txt
+
+[nosetests]
+with-xunit=1
+xunit-file=nose-results.xml
+with-coverage=1
+cover-xml=1
+cover-xml-file=coverage.xml
+cover-package=pyvoltha