VOL-1097 : Ofagent integration for voltha 2.0
- Created a common location for python based components
- Adjusted the ofagent component to interact with voltha 2.0
- Added streaming rpc methods for rcv/send of packets to voltha api
- Adjusted voltha.proto
Change-Id: I47fb7b80878ead060b4b42bd16cb4f8aa384fdb6
diff --git a/python/common/__init__.py b/python/common/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/common/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/common/event_bus.py b/python/common/event_bus.py
new file mode 100644
index 0000000..e717c16
--- /dev/null
+++ b/python/common/event_bus.py
@@ -0,0 +1,194 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple internal pub/sub event bus with topics and filter-based registration.
+"""
+import re
+
+import structlog
+
+
+log = structlog.get_logger()
+
+
+class _Subscription(object):
+    """One registration on an EventBus: the bus, filter, callback and topic."""
+
+    __slots__ = ('bus', 'predicate', 'callback', 'topic')
+
+    def __init__(self, bus, predicate, callback, topic=None):
+        self.bus, self.predicate = bus, predicate
+        self.callback, self.topic = callback, topic
+
+
+class EventBus(object):
+    """In-process pub/sub bus keyed by exact topic or compiled regex."""
+
+    def __init__(self):
+        self.subscriptions = {}  # topic -> list of _Subscription objects
+                                 # topic None holds regexp based topic subs.
+        self.subs_topic_map = {}  # to aid fast lookup when unsubscribing
+
+    def list_subscribers(self, topic=None):
+        """Return subscriptions on topic (all of them if topic is None)."""
+        if topic is None:
+            # flatten in one pass instead of the quadratic sum(lists, [])
+            return [s for subs in self.subscriptions.values() for s in subs]
+        return self.subscriptions.get(topic, [])
+
+    @staticmethod
+    def _get_topic_key(topic):
+        """Map a topic to its dict key: the string itself, None for a regex."""
+        if isinstance(topic, str):
+            return topic
+        elif hasattr(topic, 'match'):
+            return None
+        else:
+            raise AttributeError('topic not a string nor a compiled regex')
+
+    def subscribe(self, topic, callback, predicate=None):
+        """
+        Subscribe to given topic with predicate and register the callback
+        :param topic: String topic (explicit) or regexp based topic filter.
+        :param callback: Callback method with signature def func(topic, msg)
+        :param predicate: Optional method/function signature def predicate(msg)
+        :return: Subscription object which can be used to unsubscribe
+        """
+        subscription = _Subscription(self, predicate, callback, topic)
+        topic_key = self._get_topic_key(topic)
+        self.subscriptions.setdefault(topic_key, []).append(subscription)
+        self.subs_topic_map[subscription] = topic_key
+        return subscription
+
+    def unsubscribe(self, subscription):
+        """
+        Remove given subscription
+        :param subscription: subscription object as was returned by subscribe
+        :return: None
+        """
+        # pop (not index) so the lookup map does not retain dead subscriptions
+        topic_key = self.subs_topic_map.pop(subscription)
+        self.subscriptions[topic_key].remove(subscription)
+
+    def publish(self, topic, msg):
+        """
+        Publish given message to all subscribers registered with topic taking
+        the predicate functions into account.
+        :param topic: String topic
+        :param msg: Arbitrary python data as message
+        :return: None
+        """
+        def passes(msg, predicate):
+            try:
+                return predicate(msg)
+            except Exception:
+                return False  # failed predicate function treated as no match
+
+        # Build a fresh list: extending the stored per-topic list in place
+        # would permanently add the regexp subscribers to it on every publish.
+        subscribers = list(self.subscriptions.get(topic, []))
+
+        # add matching regexp topic subscribers
+        subscribers.extend(s for s in self.subscriptions.get(None, [])
+                           if s.topic.match(topic))
+
+        # the list is private to this call, so callbacks may (un)subscribe
+        for candidate in subscribers:
+            predicate = candidate.predicate
+            if predicate is None or passes(msg, predicate):
+                try:
+                    candidate.callback(topic, msg)
+                except Exception as e:
+                    log.exception('callback-failed', e=repr(e), topic=topic)
+
+
+
+default_bus = EventBus()  # process-wide bus; EventBusClient uses it when no bus is passed
+
+
+class EventBusClient(object):
+ """
+ Primary interface to the EventBus. Usage:
+
+ Publish:
+ >>> events = EventBusClient()
+ >>> msg = dict(a=1, b='foo')
+ >>> events.publish('a.topic', msg)
+
+ Subscribe to get all messages on specific topic:
+ >>> def got_event(topic, msg):
+ ...     print topic, ':', msg
+ >>> events = EventBusClient()
+ >>> events.subscribe('a.topic', got_event)
+
+ Subscribe to get messages matching predicate on specific topic:
+ >>> def got_event(topic, msg):
+ ...     print topic, ':', msg
+ >>> events = EventBusClient()
+ >>> events.subscribe('a.topic', got_event, lambda msg: len(msg) < 100)
+
+ Use a DeferredQueue to buffer incoming messages
+ >>> queue = DeferredQueue()
+ >>> events = EventBusClient()
+ >>> events.subscribe('a.topic', lambda _, msg: queue.put(msg))
+
+ """
+ def __init__(self, bus=None):
+ """
+ Obtain a client interface for the pub/sub event bus.
+ :param bus: An optional specific event bus. Intended mainly for test
+ use. If not provided, the process default bus will be used, which is
+ the preferred use (a process shall not need more than one bus).
+ """
+ self.bus = bus or default_bus
+
+ def publish(self, topic, msg):
+ """
+ Publish given msg to given topic.
+ :param topic: String topic
+ :param msg: Arbitrary python data as message
+ :return: None
+ """
+ self.bus.publish(topic, msg)
+
+ def subscribe(self, topic, callback, predicate=None):
+ """
+ Subscribe to given topic with predicate and register the callback
+ :param topic: String topic (explicit) or regexp based topic filter.
+ :param callback: Callback method with signature def func(topic, msg)
+ :param predicate: Optional method/function with signature
+ def predicate(msg)
+ :return: Subscription object which can be used to unsubscribe
+ """
+ return self.bus.subscribe(topic, callback, predicate)
+
+ def unsubscribe(self, subscription):
+ """
+ Remove given subscription
+ :param subscription: subscription object as was returned by subscribe
+ :return: None
+ """
+ return self.bus.unsubscribe(subscription)
+
+ def list_subscribers(self, topic=None):
+ """
+ Return list of subscribers. If topic is provided, it is filtered for
+ those subscribing to the topic.
+ :param topic: Optional topic
+ :return: List of subscriptions
+ """
+ return self.bus.list_subscribers(topic)
diff --git a/python/common/frameio/__init__.py b/python/common/frameio/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/common/frameio/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/common/frameio/frameio.py b/python/common/frameio/frameio.py
new file mode 100644
index 0000000..3f5bcf6
--- /dev/null
+++ b/python/common/frameio/frameio.py
@@ -0,0 +1,437 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A module that can send and receive raw ethernet frames on a set of interfaces
+and it can manage a set of vlan interfaces on top of existing
+interfaces. Due to reliance on raw sockets, this module requires
+root access. Also, because raw sockets are hard to deal with in Twisted
+(not directly supported), we run the receiver select loop on a dedicated
+thread.
+"""
+
+import os
+import socket
+import struct
+import uuid
+from pcapy import BPFProgram
+from threading import Thread, Condition
+
+import fcntl
+
+import select
+import structlog
+import sys
+
+from scapy.data import ETH_P_ALL
+from twisted.internet import reactor
+from zope.interface import implementer
+
+from voltha.registry import IComponent
+
+if sys.platform.startswith('linux'):
+ from common.frameio.third_party.oftest import afpacket, netutils
+elif sys.platform == 'darwin':
+ from scapy.arch import pcapdnet, BIOCIMMEDIATE, dnet
+
+log = structlog.get_logger()
+
+
+def hexify(buffer):
+    """
+    Encode input buffer as a string of two-digit lowercase hex byte values
+    """
+    return ''.join(['{:02x}'.format(ord(ch)) for ch in buffer])
+
+
+class _SelectWakerDescriptor(object):
+ """
+ A descriptor that can be mixed into a select loop to wake it up.
+ """
+ def __init__(self):
+ self.pipe_read, self.pipe_write = os.pipe()
+ fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)  # write end is non-blocking
+
+ def __del__(self):
+ os.close(self.pipe_read)
+ os.close(self.pipe_write)
+
+ def fileno(self):
+ return self.pipe_read  # the read end is what a select() call waits on
+
+ def wait(self):
+ os.read(self.pipe_read, 1)  # consume one byte written by notify()
+
+ def notify(self):
+ """Trigger a select loop"""
+ os.write(self.pipe_write, '\x00')  # one byte makes the read end readable
+
+
+class BpfProgramFilter(object):
+ """
+ Convenience packet filter based on the well-tried Berkeley Packet Filter,
+ used by many well known open source tools such as pcap and tcpdump.
+ """
+ def __init__(self, program_string):
+ """
+ Create a filter using the BPF command syntax. To learn more,
+ consult 'man pcap-filter'.
+ :param program_string: The textual definition of the filter. Examples:
+ 'vlan 1000'
+ 'vlan 1000 and ip src host 10.10.10.10'
+ """
+ self.bpf = BPFProgram(program_string)  # pcapy BPFProgram built from the text
+
+ def __call__(self, frame):
+ """
+ Return 1 if frame passes filter.
+ :param frame: Raw frame provided as Python string
+ :return: 1 if frame satisfies filter, 0 otherwise.
+ """
+ return self.bpf.filter(frame)  # instances are callable, so usable as a predicate
+
+
+class FrameIOPort(object):
+    """
+    Represents a network interface which we can send/receive raw
+    Ethernet frames.
+    """
+
+    RCV_SIZE_DEFAULT = 4096  # buffer size handed to the receive call
+    ETH_P_ALL = 0x03  # protocol value used when binding the raw socket
+    RCV_TIMEOUT = 10000  # value handed to socket.settimeout()
+    MIN_PKT_SIZE = 60  # undersized frames are zero-padded up to this length
+
+    def __init__(self, iface_name):
+        self.iface_name = iface_name
+        self.proxies = []
+        self.socket = self.open_socket(self.iface_name)
+        log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
+        self.received = 0
+        self.discarded = 0
+
+    def add_proxy(self, proxy):
+        self.proxies.append(proxy)
+
+    def del_proxy(self, proxy):
+        self.proxies = [p for p in self.proxies if p.name != proxy.name]
+
+    def open_socket(self, iface_name):
+        raise NotImplementedError('to be implemented by derived class')
+
+    def rcv_frame(self):
+        raise NotImplementedError('to be implemented by derived class')
+
+    def __del__(self):
+        # getattr: open_socket() may have raised before self.socket was set
+        if getattr(self, 'socket', None):
+            self.socket.close()
+            self.socket = None
+            log.debug('socket-closed', iface=self.iface_name)
+
+    def fileno(self):
+        return self.socket.fileno()
+
+    def _dispatch(self, proxy, frame):
+        log.debug('calling-publisher', proxy=proxy.name, frame=hexify(frame))
+        try:
+            proxy.callback(proxy, frame)
+        except Exception as e:
+            log.exception('callback-error',
+                          explanation='Callback failed while processing frame',
+                          e=e)
+
+    def recv(self):
+        """Called on the select thread when a packet arrives"""
+        try:
+            frame = self.rcv_frame()
+        except RuntimeError:
+            # we observed this happens sometimes right after the socket was
+            # attached to a newly created veth interface. So we log it, but
+            # allow to continue.
+            log.warn('afpacket-recv-error', code=-1)
+            return
+        if frame is None:
+            return  # a subclass rcv_frame() may return None: nothing to do
+
+        log.debug('frame-received', iface=self.iface_name, len=len(frame),
+                  hex=hexify(frame))
+        self.received += 1
+        dispatched = False
+        for proxy in self.proxies:
+            if proxy.filter is None or proxy.filter(frame):
+                log.debug('frame-dispatched')
+                dispatched = True
+                reactor.callFromThread(self._dispatch, proxy, frame)
+
+        if not dispatched:
+            self.discarded += 1
+            log.debug('frame-discarded')
+
+    def send(self, frame):
+        log.debug('sending', len=len(frame), iface=self.iface_name)
+        sent_bytes = self.send_frame(frame)
+        if sent_bytes != len(frame):
+            log.error('send-error', iface=self.iface_name,
+                      wanted_to_send=len(frame), actually_sent=sent_bytes)
+        return sent_bytes
+
+    def send_frame(self, frame):
+        try:
+            return self.socket.send(frame)
+        except socket.error as err:
+            # Only EINVAL on an undersized frame is recoverable (pad, retry);
+            # anything else is re-raised rather than silently returning None.
+            if err.errno != os.errno.EINVAL or len(frame) >= self.MIN_PKT_SIZE:
+                raise
+            padding = '\x00' * (self.MIN_PKT_SIZE - len(frame))
+            return self.socket.send(frame + padding)
+
+    def up(self):
+        # no-op on darwin; elsewhere the link is raised with the 'ip' tool
+        if not sys.platform.startswith('darwin'):
+            os.system('ip link set {} up'.format(self.iface_name))
+        return self
+
+    def down(self):
+        # no-op on darwin; elsewhere the link is lowered with the 'ip' tool
+        if not sys.platform.startswith('darwin'):
+            os.system('ip link set {} down'.format(self.iface_name))
+        return self
+
+    def statistics(self):
+        return self.received, self.discarded  # (received, undispatched) counts
+
+
+class LinuxFrameIOPort(FrameIOPort):
+    """FrameIOPort backed by a raw AF_PACKET socket in promiscuous mode."""
+    def open_socket(self, iface_name):
+        raw_sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
+        afpacket.enable_auxdata(raw_sock)
+        raw_sock.bind((self.iface_name, self.ETH_P_ALL))
+        netutils.set_promisc(raw_sock, iface_name)
+        raw_sock.settimeout(self.RCV_TIMEOUT)
+        return raw_sock
+
+    def rcv_frame(self):
+        return afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+
+
+class DarwinFrameIOPort(FrameIOPort):
+    """FrameIOPort for darwin: pcap handle to receive, dnet handle to send."""
+
+    def open_socket(self, iface_name):
+        sin = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
+        try:
+            fcntl.ioctl(sin.fileno(), BIOCIMMEDIATE, struct.pack("I", 1))
+        except Exception as e:
+            # still best effort, but the failure is now visible in the log
+            log.debug('biocimmediate-failed', iface=iface_name, e=repr(e))
+        # need a different kind of socket for sending out
+        self.sout = dnet.eth(iface_name)
+        return sin
+
+    def send_frame(self, frame):
+        return self.sout.send(frame)
+
+    def rcv_frame(self):
+        pkt = self.socket.next()
+        if pkt is not None:
+            ts, pkt = pkt
+        return pkt  # None when next() produced no packet
+
+
+if sys.platform == 'darwin':
+    _FrameIOPort = DarwinFrameIOPort
+elif sys.platform.startswith('linux'):
+    _FrameIOPort = LinuxFrameIOPort
+else:
+    # fail at import time: no raw-frame backend exists for this platform
+    raise Exception('Unsupported platform {}'.format(sys.platform))
+
+
+class FrameIOPortProxy(object):
+    """Per-user handle on a shared FrameIOPort (callback, filter, name)."""
+
+    def __init__(self, frame_io_port, callback, filter=None, name=None):
+        self.frame_io_port = frame_io_port
+        self.callback = callback
+        self.filter = filter
+        self.name = name if name is not None else uuid.uuid4().hex[:12]
+
+    @property
+    def iface_name(self):
+        return self.frame_io_port.iface_name
+
+    def get_iface_name(self):
+        return self.iface_name  # method form of the iface_name property
+
+    def send(self, frame):
+        return self.frame_io_port.send(frame)
+
+    def up(self):
+        self.frame_io_port.up()
+        return self
+
+    def down(self):
+        self.frame_io_port.down()
+        return self
+
+
+@implementer(IComponent)
+class FrameIOManager(Thread):
+    """
+    Packet/Frame IO manager that can be used to send/receive raw frames
+    on a set of network interfaces.
+    """
+    def __init__(self):
+        super(FrameIOManager, self).__init__()
+
+        self.ports = {}  # iface_name -> FrameIOPort
+        self.queue = {}  # iface_name -> TODO
+
+        self.cvar = Condition()
+        self.waker = _SelectWakerDescriptor()
+        self.stopped = False
+        self.ports_changed = False
+
+    # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~
+
+    def start(self):
+        """
+        Start the IO manager and its select loop thread
+        """
+        log.debug('starting')
+        super(FrameIOManager, self).start()
+        log.info('started')
+        return self
+
+    def stop(self):
+        """
+        Stop the IO manager and its thread with the select loop
+        """
+        log.debug('stopping')
+        self.stopped = True
+        self.waker.notify()
+        self.join()
+        del self.ports
+        log.info('stopped')
+
+    def list_interfaces(self):
+        """
+        Return the interfaces listened on
+        :return: Dict mapping interface name to its FrameIOPort object
+        """
+        return self.ports
+
+    def open_port(self, iface_name, callback, filter=None, name=None):
+        """
+        Add a new interface and start receiving on it.
+        :param iface_name: Name of the interface. Must be an existing Unix
+        interface (eth0, en0, etc.)
+        :param callback: Called on each received frame;
+        signature: def callback(port, frame) where port is the
+        FrameIOPortProxy returned by this call and frame is the actual frame
+        received (as binary string)
+        :param filter: An optional filter (predicate), with signature:
+        def filter(frame). If provided, only frames for which filter evaluates
+        to True will be forwarded to callback.
+        :param name: Optional proxy name; a random one is generated if None.
+        :return: FrameIOPortProxy instance.
+        """
+
+        port = self.ports.get(iface_name)
+        if port is None:
+            port = _FrameIOPort(iface_name)
+            self.ports[iface_name] = port
+            self.ports_changed = True
+            self.waker.notify()
+
+        proxy = FrameIOPortProxy(port, callback, filter, name)
+        port.add_proxy(proxy)
+
+        return proxy
+
+    def close_port(self, proxy):
+        """
+        Remove the proxy. If this is the last proxy on an interface, stop and
+        remove the named interface as well
+        :param proxy: FrameIOPortProxy reference
+        :return: None
+        """
+        assert isinstance(proxy, FrameIOPortProxy)
+        iface_name = proxy.get_iface_name()
+        assert iface_name in self.ports, "iface_name {} unknown".format(iface_name)
+        port = self.ports[iface_name]
+        port.del_proxy(proxy)
+
+        if not port.proxies:
+            del self.ports[iface_name]
+            # need to exit select loop to reconstruct select fd lists
+            self.ports_changed = True
+            self.waker.notify()
+
+    def send(self, iface_name, frame):
+        """
+        Send frame on given interface
+        :param iface_name: Name of previously registered interface
+        :param frame: frame as string
+        :return: number of bytes sent
+        """
+        return self.ports[iface_name].send(frame)
+
+    # ~~~~~~~~~~~~~ Thread methods (running on non-main thread) ~~~~~~~~~~~~~~~
+
+    def run(self):
+        """
+        Called on the alien thread, this is the core multi-port receive loop
+        """
+
+        log.debug('select-loop-started')
+
+        # outer loop constructs sockets list for select
+        while not self.stopped:
+            sockets = [self.waker] + list(self.ports.values())
+            self.ports_changed = False
+            empty = []
+            # inner select loop
+
+            while not self.stopped:
+                try:
+                    _in, _out, _err = select.select(sockets, empty, empty, 1)
+                except Exception as e:
+                    log.exception('frame-io-select-error', e=e)
+                    break
+                with self.cvar:
+                    for port in _in:
+                        if port is self.waker:
+                            self.waker.wait()
+                        else:
+                            port.recv()
+                    self.cvar.notify_all()
+                if self.ports_changed:
+                    break  # break inner loop so we reconstruct sockets list
+
+        log.debug('select-loop-exited')
+
+    def del_interface(self, iface_name):
+        """Delete interface for stopping"""
+        log.info('Delete interface')
+        del self.ports[iface_name]
+        # like close_port(): make the select loop rebuild its fd list
+        self.ports_changed = True
+        self.waker.notify()
+        log.info('Interface(port) is deleted')
diff --git a/python/common/frameio/third_party/__init__.py b/python/common/frameio/third_party/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/common/frameio/third_party/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/common/frameio/third_party/oftest/LICENSE b/python/common/frameio/third_party/oftest/LICENSE
new file mode 100644
index 0000000..3216042
--- /dev/null
+++ b/python/common/frameio/third_party/oftest/LICENSE
@@ -0,0 +1,36 @@
+OpenFlow Test Framework
+
+Copyright (c) 2010 The Board of Trustees of The Leland Stanford
+Junior University
+
+Except where otherwise noted, this software is distributed under
+the OpenFlow Software License. See
+http://www.openflowswitch.org/wp/legal/ for current details.
+
+We are making the OpenFlow specification and associated documentation
+(Software) available for public use and benefit with the expectation
+that others will use, modify and enhance the Software and contribute
+those enhancements back to the community. However, since we would like
+to make the Software available for broadest use, with as few
+restrictions as possible permission is hereby granted, free of charge,
+to any person obtaining a copy of this Software to deal in the
+Software under the copyrights without restriction, including without
+limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+The name and trademarks of copyright holder(s) may NOT be used in
+advertising or publicity pertaining to the Software or any derivatives
+without specific, written prior permission.
diff --git a/python/common/frameio/third_party/oftest/README.md b/python/common/frameio/third_party/oftest/README.md
new file mode 100644
index 0000000..f0cb649
--- /dev/null
+++ b/python/common/frameio/third_party/oftest/README.md
@@ -0,0 +1,6 @@
+Files in this directory are derived from the respective files
+in oftest (http://github.com/floodlight/oftest).
+
+For the licensing terms of these files, see LICENSE in this dir.
+
+
diff --git a/python/common/frameio/third_party/oftest/__init__.py b/python/common/frameio/third_party/oftest/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/common/frameio/third_party/oftest/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/common/frameio/third_party/oftest/afpacket.py b/python/common/frameio/third_party/oftest/afpacket.py
new file mode 100644
index 0000000..9ae8075
--- /dev/null
+++ b/python/common/frameio/third_party/oftest/afpacket.py
@@ -0,0 +1,124 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+AF_PACKET receive support
+
+When VLAN offload is enabled on the NIC Linux will not deliver the VLAN tag
+in the data returned by recv. Instead, it delivers the VLAN TCI in a control
+message. Python 2.x doesn't have built-in support for recvmsg, so we have to
+use ctypes to call it. The recv function exported by this module reconstructs
+the VLAN tag if it was offloaded.
+"""
+
+import struct
+from ctypes import *
+
+ETH_P_8021Q = 0x8100
+SOL_PACKET = 263
+PACKET_AUXDATA = 8
+TP_STATUS_VLAN_VALID = 1 << 4
+
+class struct_iovec(Structure):  # ctypes layout for C struct iovec (buffer pointer + length)
+ _fields_ = [
+ ("iov_base", c_void_p),
+ ("iov_len", c_size_t),
+ ]
+
+class struct_msghdr(Structure):  # ctypes layout for C struct msghdr, the recvmsg() argument
+ _fields_ = [
+ ("msg_name", c_void_p),
+ ("msg_namelen", c_uint32),
+ ("msg_iov", POINTER(struct_iovec)),
+ ("msg_iovlen", c_size_t),
+ ("msg_control", c_void_p),
+ ("msg_controllen", c_size_t),
+ ("msg_flags", c_int),
+ ]
+
+class struct_cmsghdr(Structure):  # ctypes layout for the header of one control message
+ _fields_ = [
+ ("cmsg_len", c_size_t),
+ ("cmsg_level", c_int),
+ ("cmsg_type", c_int),
+ ]
+
+class struct_tpacket_auxdata(Structure):  # PACKET_AUXDATA payload; recv() reads tp_status and tp_vlan_tci
+ _fields_ = [
+ ("tp_status", c_uint),
+ ("tp_len", c_uint),
+ ("tp_snaplen", c_uint),
+ ("tp_mac", c_ushort),
+ ("tp_net", c_ushort),
+ ("tp_vlan_tci", c_ushort),
+ ("tp_padding", c_ushort),
+ ]
+
+libc = CDLL("libc.so.6")
+recvmsg = libc.recvmsg
+recvmsg.argtypes = [c_int, POINTER(struct_msghdr), c_int]
+recvmsg.restype = c_int  # 'restype' is the attribute ctypes actually reads
+
+def enable_auxdata(sk):
+ """
+ Ask the kernel to return the VLAN tag in a control message
+
+ Must be called on the socket before afpacket.recv.
+ """
+ sk.setsockopt(SOL_PACKET, PACKET_AUXDATA, 1)  # option value 1 switches PACKET_AUXDATA on
+
+def recv(sk, bufsize):
+    """
+    Receive a packet from an AF_PACKET socket
+    @sk Socket
+    @bufsize Maximum packet size
+    @return Frame as a string; an offloaded VLAN tag is re-inserted
+    @raise RuntimeError if recvmsg fails
+    """
+    buf = create_string_buffer(bufsize)
+
+    ctrl_bufsize = sizeof(struct_cmsghdr) + sizeof(struct_tpacket_auxdata) + sizeof(c_size_t)
+    ctrl_buf = create_string_buffer(ctrl_bufsize)
+
+    iov = struct_iovec()
+    iov.iov_base = cast(buf, c_void_p)
+    iov.iov_len = bufsize
+
+    msghdr = struct_msghdr()
+    msghdr.msg_name = None
+    msghdr.msg_namelen = 0
+    msghdr.msg_iov = pointer(iov)
+    msghdr.msg_iovlen = 1
+    msghdr.msg_control = cast(ctrl_buf, c_void_p)
+    msghdr.msg_controllen = ctrl_bufsize
+    msghdr.msg_flags = 0
+
+    rv = recvmsg(sk.fileno(), byref(msghdr), 0)
+    if rv < 0:
+        # format the message here; RuntimeError does not interpolate its args
+        raise RuntimeError("recvmsg failed: rv=%d" % rv)
+
+    # The kernel only delivers control messages we ask for. We
+    # only enabled PACKET_AUXDATA, so we can assume it's the
+    # only control message.
+    assert msghdr.msg_controllen >= sizeof(struct_cmsghdr)
+    cmsghdr = struct_cmsghdr.from_buffer(ctrl_buf) # pylint: disable=E1101
+    assert cmsghdr.cmsg_level == SOL_PACKET
+    assert cmsghdr.cmsg_type == PACKET_AUXDATA
+
+    auxdata = struct_tpacket_auxdata.from_buffer(ctrl_buf, sizeof(struct_cmsghdr)) # pylint: disable=E1101
+    if auxdata.tp_vlan_tci != 0 or auxdata.tp_status & TP_STATUS_VLAN_VALID:
+        # Insert VLAN tag
+        tag = struct.pack("!HH", ETH_P_8021Q, auxdata.tp_vlan_tci)
+        return buf.raw[:12] + tag + buf.raw[12:rv]
+    return buf.raw[:rv]
diff --git a/python/common/frameio/third_party/oftest/netutils.py b/python/common/frameio/third_party/oftest/netutils.py
new file mode 100644
index 0000000..092d490
--- /dev/null
+++ b/python/common/frameio/third_party/oftest/netutils.py
@@ -0,0 +1,73 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Network utilities for the OpenFlow test framework
+"""
+
+###########################################################################
+## ##
+## Promiscuous mode enable/disable ##
+## ##
+## Based on code from Scapy by Philippe Biondi ##
+## ##
+## ##
+## This program is free software; you can redistribute it and/or modify it ##
+## under the terms of the GNU General Public License as published by the ##
+## Free Software Foundation; either version 2, or (at your option) any ##
+## later version. ##
+## ##
+## This program is distributed in the hope that it will be useful, but ##
+## WITHOUT ANY WARRANTY; without even the implied warranty of ##
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ##
+## General Public License for more details. ##
+## ##
+#############################################################################
+
+import socket
+from fcntl import ioctl
+import struct
+
+# From net/if_arp.h
+ARPHDR_ETHER = 1
+ARPHDR_LOOPBACK = 772
+
+# From bits/ioctls.h
+SIOCGIFHWADDR = 0x8927 # Get hardware address
+SIOCGIFINDEX = 0x8933 # name -> if_index mapping
+
+# From netpacket/packet.h
+PACKET_ADD_MEMBERSHIP = 1
+PACKET_DROP_MEMBERSHIP = 2
+PACKET_MR_PROMISC = 1
+
+# From bits/socket.h
+SOL_PACKET = 263
+
+def get_if(iff, cmd):
+    """Run ioctl `cmd` for interface name `iff`; return the raw ifreq buffer."""
+    from contextlib import closing  # local import; closes the socket on error too
+    with closing(socket.socket()) as s:
+        return ioctl(s, cmd, struct.pack("16s16x", iff))
+
+def get_if_index(iff):  # index found in bytes 16..19 of the SIOCGIFINDEX reply
+    return int(struct.unpack_from("I", get_if(iff, SIOCGIFINDEX), 16)[0])
+
+def set_promisc(s, iff, val=1):
+    """Add (val truthy) or drop the promiscuous membership of iff on s."""
+    # membership request: interface index, PACKET_MR_PROMISC, zero-length
+    # address, 8 address bytes
+    request = struct.pack("IHH8s", get_if_index(iff), PACKET_MR_PROMISC, 0, "")
+    action = PACKET_ADD_MEMBERSHIP if val else PACKET_DROP_MEMBERSHIP
+    s.setsockopt(SOL_PACKET, action, request)
+
diff --git a/python/common/kvstore/__init__.py b/python/common/kvstore/__init__.py
new file mode 100644
index 0000000..4a82628
--- /dev/null
+++ b/python/common/kvstore/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/common/kvstore/consul_client.py b/python/common/kvstore/consul_client.py
new file mode 100644
index 0000000..bc14759
--- /dev/null
+++ b/python/common/kvstore/consul_client.py
@@ -0,0 +1,304 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from common.kvstore.kv_client import DEFAULT_TIMEOUT, Event, KVClient, KVPair, RETRY_BACKOFF
+from common.utils.asleep import asleep
+from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+from consul import ConsulException
+from consul.twisted import Consul
+from structlog import get_logger
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+
+log = get_logger()
+
+class ConsulClient(KVClient):
+    """
+    KVClient implementation that talks to a Consul key/value store.
+
+    Key reservations are implemented with Consul sessions: reserving a key
+    creates a session and acquires the key with it. self.key_reservations
+    maps each reserved key to the id of the session that holds it.
+    """
+
+    def __init__(self, kv_host, kv_port):
+        KVClient.__init__(self, kv_host, kv_port)
+        # Id of the most recently created session (None when there is none)
+        self.session_id = None
+        self.client = Consul(kv_host, kv_port)
+
+    def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
+        """
+        Start watching `key`; change events are delivered to
+        `key_change_callback`. The watch runs in the background, so this
+        method returns immediately without a result.
+        """
+        self._retriggering_watch(key, key_change_callback, timeout)
+
+    @inlineCallbacks
+    def _retriggering_watch(self, key, key_change_callback, timeout):
+        # Register the watch before starting it so close_watch() can find it
+        self.key_watches[key] = ConsulWatch(self.client, key, key_change_callback, timeout)
+        yield self.key_watches[key].start()
+
+    def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
+        """Stop the watch on `key`, if one was registered."""
+        if key in self.key_watches:
+            self.key_watches[key].stop()
+
+    @inlineCallbacks
+    def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
+        """
+        Run one KV operation, backing off and retrying while Consul refuses
+        the connection.
+
+        :param operation: 'GET', 'LIST', 'PUT', 'DELETE', 'RESERVE', 'RENEW',
+                          'RELEASE' or 'RELEASE-ALL'
+        :param key: key (or key prefix) the operation applies to
+        :param value: value for PUT/RESERVE, otherwise None
+        :param timeout: give up once the accumulated back-off time exceeds
+                        this many seconds (only enforced when > 0)
+        :return: (result, err); err is None on success
+        """
+        log.debug('kv-op', operation=operation, key=key, timeout=timeout, args=args, kw=kw)
+        err = None
+        result = None
+        while True:
+            try:
+                if operation == 'GET':
+                    result = yield self._get(key, **kw)
+                elif operation == 'LIST':
+                    result, err = yield self._list(key)
+                elif operation == 'PUT':
+                    # Put returns a boolean response
+                    result = yield self.client.kv.put(key, value)
+                    if not result:
+                        err = 'put-failed'
+                elif operation == 'DELETE':
+                    # Delete returns a boolean response
+                    result = yield self.client.kv.delete(key)
+                    if not result:
+                        err = 'delete-failed'
+                elif operation == 'RESERVE':
+                    result, err = yield self._reserve(key, value, **kw)
+                elif operation == 'RENEW':
+                    result, err = yield self._renew_reservation(key)
+                elif operation == 'RELEASE':
+                    result, err = yield self._release_reservation(key)
+                elif operation == 'RELEASE-ALL':
+                    err = yield self._release_all_reservations()
+                # The store answered: reset the back-off state and stop retrying
+                self._clear_backoff()
+                break
+            except ConsulException as ex:
+                if 'ConnectionRefusedError' in ex.message:
+                    # Consul is unreachable: wait, then retry (err stays None)
+                    log.exception('comms-exception', ex=ex)
+                    yield self._backoff('consul-not-up')
+                else:
+                    log.error('consul-specific-exception', ex=ex)
+                    err = ex
+            except Exception as ex:
+                log.error('consul-exception', ex=ex)
+                err = ex
+
+            if timeout > 0 and self.retry_time > timeout:
+                err = 'operation-timed-out'
+            if err is not None:
+                self._clear_backoff()
+                break
+
+        returnValue((result,err))
+
+    @inlineCallbacks
+    def _get(self, key, **kw):
+        """Return a KVPair for `key`, or None when Consul has no record."""
+        kvp = None
+        index, rec = yield self.client.kv.get(key, **kw)
+        if rec is not None:
+            kvp = KVPair(rec['Key'], rec['Value'], index)
+        returnValue(kvp)
+
+    @inlineCallbacks
+    def _list(self, key):
+        """Return ([KVPair, ...], err) for every record under prefix `key`."""
+        err = None
+        kv_pairs = []
+        index, recs = yield self.client.kv.get(key, recurse=True)
+        # Presumably recs is None when nothing matches the prefix (see the
+        # python-consul KV.get reference); treat that as an empty listing
+        # instead of failing on the iteration.
+        for rec in recs or ():
+            kv_pairs.append(KVPair(rec['Key'], rec['Value'], rec['ModifyIndex']))
+        returnValue((kv_pairs, err))
+
+    @inlineCallbacks
+    def _reserve(self, key, value, **kw):
+        """
+        Try to reserve `key` for owner `value` using a new Consul session.
+
+        :param kw: must contain 'ttl', the session time-to-live
+        :return: (owner, err); owner is the value currently stored under the
+                 key (None if absent), err is None only when this client
+                 holds the reservation
+        """
+        # A missing ttl fails with a clear KeyError('ttl') rather than an
+        # unbound local further down.
+        ttl = kw['ttl']
+        reserved = False
+        err = 'reservation-failed'
+        owner = None
+
+        # Create a session
+        self.session_id = yield self.client.session.create(behavior='delete',
+                                                           ttl=ttl) # lock_delay=1)
+        log.debug('create-session', id=self.session_id)
+        # Try to acquire the key
+        result = yield self.client.kv.put(key, value, acquire=self.session_id)
+        log.debug('key-acquire', key=key, value=value, sess=self.session_id, result=result)
+
+        # Check if reservation succeeded
+        index, record = yield self.client.kv.get(key)
+        if record is not None and 'Value' in record:
+            owner = record['Value']
+            log.debug('get-key', session=record['Session'], owner=owner)
+            if record['Session'] == self.session_id and owner == value:
+                reserved = True
+                log.debug('key-reserved', key=key, value=value, ttl=ttl)
+                # Add key to reservation list
+                self.key_reservations[key] = self.session_id
+            else:
+                log.debug('reservation-held-by-another', owner=owner)
+
+        if reserved:
+            err = None
+        returnValue((owner, err))
+
+    @inlineCallbacks
+    def _renew_reservation(self, key):
+        """Renew the session holding `key`; return (result, err)."""
+        result = None
+        err = None
+        if key not in self.key_reservations:
+            err = 'key-not-reserved'
+        else:
+            session_id = self.key_reservations[key]
+            # A successfully renewed session returns an object with fields:
+            # Node, CreateIndex, Name, ModifyIndex, ID, Behavior, TTL,
+            # LockDelay, and Checks
+            result = yield self.client.session.renew(session_id=session_id)
+            log.debug('session-renew', result=result)
+            if result is None:
+                err = 'session-renewal-failed'
+        returnValue((result, err))
+
+    @inlineCallbacks
+    def _release_reservation(self, key):
+        """Destroy the session holding `key`; return (success, err)."""
+        err = None
+        # Must be bound on every path: it is returned even when the key was
+        # never reserved.
+        success = None
+        if key not in self.key_reservations:
+            err = 'key-not-reserved'
+        else:
+            session_id = self.key_reservations[key]
+            # A successfully destroyed session returns a boolean result
+            success = yield self.client.session.destroy(session_id)
+            log.debug('session-destroy', result=success)
+            if not success:
+                err = 'session-destroy-failed'
+            self.session_id = None
+            self.key_reservations.pop(key)
+        returnValue((success, err))
+
+    @inlineCallbacks
+    def _release_all_reservations(self):
+        """Destroy every session recorded in key_reservations; return err."""
+        err = None
+        # Iterate over a snapshot: the loop yields to the reactor, so the
+        # dictionary may be modified by other callers in the meantime.
+        for key, session_id in list(self.key_reservations.items()):
+            # A successfully destroyed session returns a boolean result
+            success = yield self.client.session.destroy(session_id)
+            if not success:
+                err = 'session-destroy-failed'
+            log.debug('session-destroy', id=session_id, result=success)
+            self.session_id = None
+            self.key_reservations.pop(key, None)
+        returnValue(err)
+
+
+# Watches a single Consul key and reports changes through a callback.
+# start() runs a loop that repeatedly queries the key and turns each result
+# into an Event (PUT or DELETE); stop() ends the loop and silences the callback.
+class ConsulWatch():
+
+ def __init__(self, consul, key, callback, timeout):
+ self.client = consul
+ self.key = key
+ # Index returned by the most recent get; passed back as index= on the next one
+ self.index = None
+ self.callback = callback
+ self.timeout = timeout
+ # Timeout, in the units DeferredWithTimeout expects, for one watch iteration
+ self.period = 60
+ self.running = True
+ # Back-off bookkeeping used by _backoff()/_clear_backoff()
+ self.retries = 0
+ self.retry_time = 0
+
+ @inlineCallbacks
+ def start(self):
+ self.running = True
+ # Initial read (no index argument) to learn the key's current index
+ index, rec = yield self._get_with_retry(self.key, None,
+ timeout=self.timeout)
+ self.index = str(index)
+
+ # Helper launched once per loop iteration: reads the key with
+ # index=self.index and fires `deferred` with (index, rec) unless the
+ # deferred has already been called.
+ # presumably passing index makes this a Consul blocking query that
+ # returns when the key changes -- TODO confirm against python-consul
+ @inlineCallbacks
+ def _get(key, deferred):
+ try:
+ index, rec = yield self._get_with_retry(key, None,
+ timeout=self.timeout,
+ index=self.index)
+ self.index = str(index)
+ if not deferred.called:
+ log.debug('got-result-cancelling-deferred')
+ deferred.callback((self.index, rec))
+ except Exception as e:
+ log.exception('got-exception', e=e)
+
+ while self.running:
+ try:
+ # presumably errbacks with TimeOutError if not fired within
+ # self.period -- see common.utils.deferred_utils
+ rcvd = DeferredWithTimeout(timeout=self.period)
+ # Not yielded: the get runs concurrently and reports through rcvd
+ _get(self.key, rcvd)
+ try:
+ # Update index for next watch iteration
+ index, rec = yield rcvd
+ log.debug('event-received', index=index, rec=rec)
+ # Notify client of key change event
+ if rec is None:
+ # Key has been deleted
+ self._send_event(Event(Event.DELETE, self.key, None))
+ else:
+ self._send_event(Event(Event.PUT, rec['Key'], rec['Value']))
+ except TimeOutError as e:
+ log.debug('no-events-over-watch-period', key=self.key)
+ except Exception as e:
+ log.exception('exception', e=e)
+ except Exception as e:
+ log.exception('exception', e=e)
+
+ log.debug('close-watch', key=self.key)
+
+ def stop(self):
+ # The loop in start() exits at its next `while self.running` check;
+ # clearing the callback makes _send_event() a no-op from now on.
+ self.running = False
+ self.callback = None
+
+ # Reads `key` from Consul and returns whatever client.kv.get() returned,
+ # or None if the read did not succeed. `value` and *args are only logged
+ # or ignored; **kw is forwarded to client.kv.get().
+ # NOTE(review): err is assigned in both except branches before the back-off,
+ # so the `err is not None` check appears to end the loop after the first
+ # failure instead of retrying -- confirm against the original indentation.
+ # NOTE(review): callers unpack the result as (index, rec); a None result
+ # would make that unpacking fail -- confirm intended handling.
+ @inlineCallbacks
+ def _get_with_retry(self, key, value, timeout, *args, **kw):
+ log.debug('watch-period', key=key, period=self.period, timeout=timeout, args=args, kw=kw)
+ err = None
+ result = None
+ while True:
+ try:
+ result = yield self.client.kv.get(key, **kw)
+ self._clear_backoff()
+ break
+ except ConsulException as ex:
+ err = ex
+ if 'ConnectionRefusedError' in ex.message:
+ # Tell the watcher's client that the store is unreachable
+ self._send_event(Event(Event.CONNECTION_DOWN, self.key, None))
+ log.exception('comms-exception', ex=ex)
+ yield self._backoff('consul-not-up')
+ else:
+ log.error('consul-specific-exception', ex=ex)
+ except Exception as ex:
+ err = ex
+ log.error('consul-exception', ex=ex)
+
+ if timeout > 0 and self.retry_time > timeout:
+ err = 'operation-timed-out'
+ if err is not None:
+ self._clear_backoff()
+ break
+
+ returnValue(result)
+
+ def _send_event(self, event):
+ # callback is None after stop(), in which case the event is dropped
+ if self.callback is not None:
+ self.callback(event)
+
+ def _backoff(self, msg):
+ # Pick the delay for this retry; once retries exceed the table length
+ # the last RETRY_BACKOFF entry is reused.
+ wait_time = RETRY_BACKOFF[min(self.retries, len(RETRY_BACKOFF) - 1)]
+ self.retry_time += wait_time
+ self.retries += 1
+ log.error(msg, next_retry_in_secs=wait_time,
+ total_delay_in_secs = self.retry_time,
+ retries=self.retries)
+ # Callers yield on the returned value to wait before the next attempt
+ return asleep(wait_time)
+
+ def _clear_backoff(self):
+ # Only reset (and log) when at least one retry actually happened
+ if self.retries:
+ log.debug('reconnected-to-kv', after_retries=self.retries)
+ self.retries = 0
+ self.retry_time = 0
diff --git a/python/common/kvstore/etcd_client.py b/python/common/kvstore/etcd_client.py
new file mode 100644
index 0000000..a958b71
--- /dev/null
+++ b/python/common/kvstore/etcd_client.py
@@ -0,0 +1,240 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+################################################################################
+#
+# Most of the txaioetcd methods provide a timeout parameter. This parameter
+# is likely intended to limit the amount of time spent by any one method
+# waiting for a response from the etcd server. However, if the server is
+# down, the method immediately throws a ConnectionRefusedError exception;
+# it does not perform any retries. The timeout parameter provided by the
+# methods in EtcdClient covers this contingency.
+#
+################################################################################
+
+from common.kvstore.kv_client import DEFAULT_TIMEOUT, Event, KVClient, KVPair
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from twisted.internet.error import ConnectionRefusedError
+from txaioetcd import Client, CompVersion, Failed, KeySet, OpGet, OpSet, Transaction
+
+log = get_logger()
+
+class EtcdClient(KVClient):
+    """
+    KVClient implementation that talks to an etcd key/value store through
+    txaioetcd.
+
+    Key reservations are implemented with etcd leases: reserving a key
+    creates a lease and sets the key under it in a test-and-set transaction.
+    self.key_reservations maps each reserved key to its lease object;
+    self.key_watches maps each watched key to the client's callback.
+    """
+
+    def __init__(self, kv_host, kv_port):
+        KVClient.__init__(self, kv_host, kv_port)
+        self.url = u'http://' + kv_host + u':' + str(kv_port)
+        self.client = Client(reactor, self.url)
+
+    @inlineCallbacks
+    def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
+        """
+        Register `key_change_callback` for `key` and start an etcd watch
+        whose notifications are routed through key_changed().
+
+        :return: the (result, err) tuple produced by _op_with_retry
+        """
+        self.key_watches[key] = key_change_callback
+        result = yield self._op_with_retry('WATCH', key, None, timeout, callback=self.key_changed)
+        returnValue(result)
+
+    def key_changed(self, kv):
+        """Translate an etcd notification into an Event for the registered callback."""
+        key = kv.key
+        value = kv.value
+        log.debug('key-changed', key=key, value=value)
+        # Notify client of key change event
+        if value is not None:
+            evt = Event(Event.PUT, key, value)
+        else:
+            evt = Event(Event.DELETE, key, None)
+        # NOTE(review): the lookup is by exact key, while the watch is created
+        # with prefix=True -- confirm that prefix matches are meant to be dropped.
+        if key in self.key_watches:
+            self.key_watches[key](evt)
+
+    def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
+        """Forget the callback registered for `key`, so later events are dropped."""
+        log.debug('close-watch', key=key)
+        if key in self.key_watches:
+            self.key_watches.pop(key)
+
+    @inlineCallbacks
+    def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
+        """
+        Run one KV operation, backing off and retrying while the connection
+        to etcd is refused.
+
+        :param operation: 'GET', 'LIST', 'PUT', 'DELETE', 'RESERVE', 'RENEW',
+                          'RELEASE', 'RELEASE-ALL' or 'WATCH'
+        :param key: key (or key prefix) the operation applies to
+        :param value: value for PUT/RESERVE, otherwise None
+        :param timeout: give up once the accumulated back-off time exceeds
+                        this many seconds (only enforced when > 0)
+        :param kw: 'ttl' for RESERVE, 'callback' for WATCH; forwarded to
+                   client.set() for PUT
+        :return: (result, err); err is None on success
+        """
+        log.debug('kv-op', operation=operation, key=key, timeout=timeout, args=args, kw=kw)
+        err = None
+        result = None
+        if isinstance(key, str):
+            key = bytes(key)
+        if value is not None:
+            value = bytes(value)
+        while True:
+            try:
+                if operation == 'GET':
+                    result = yield self._get(key)
+                elif operation == 'LIST':
+                    result, err = yield self._list(key)
+                elif operation == 'PUT':
+                    # Put returns an object of type Revision
+                    result = yield self.client.set(key, value, **kw)
+                elif operation == 'DELETE':
+                    # Delete returns an object of type Deleted
+                    result = yield self.client.delete(key)
+                elif operation == 'RESERVE':
+                    result, err = yield self._reserve(key, value, **kw)
+                elif operation == 'RENEW':
+                    result, err = yield self._renew_reservation(key)
+                elif operation == 'RELEASE':
+                    result, err = yield self._release_reservation(key)
+                elif operation == 'RELEASE-ALL':
+                    err = yield self._release_all_reservations()
+                elif operation == 'WATCH':
+                    # A missing callback fails with a clear KeyError('callback')
+                    # rather than an unbound local.
+                    callback = kw['callback']
+                    result = self.client.watch([KeySet(key, prefix=True)], callback)
+                # The store answered: reset the back-off state and stop retrying
+                self._clear_backoff()
+                break
+            except ConnectionRefusedError as ex:
+                # etcd is unreachable: wait, then retry (err stays None)
+                log.error('comms-exception', ex=ex)
+                yield self._backoff('etcd-not-up')
+            except Exception as ex:
+                log.error('etcd-exception', ex=ex)
+                err = ex
+
+            if timeout > 0 and self.retry_time > timeout:
+                err = 'operation-timed-out'
+            if err is not None:
+                self._clear_backoff()
+                break
+
+        returnValue((result, err))
+
+    @inlineCallbacks
+    def _get(self, key):
+        """Return a KVPair for `key`, or None unless exactly one record matches."""
+        kvp = None
+        resp = yield self.client.get(key)
+        if resp.kvs is not None and len(resp.kvs) == 1:
+            kv = resp.kvs[0]
+            kvp = KVPair(kv.key, kv.value, kv.mod_revision)
+        returnValue(kvp)
+
+    @inlineCallbacks
+    def _list(self, key):
+        """Return ([KVPair, ...], err) for every record under prefix `key`."""
+        err = None
+        kv_pairs = []
+        resp = yield self.client.get(KeySet(key, prefix=True))
+        if resp.kvs is not None and len(resp.kvs) > 0:
+            for kv in resp.kvs:
+                kv_pairs.append(KVPair(kv.key, kv.value, kv.mod_revision))
+        returnValue((kv_pairs, err))
+
+    @inlineCallbacks
+    def _reserve(self, key, value, **kw):
+        """
+        Try to reserve `key` for owner `value` under a new lease.
+
+        :param kw: must contain 'ttl', the lease time-to-live
+        :return: (owner, err); owner is the value currently stored under the
+                 key (None if absent), err is None only when the key is held
+                 by `value`
+        """
+        # A missing ttl fails with a clear KeyError('ttl') rather than an
+        # unbound local further down.
+        ttl = kw['ttl']
+        reserved = False
+        err = 'reservation-failed'
+        owner = None
+
+        # Create a lease
+        lease = yield self.client.lease(ttl)
+
+        # Create a transaction
+        # (set the key under the lease only if it has never been written,
+        # i.e. its version is 0; otherwise fetch the current record)
+        txn = Transaction(
+            compare=[ CompVersion(key, '==', 0) ],
+            success=[ OpSet(key, bytes(value), lease=lease) ],
+            failure=[ OpGet(key) ]
+        )
+        newly_acquired = False
+        try:
+            result = yield self.client.submit(txn)
+        except Failed as failed:
+            log.debug('key-already-present', key=key)
+            if len(failed.responses) > 0:
+                response = failed.responses[0]
+                if response.kvs is not None and len(response.kvs) > 0:
+                    kv = response.kvs[0]
+                    log.debug('key-already-present', value=kv.value)
+                    if kv.value == value:
+                        reserved = True
+                        log.debug('key-already-reserved', key = kv.key, value=kv.value)
+        else:
+            newly_acquired = True
+            log.debug('key-was-absent', key=key, result=result)
+
+        # Check if reservation succeeded
+        resp = yield self.client.get(key)
+        if resp.kvs is not None and len(resp.kvs) == 1:
+            owner = resp.kvs[0].value
+            if owner == value:
+                if newly_acquired:
+                    log.debug('key-reserved', key=key, value=value, ttl=ttl,
+                              lease_id=lease.lease_id)
+                    reserved = True
+                    # Add key to reservation list
+                    self.key_reservations[key] = lease
+                else:
+                    log.debug("reservation-still-held")
+            else:
+                log.debug('reservation-held-by-another', value=owner)
+
+        if reserved:
+            err = None
+        returnValue((owner, err))
+
+    @inlineCallbacks
+    def _renew_reservation(self, key):
+        """Refresh the lease holding `key`; return (result, err)."""
+        result = None
+        err = None
+        if key not in self.key_reservations:
+            err = 'key-not-reserved'
+        else:
+            lease = self.key_reservations[key]
+            # A successfully refreshed lease returns an object of type Header
+            result = yield lease.refresh()
+            if result is None:
+                err = 'lease-refresh-failed'
+        returnValue((result, err))
+
+    @inlineCallbacks
+    def _release_reservation(self, key):
+        """Revoke the lease holding `key`; return (result, err)."""
+        err = None
+        # Must be bound on every path: it is returned even when the key was
+        # never reserved.
+        result = None
+        if key not in self.key_reservations:
+            err = 'key-not-reserved'
+        else:
+            lease = self.key_reservations[key]
+            time_left = yield lease.remaining()
+            # A successfully revoked lease returns an object of type Header
+            log.debug('release-reservation', key=key, lease_id=lease.lease_id,
+                      time_left_in_secs=time_left)
+            result = yield lease.revoke()
+            if result is None:
+                err = 'lease-revoke-failed'
+            self.key_reservations.pop(key)
+        returnValue((result, err))
+
+    @inlineCallbacks
+    def _release_all_reservations(self):
+        """Revoke every lease recorded in key_reservations; return err."""
+        err = None
+        # Iterate over a snapshot: the loop yields to the reactor, so the
+        # dictionary may be modified by other callers in the meantime.
+        for key, lease in list(self.key_reservations.items()):
+            time_left = yield lease.remaining()
+            # A successfully revoked lease returns an object of type Header
+            log.debug('release-reservation', key=key, lease_id=lease.lease_id,
+                      time_left_in_secs=time_left)
+            result = yield lease.revoke()
+            if result is None:
+                err = 'lease-revoke-failed'
+            log.debug('lease-revoke', result=result)
+            self.key_reservations.pop(key, None)
+        returnValue(err)
diff --git a/python/common/kvstore/kv_client.py b/python/common/kvstore/kv_client.py
new file mode 100644
index 0000000..69a6480
--- /dev/null
+++ b/python/common/kvstore/kv_client.py
@@ -0,0 +1,206 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from common.utils.asleep import asleep
+from structlog import get_logger
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+log = get_logger()
+
+class KVPair():
+    """A key, its value, and the store index/revision the pair was read at."""
+
+    def __init__(self, key, value, index):
+        self.key, self.value, self.index = key, value, index
+
+class Event():
+    """Notification passed to watch callbacks: an event type plus key and value."""
+
+    # Event types
+    PUT, DELETE, CONNECTION_DOWN = 0, 1, 2
+
+    def __init__(self, event_type, key, value):
+        self.event_type, self.key, self.value = event_type, key, value
+
+# Delays, in seconds, between successive retries. KVClient._backoff() reuses
+# the last entry once the number of retries exceeds the length of this list.
+RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+# Default operation timeout: the total time spent waiting through one full
+# pass of the back-off schedule. The 0.0 start value keeps the result a float.
+DEFAULT_TIMEOUT = sum(RETRY_BACKOFF, 0.0)
+
+# Store-independent front end for a key/value store client.
+# The public methods below delegate to _op_with_retry(), which, like watch()
+# and close_watch(), raises NotImplementedError here and is supplied by the
+# store-specific subclasses (ConsulClient and EtcdClient in this package).
+class KVClient():
+
+ def __init__(self, kv_host, kv_port):
+ self.host = kv_host
+ self.port = kv_port
+ # Reserved key -> store-specific handle for the reservation
+ # (ConsulClient stores a session id, EtcdClient a lease object)
+ self.key_reservations = {}
+ # Watched key -> store-specific watch state
+ # (ConsulClient stores a ConsulWatch, EtcdClient the client's callback)
+ self.key_watches = {}
+ # Back-off bookkeeping: number of retries so far and total seconds waited
+ self.retries = 0
+ self.retry_time = 0
+
+ @inlineCallbacks
+ def get(self, key, timeout=DEFAULT_TIMEOUT):
+ '''
+ This method returns the value of the given key in KV store.
+
+ :param key: The key whose value is requested
+ :param timeout: The length of time in seconds the method will wait for a response
+ :return: (KVPair, error) where KVPair is None if an error occurred
+ '''
+ result = yield self._op_with_retry('GET', key, None, timeout)
+ returnValue(result)
+
+ @inlineCallbacks
+ def list(self, key, timeout=DEFAULT_TIMEOUT):
+ '''
+ The list method returns an array of key-value pairs all of which
+ share the same key prefix.
+
+ :param key: The key prefix
+ :param timeout: The length of time in seconds the method will wait for a response
+ :return: ([]KVPair, error) where []KVPair is a list of KVPair objects
+ '''
+ result = yield self._op_with_retry('LIST', key, None, timeout)
+ returnValue(result)
+
+ @inlineCallbacks
+ def put(self, key, value, timeout=DEFAULT_TIMEOUT):
+ '''
+ The put method writes a value to the given key in KV store.
+ Do NOT modify a reserved key in an etcd store; doing so seems
+ to nullify the TTL of the key. In other words, the key lasts
+ forever.
+
+ :param key: The key to be written to
+ :param value: The value of the key
+ :param timeout: The length of time in seconds the method will wait for a response
+ :return: error, which is set to None for a successful write
+ '''
+ # Only the error half of the (result, err) tuple is passed on
+ _, err = yield self._op_with_retry('PUT', key, value, timeout)
+ returnValue(err)
+
+ @inlineCallbacks
+ def delete(self, key, timeout=DEFAULT_TIMEOUT):
+ '''
+ The delete method removes a key from the KV store.
+
+ :param key: The key to be deleted
+ :param timeout: The length of time in seconds the method will wait for a response
+ :return: error, which is set to None for a successful deletion
+ '''
+ _, err = yield self._op_with_retry('DELETE', key, None, timeout)
+ returnValue(err)
+
+ @inlineCallbacks
+ def reserve(self, key, value, ttl, timeout=DEFAULT_TIMEOUT):
+ '''
+ This method acts essentially like a semaphore. The underlying mechanism
+ differs depending on the KV store: etcd uses a test-and-set transaction;
+ consul uses an acquire lock. If using etcd, do NOT write to the key
+ subsequent to the initial reservation; the TTL functionality may become
+ impaired (i.e. the reservation never expires).
+
+ :param key: The key under reservation
+ :param value: The reservation owner
+ :param ttl: The time-to-live (TTL) for the reservation. The key is unreserved
+ by the KV store when the TTL expires.
+ :param timeout: The length of time in seconds the method will wait for a response
+ :return: (key_value, error) If the key is acquired, then the value returned will
+ be the value passed in. If the key is already acquired, then the value assigned
+ to that key will be returned.
+ '''
+ # ttl travels to the store-specific implementation as a keyword argument
+ result = yield self._op_with_retry('RESERVE', key, value, timeout, ttl=ttl)
+ returnValue(result)
+
+ @inlineCallbacks
+ def renew_reservation(self, key, timeout=DEFAULT_TIMEOUT):
+ '''
+ This method renews the reservation for a given key. A reservation expires
+ after the TTL (Time To Live) period specified when reserving the key.
+
+ :param key: The reserved key
+ :param timeout: The length of time in seconds the method will wait for a response
+ :return: error, which is set to None for a successful renewal
+ '''
+ result, err = yield self._op_with_retry('RENEW', key, None, timeout)
+ returnValue(err)
+
+ @inlineCallbacks
+ def release_reservation(self, key, timeout=DEFAULT_TIMEOUT):
+ '''
+ The release_reservation method cancels the reservation for a given key.
+
+ :param key: The reserved key
+ :param timeout: The length of time in seconds the method will wait for a response
+ :return: error, which is set to None for a successful cancellation
+ '''
+ result, err = yield self._op_with_retry('RELEASE', key, None, timeout)
+ returnValue(err)
+
+ @inlineCallbacks
+ def release_all_reservations(self, timeout=DEFAULT_TIMEOUT):
+ '''
+ This method cancels all key reservations made previously
+ using the reserve API.
+
+ :param timeout: The length of time in seconds the method will wait for a response
+ :return: error, which is set to None for a successful cancellation
+ '''
+ # No single key applies, so None is passed as the key
+ result, err = yield self._op_with_retry('RELEASE-ALL', None, None, timeout)
+ returnValue(err)
+
+ # NOTE(review): watch(), close_watch() and _op_with_retry() are wrapped in
+ # @inlineCallbacks although they contain no yield; they raise
+ # NotImplementedError before producing a generator. Subclasses override them.
+ @inlineCallbacks
+ def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
+ '''
+ This method provides a watch capability for the given key. If the value of the key
+ changes or the key is deleted, then an event indicating the change is passed to
+ the given callback function.
+
+ :param key: The key to be watched
+ :param key_change_callback: The function invoked whenever the key changes
+ :param timeout: The length of time in seconds the method will wait for a response
+ :return: There is no return; key change events are passed to the callback function
+ '''
+ raise NotImplementedError('Method not implemented')
+
+ @inlineCallbacks
+ def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
+ '''
+ This method closes the watch on the given key. Once the watch is closed, key
+ change events are no longer passed to the key change callback function.
+
+ :param key: The key under watch
+ :param timeout: The length of time in seconds the method will wait for a response
+ :return: There is no return
+ '''
+ raise NotImplementedError('Method not implemented')
+
+ # Store-specific dispatcher: subclasses execute `operation` against their
+ # store and return a (result, err) tuple, which the public methods unpack.
+ @inlineCallbacks
+ def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
+ raise NotImplementedError('Method not implemented')
+
+ def _backoff(self, msg):
+ # Pick the delay for this retry; once retries exceed the table length
+ # the last RETRY_BACKOFF entry is reused.
+ wait_time = RETRY_BACKOFF[min(self.retries, len(RETRY_BACKOFF) - 1)]
+ self.retry_time += wait_time
+ self.retries += 1
+ log.error(msg, next_retry_in_secs=wait_time,
+ total_delay_in_secs = self.retry_time,
+ retries=self.retries)
+ # Callers yield on the returned value to wait before the next attempt
+ return asleep(wait_time)
+
+ def _clear_backoff(self):
+ # Only reset (and log) when at least one retry actually happened
+ if self.retries:
+ log.debug('reset-backoff', after_retries=self.retries)
+ self.retries = 0
+ self.retry_time = 0
\ No newline at end of file
diff --git a/python/common/kvstore/kvstore.py b/python/common/kvstore/kvstore.py
new file mode 100644
index 0000000..662b34d
--- /dev/null
+++ b/python/common/kvstore/kvstore.py
@@ -0,0 +1,31 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from common.kvstore.consul_client import ConsulClient
+from common.kvstore.etcd_client import EtcdClient
+
+def create_kv_client(kv_store, host, port):
+    '''
+    Build a client for the requested kind of KV store.
+
+    :param kv_store: Either 'etcd' or 'consul'
+    :param host: Name or IP address of the host serving the KV store
+    :param port: Port number (integer) of the KV service
+    :return: A new EtcdClient or ConsulClient, or None when kv_store names
+             neither store
+    '''
+    if kv_store == 'consul':
+        return ConsulClient(host, port)
+    if kv_store == 'etcd':
+        return EtcdClient(host, port)
+    # Unknown store type
+    return None
diff --git a/python/common/manhole.py b/python/common/manhole.py
new file mode 100644
index 0000000..c00c900
--- /dev/null
+++ b/python/common/manhole.py
@@ -0,0 +1,129 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import os
+import rlcompleter
+from pprint import pprint
+
+import structlog
+from twisted.conch import manhole_ssh
+from twisted.conch.manhole import ColoredManhole
+from twisted.conch.ssh import keys
+from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
+from twisted.cred.portal import Portal
+from twisted.internet import reactor
+
+log = structlog.get_logger()
+
+
+MANHOLE_SERVER_RSA_PRIVATE = './manhole_rsa_key'
+MANHOLE_SERVER_RSA_PUBLIC = './manhole_rsa_key.pub'
+
+
+def get_rsa_keys():
+    """
+    Return the manhole SSH server key pair, generating it on first use.
+
+    If either key file is missing, a new RSA key pair is generated and
+    written to MANHOLE_SERVER_RSA_PUBLIC / MANHOLE_SERVER_RSA_PRIVATE;
+    otherwise both files are read back.
+
+    :return: (public_key_str, private_key_str)
+    """
+    have_both_keys = (os.path.exists(MANHOLE_SERVER_RSA_PUBLIC) and
+                      os.path.exists(MANHOLE_SERVER_RSA_PRIVATE))
+    if not have_both_keys:
+        # generate a RSA keypair
+        log.info('generate-rsa-keypair')
+        # Imported lazily: only needed when keys have to be generated
+        from Crypto.PublicKey import RSA
+        # NOTE(review): 1024-bit RSA is weak by current standards -- consider
+        # a larger key size.
+        rsa_key = RSA.generate(1024)
+        public_key_str = rsa_key.publickey().exportKey(format='OpenSSH')
+        private_key_str = rsa_key.exportKey()
+
+        # save keys for next time
+        # (context managers make sure the data is flushed and the handles
+        # are closed before the keys are used)
+        with open(MANHOLE_SERVER_RSA_PUBLIC, 'w+b') as key_file:
+            key_file.write(public_key_str)
+        with open(MANHOLE_SERVER_RSA_PRIVATE, 'w+b') as key_file:
+            key_file.write(private_key_str)
+        log.debug('saved-rsa-keypair', public=MANHOLE_SERVER_RSA_PUBLIC,
+                  private=MANHOLE_SERVER_RSA_PRIVATE)
+    else:
+        with open(MANHOLE_SERVER_RSA_PUBLIC) as key_file:
+            public_key_str = key_file.read()
+        with open(MANHOLE_SERVER_RSA_PRIVATE) as key_file:
+            private_key_str = key_file.read()
+    return public_key_str, private_key_str
+
+
+class ManholeWithCompleter(ColoredManhole):
+    """ColoredManhole with rlcompleter-based TAB completion over its namespace."""
+
+    def __init__(self, namespace):
+        # Make the manhole itself reachable from the interactive session
+        namespace['manhole'] = self
+        super(ManholeWithCompleter, self).__init__(namespace)
+        self.last_tab = None
+        self.completer = rlcompleter.Completer(self.namespace)
+
+    def handle_TAB(self):
+        """
+        Complete the current input line.
+
+        A single candidate is inserted in place; several candidates are
+        listed in columns below the input line, which is then redrawn.
+        """
+        # NOTE(review): last_tab is set to the lineBuffer object itself, not a
+        # copy, so this comparison stays false for as long as the same buffer
+        # object is in use -- confirm whether a double-TAB was intended.
+        if self.last_tab != self.lineBuffer:
+            self.last_tab = self.lineBuffer
+            return
+
+        buffer = ''.join(self.lineBuffer)
+        completions = []
+        maxlen = 3
+        # Collect at most 1000 candidates, tracking the longest one
+        for c in range(1000):
+            candidate = self.completer.complete(buffer, c)
+            if not candidate:
+                break
+
+            if len(candidate) > maxlen:
+                maxlen = len(candidate)
+
+            completions.append(candidate)
+
+        if len(completions) == 1:
+            # Unique match: type the missing suffix for the user
+            rest = completions[0][len(buffer):]
+            self.terminal.write(rest)
+            self.lineBufferIndex += len(rest)
+            self.lineBuffer.extend(rest)
+
+        elif len(completions):
+            maxlen += 3
+            # Integer column count, never below 1: a candidate wider than the
+            # terminal would otherwise give 0 columns and make the modulo
+            # below divide by zero.
+            numcols = max(1, self.width // maxlen)
+            self.terminal.nextLine()
+            for idx, candidate in enumerate(completions):
+                self.terminal.write('%%-%ss' % maxlen % candidate)
+                if not ((idx + 1) % numcols):
+                    self.terminal.nextLine()
+            self.terminal.nextLine()
+            self.drawInputLine()
+
+
+class Manhole(object):
+    """
+    SSH-accessible interactive Python console bound to localhost.
+
+    Constructing an instance registers a listening TCP port with the
+    reactor; nothing is served until the reactor runs.
+    """
+
+    def __init__(self, port, pws, **kw):
+        """
+        :param port: TCP port to listen on (localhost only)
+        :param pws: dict of username -> password accepted for login
+        :param kw: extra names to expose in the console namespace
+        """
+        # Console namespace: caller-supplied names, overlaid with this
+        # module's globals, plus 'pp' as a pretty-printer shortcut.
+        kw.update(globals())
+        kw['pp'] = pprint
+
+        ssh_realm = manhole_ssh.TerminalRealm()
+        console = ManholeWithCompleter(kw)
+
+        def _on_window_changed(_, win_size):
+            # The first two entries are passed to terminalSize in reverse order
+            first_two = win_size[:2]
+            console.terminalSize(*reversed(first_two))
+
+        ssh_realm.sessionFactory.windowChanged = _on_window_changed
+        # Every session is served by the one shared console instance
+        ssh_realm.chainedProtocolFactory.protocolFactory = lambda _: console
+
+        auth_portal = Portal(ssh_realm)
+        auth_portal.registerChecker(InMemoryUsernamePasswordDatabaseDontUse(**pws))
+
+        ssh_factory = manhole_ssh.ConchFactory(auth_portal)
+        public_key_str, private_key_str = get_rsa_keys()
+        ssh_factory.publicKeys = {'ssh-rsa': keys.Key.fromString(public_key_str)}
+        ssh_factory.privateKeys = {'ssh-rsa': keys.Key.fromString(private_key_str)}
+
+        reactor.listenTCP(port, ssh_factory, interface='localhost')
+
+
+# When run as a script: serve a manhole on localhost port 12222 that accepts
+# user 'admin' with password 'admin', and run the reactor until it is stopped.
+if __name__ == '__main__':
+ Manhole(12222, dict(admin='admin'))
+ reactor.run()
diff --git a/python/common/pon_resource_manager/__init__.py b/python/common/pon_resource_manager/__init__.py
new file mode 100644
index 0000000..2d104e0
--- /dev/null
+++ b/python/common/pon_resource_manager/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/common/pon_resource_manager/resource_kv_store.py b/python/common/pon_resource_manager/resource_kv_store.py
new file mode 100644
index 0000000..a1a5c14
--- /dev/null
+++ b/python/common/pon_resource_manager/resource_kv_store.py
@@ -0,0 +1,107 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Resource KV store - interface between Resource Manager and backend store."""
+import structlog
+
+from voltha.core.config.config_backend import ConsulStore
+from voltha.core.config.config_backend import EtcdStore
+
+# KV store uses this prefix to store resource info
+PATH_PREFIX = 'resource_manager/{}'
+
+
+class ResourceKvStore(object):
+    """Implements apis to store/get/remove resource in backend store."""
+
+    def __init__(self, technology, device_id, backend, host, port):
+        """
+        Create ResourceKvStore object.
+
+        Based on backend ('consul' or 'etcd'), use the host and port
+        to create the respective store object. All keys live below the
+        'resource_manager/<technology>' prefix.
+
+        :param technology: PON technology
+        :param device_id: OLT device id (not used by this constructor)
+        :param backend: Type of backend storage (etcd or consul)
+        :param host: host ip info for backend storage
+        :param port: port for the backend storage
+        :raises exception when invalid backend store passed as an argument
+        """
+        # logger
+        self._log = structlog.get_logger()
+
+        path = PATH_PREFIX.format(technology)
+        try:
+            if backend == 'consul':
+                self._kv_store = ConsulStore(host, port, path)
+            elif backend == 'etcd':
+                self._kv_store = EtcdStore(host, port, path)
+            else:
+                self._log.error('Invalid-backend')
+                raise Exception("Invalid-backend-for-kv-store")
+        except Exception:
+            self._log.exception("exception-in-init")
+            # Re-raise unchanged so the original exception type and
+            # traceback are not replaced by a generic wrapper.
+            raise
+
+    def update_to_kv_store(self, path, resource):
+        """
+        Update resource.
+
+        :param path: path to update the resource
+        :param resource: updated resource (stored as str(resource))
+        :return boolean: True if the store accepted the write else False
+        """
+        try:
+            self._kv_store[path] = str(resource)
+            self._log.debug("Resource-updated-in-kv-store", path=path)
+            return True
+        # Exception, not BaseException: KeyboardInterrupt and SystemExit
+        # must keep propagating instead of being reported as a failure.
+        except Exception:
+            self._log.exception("Resource-update-in-kv-store-failed",
+                                path=path, resource=resource)
+        return False
+
+    def get_from_kv_store(self, path):
+        """
+        Get resource.
+
+        :param path: path to get the resource
+        :return: stored value, or None when the key is missing or the
+                 lookup failed
+        """
+        resource = None
+        try:
+            resource = self._kv_store[path]
+            self._log.debug("Got-resource-from-kv-store", path=path)
+        except KeyError:
+            # A missing key is an expected condition, not an error.
+            self._log.info("Resource-not-found-updating-resource",
+                           path=path)
+        except Exception:
+            self._log.exception("Getting-resource-from-kv-store-failed",
+                                path=path)
+        return resource
+
+    def remove_from_kv_store(self, path):
+        """
+        Remove resource.
+
+        :param path: path to remove the resource
+        :return boolean: True if the key was deleted else False
+        """
+        try:
+            del self._kv_store[path]
+            self._log.debug("Resource-deleted-in-kv-store", path=path)
+            return True
+        except Exception:
+            self._log.exception("Resource-delete-in-kv-store-failed",
+                                path=path)
+        return False
diff --git a/python/common/pon_resource_manager/resource_manager.py b/python/common/pon_resource_manager/resource_manager.py
new file mode 100644
index 0000000..17b2871
--- /dev/null
+++ b/python/common/pon_resource_manager/resource_manager.py
@@ -0,0 +1,677 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Resource Manager will be unique for each OLT device.
+
+It exposes APIs to create/free alloc_ids/onu_ids/gemport_ids. Resource Manager
+uses a KV store in backend to ensure resiliency of the data.
+"""
+import json
+import structlog
+from bitstring import BitArray
+from ast import literal_eval
+import shlex
+from argparse import ArgumentParser, ArgumentError
+
+from common.pon_resource_manager.resource_kv_store import ResourceKvStore
+
+
+class OltVendorArgumentParser(ArgumentParser):
+    """Parses extra arguments passed to the OpenOlt adapter from the NBI."""
+
+    def exit(self, status=0, message=None):
+        """
+        Raise an exception instead of terminating the process.
+
+        The stock implementation ends in sys.exit(), which must not
+        happen here; the message is carried by the exception and the
+        status is ignored.
+        """
+        raise Exception(message)
+
+
+class PONResourceManager(object):
+ """Implements APIs to initialize/allocate/release alloc/gemport/onu IDs."""
+
+ # Constants to identify resource pool
+ ONU_ID = 'ONU_ID'
+ ALLOC_ID = 'ALLOC_ID'
+ GEMPORT_ID = 'GEMPORT_ID'
+
+ # The resource ranges for a given device vendor_type should be placed
+ # at 'resource_manager/<technology>/resource_ranges/<olt_vendor_type>'
+ # path on the KV store.
+ # If Resource Range parameters are to be read from the external KV store,
+ # they are expected to be stored in the following format.
+ # Note: All parameters are MANDATORY for now.
+ '''
+ {
+ "onu_id_start": 1,
+ "onu_id_end": 127,
+ "alloc_id_start": 1024,
+ "alloc_id_end": 2816,
+ "gemport_id_start": 1024,
+ "gemport_id_end": 8960,
+ "pon_ports": 16
+ }
+
+ '''
+ # constants used as keys to reference the resource range parameters from
+ # an external KV store.
+ ONU_START_IDX = "onu_id_start"
+ ONU_END_IDX = "onu_id_end"
+ ALLOC_ID_START_IDX = "alloc_id_start"
+ ALLOC_ID_END_IDX = "alloc_id_end"
+ GEM_PORT_ID_START_IDX = "gemport_id_start"
+ GEM_PORT_ID_END_IDX = "gemport_id_end"
+ NUM_OF_PON_PORT = "pon_ports"
+
+ # PON Resource range configuration on the KV store.
+ # Format: 'resource_manager/<technology>/resource_ranges/<olt_vendor_type>'
+ # The KV store backend is initialized with a path prefix and we need to
+ # provide only the suffix.
+ PON_RESOURCE_RANGE_CONFIG_PATH = 'resource_ranges/{}'
+
+ # resource path suffix
+ ALLOC_ID_POOL_PATH = '{}/alloc_id_pool/{}'
+ GEMPORT_ID_POOL_PATH = '{}/gemport_id_pool/{}'
+ ONU_ID_POOL_PATH = '{}/onu_id_pool/{}'
+
+ # Path on the KV store for storing list of alloc IDs for a given ONU
+ # Format: <device_id>/<(pon_intf_id, onu_id)>/alloc_ids
+ ALLOC_ID_RESOURCE_MAP_PATH = '{}/{}/alloc_ids'
+
+ # Path on the KV store for storing list of gemport IDs for a given ONU
+ # Format: <device_id>/<(pon_intf_id, onu_id)>/gemport_ids
+ GEMPORT_ID_RESOURCE_MAP_PATH = '{}/{}/gemport_ids'
+
+ # Constants for internal usage.
+ PON_INTF_ID = 'pon_intf_id'
+ START_IDX = 'start_idx'
+ END_IDX = 'end_idx'
+ POOL = 'pool'
+
+    def __init__(self, technology, extra_args, device_id,
+                 backend, host, port):
+        """
+        Create PONResourceManager object.
+
+        :param technology: PON technology
+        :param extra_args: string of extra arguments passed during
+                           pre-provisioning of OLT; it specifies the OLT
+                           vendor type
+        :param device_id: OLT device id
+        :param backend: backend store
+        :param host: ip of backend store
+        :param port: port on which backend store listens
+        :raises exception when invalid backend store passed as an argument
+        """
+        # logger
+        self._log = structlog.get_logger()
+
+        try:
+            self.technology = technology
+            self.extra_args = extra_args
+            self.device_id = device_id
+            self.backend = backend
+            self.host = host
+            self.port = port
+            self.olt_vendor = None
+            self._kv_store = ResourceKvStore(technology, device_id, backend,
+                                             host, port)
+            # Below attribute, pon_resource_ranges, should be initialized
+            # by reading from KV store.
+            self.pon_resource_ranges = dict()
+        except Exception:
+            self._log.exception("exception-in-init")
+            # Bare raise keeps the original exception type and traceback;
+            # wrapping it in a new Exception would discard both.
+            raise
+
+    def init_resource_ranges_from_kv_store(self):
+        """
+        Load the PON resource ranges for this OLT vendor from the kv store.
+
+        The vendor is taken from extra_args and the ranges are read as
+        JSON from 'resource_ranges/<olt_vendor>' below the store prefix.
+
+        :return boolean: True if PON resource ranges initialized else False
+        """
+        vendor = self._get_olt_vendor()
+        self.olt_vendor = vendor
+        if vendor is None:
+            # Without a vendor key there is nothing to look up.
+            self._log.info("olt-vendor-unavailable--not-reading-from-kv-store")
+            return False
+
+        path = self.PON_RESOURCE_RANGE_CONFIG_PATH.format(vendor)
+        try:
+            raw_config = self._kv_store.get_from_kv_store(path)
+            if raw_config is None:
+                self._log.debug("resource-range-config-unavailable-on-kvstore")
+                return False
+
+            self.pon_resource_ranges = json.loads(raw_config)
+            self._log.debug("Init-resource-ranges-from-kvstore-success",
+                            pon_resource_ranges=self.pon_resource_ranges,
+                            path=path)
+            return True
+        except Exception as err:
+            self._log.exception(
+                "error-initializing-resource-range-from-kv-store", e=err)
+            return False
+
+    def init_default_pon_resource_ranges(self, onu_start_idx=1,
+                                         onu_end_idx=127,
+                                         alloc_id_start_idx=1024,
+                                         alloc_id_end_idx=2816,
+                                         gem_port_id_start_idx=1024,
+                                         gem_port_id_end_idx=8960,
+                                         num_of_pon_ports=16):
+        """
+        Fill pon_resource_ranges with the given (default) values.
+
+        :param onu_start_idx: onu id start index
+        :param onu_end_idx: onu id end index
+        :param alloc_id_start_idx: alloc id start index
+        :param alloc_id_end_idx: alloc id end index
+        :param gem_port_id_start_idx: gemport id start index
+        :param gem_port_id_end_idx: gemport id end index
+        :param num_of_pon_ports: number of PON ports
+        """
+        self._log.info("initialize-default-resource-range-values")
+        cls = PONResourceManager
+        self.pon_resource_ranges.update({
+            cls.ONU_START_IDX: onu_start_idx,
+            cls.ONU_END_IDX: onu_end_idx,
+            cls.ALLOC_ID_START_IDX: alloc_id_start_idx,
+            cls.ALLOC_ID_END_IDX: alloc_id_end_idx,
+            cls.GEM_PORT_ID_START_IDX: gem_port_id_start_idx,
+            cls.GEM_PORT_ID_END_IDX: gem_port_id_end_idx,
+            cls.NUM_OF_PON_PORT: num_of_pon_ports,
+        })
+
+    def init_device_resource_pool(self):
+        """
+        Initialize the resource pools of the device.
+
+        One ONU id pool is created per PON port; alloc ids and gemport
+        ids each get a single pool kept under PON port 0.
+        """
+        cls = PONResourceManager
+        ranges = self.pon_resource_ranges
+
+        for pon_port in range(ranges[cls.NUM_OF_PON_PORT]):
+            self.init_resource_id_pool(
+                pon_intf_id=pon_port,
+                resource_type=cls.ONU_ID,
+                start_idx=ranges[cls.ONU_START_IDX],
+                end_idx=ranges[cls.ONU_END_IDX])
+
+        # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
+        # across OLT. To keep it simple, a single pool (POOL 0) is maintained
+        # for both the resource types. This may need to change later.
+        self.init_resource_id_pool(
+            pon_intf_id=0,
+            resource_type=cls.ALLOC_ID,
+            start_idx=ranges[cls.ALLOC_ID_START_IDX],
+            end_idx=ranges[cls.ALLOC_ID_END_IDX])
+
+        self.init_resource_id_pool(
+            pon_intf_id=0,
+            resource_type=cls.GEMPORT_ID,
+            start_idx=ranges[cls.GEM_PORT_ID_START_IDX],
+            end_idx=ranges[cls.GEM_PORT_ID_END_IDX])
+
+    def clear_device_resource_pool(self):
+        """
+        Clear the resource pools of the device.
+
+        Removes the ONU id pool of every PON port, then the alloc id and
+        gemport id pools kept under PON port 0.
+        """
+        cls = PONResourceManager
+        port_count = self.pon_resource_ranges[cls.NUM_OF_PON_PORT]
+
+        for pon_port in range(port_count):
+            self.clear_resource_id_pool(pon_intf_id=pon_port,
+                                        resource_type=cls.ONU_ID)
+
+        for shared_type in (cls.ALLOC_ID, cls.GEMPORT_ID):
+            self.clear_resource_id_pool(pon_intf_id=0,
+                                        resource_type=shared_type)
+
+    def init_resource_id_pool(self, pon_intf_id, resource_type, start_idx,
+                              end_idx):
+        """
+        Initialize Resource ID pool for a given Resource Type on a PON Port.
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_type: String to identify type of resource
+        :param start_idx: start index for the id pool
+        :param end_idx: end index for the id pool
+        :return boolean: True if resource id pool initialized else False
+        """
+        path = self._get_path(pon_intf_id, resource_type)
+        if path is None:
+            return False
+
+        try:
+            # After an adapter reboot/reconciliation the pool may already
+            # be in the kv store; in that case it is left untouched.
+            if self._get_resource(path) is not None:
+                self._log.info("Resource-already-present-in-store", path=path)
+                return True
+
+            new_pool = self._format_resource(pon_intf_id, start_idx, end_idx)
+            self._log.info("Resource-initialized", path=path)
+
+            # Add resource as json in kv store.
+            return self._kv_store.update_to_kv_store(path, new_pool) is True
+        except Exception as err:
+            self._log.exception("error-initializing-resource-pool", e=err)
+
+        return False
+
+    def get_resource_id(self, pon_intf_id, resource_type, num_of_id=1):
+        """
+        Create alloc/gemport/onu id for given OLT PON interface.
+
+        IDs are only handed out once the updated pool has been written
+        back to the kv store; otherwise the same IDs would be issued
+        again by the next call.
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_type: String to identify type of resource
+        :param num_of_id: required number of ids (alloc/gemport ids only)
+        :return list/int/None: list for alloc_id/gemport_id, int for
+                               onu_id, None for an invalid type or on
+                               any failure
+        """
+        result = None
+
+        # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
+        # across OLT. To keep it simple, a single pool (POOL 0) is maintained
+        # for both the resource types. This may need to change later.
+        # Override the incoming pon_intf_id to PON0
+        if resource_type in (PONResourceManager.GEMPORT_ID,
+                             PONResourceManager.ALLOC_ID):
+            pon_intf_id = 0
+
+        path = self._get_path(pon_intf_id, resource_type)
+        if path is None:
+            return result
+
+        try:
+            resource = self._get_resource(path)
+            if resource is None:
+                raise Exception("get-resource-failed")
+
+            if resource_type == PONResourceManager.ONU_ID:
+                result = self._generate_next_id(resource)
+            else:
+                result = [self._generate_next_id(resource)
+                          for _ in range(num_of_id)]
+
+            # Update resource in kv store
+            if not self._update_resource(path, resource):
+                raise Exception("update-resource-failed")
+
+            self._log.debug("Get-" + resource_type + "-success", result=result,
+                            path=path)
+
+        except Exception as e:
+            self._log.exception("Get-" + resource_type + "-id-failed",
+                                path=path, e=e)
+            # Never return IDs (or a partial list) that were not persisted.
+            result = None
+        return result
+
+    def free_resource_id(self, pon_intf_id, resource_type, release_content):
+        """
+        Release alloc/gemport/onu id for given OLT PON interface.
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_type: String to identify type of resource
+        :param release_content: a single onu id, or an iterable of
+                                alloc/gemport ids, to be released
+        :return boolean: True if all IDs in given release_content released
+                         else False
+        """
+        # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
+        # across OLT. To keep it simple, a single pool (POOL 0) is maintained
+        # for both the resource types. This may need to change later.
+        # Override the incoming pon_intf_id to PON0
+        if resource_type in (PONResourceManager.GEMPORT_ID,
+                             PONResourceManager.ALLOC_ID):
+            pon_intf_id = 0
+
+        path = self._get_path(pon_intf_id, resource_type)
+        if path is None:
+            return False
+
+        released = False
+        try:
+            pool = self._get_resource(path)
+            if pool is None:
+                raise Exception("get-resource-failed")
+
+            if resource_type == PONResourceManager.ONU_ID:
+                self._release_id(pool, release_content)
+            else:
+                for unique_id in release_content:
+                    self._release_id(pool, unique_id)
+
+            self._log.debug("Free-" + resource_type + "-success", path=path)
+
+            # Update resource in kv store
+            released = self._update_resource(path, pool)
+
+        except Exception as err:
+            self._log.exception("Free-" + resource_type + "-failed",
+                                path=path, e=err)
+        return released
+
+    def clear_resource_id_pool(self, pon_intf_id, resource_type):
+        """
+        Clear Resource Pool for a given Resource Type on a given PON Port.
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_type: String to identify type of resource
+        :return boolean: True if removed else False
+        """
+        path = self._get_path(pon_intf_id, resource_type)
+        if path is None:
+            return False
+
+        try:
+            if self._kv_store.remove_from_kv_store(path) is True:
+                self._log.debug("Resource-pool-cleared",
+                                device_id=self.device_id,
+                                path=path)
+                return True
+        except Exception as err:
+            self._log.exception("error-clearing-resource-pool", e=err)
+
+        # Reached on a failed delete as well as after an exception.
+        self._log.error("Clear-resource-pool-failed", device_id=self.device_id,
+                        path=path)
+        return False
+
+    def init_resource_map(self, pon_intf_onu_id):
+        """
+        Initialize resource map.
+
+        Stores an empty JSON list under both the alloc_ids and the
+        gemport_ids key of the given (pon_intf_id, onu_id) reference.
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        """
+        # alloc_ids map first, then gemport_ids map
+        for template in (PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH,
+                         PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH):
+            map_path = template.format(self.device_id, str(pon_intf_onu_id))
+            self._kv_store.update_to_kv_store(map_path, json.dumps(list()))
+
+    def remove_resource_map(self, pon_intf_onu_id):
+        """
+        Remove resource map.
+
+        Deletes the alloc_ids and the gemport_ids key of the given
+        (pon_intf_id, onu_id) reference from the kv store.
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        """
+        # alloc_ids map first, then gemport_ids map
+        for template in (PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH,
+                         PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH):
+            map_path = template.format(self.device_id, str(pon_intf_onu_id))
+            self._kv_store.remove_from_kv_store(map_path)
+
+    def get_current_alloc_ids_for_onu(self, pon_intf_onu_id):
+        """
+        Get currently configured alloc ids for given pon_intf_onu_id.
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        :return: list of alloc ids, or None when nothing is stored or
+                 the stored list is empty
+        """
+        path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id))
+        stored = self._kv_store.get_from_kv_store(path)
+        if stored is None:
+            return None
+
+        alloc_ids = json.loads(stored)
+        return alloc_ids if len(alloc_ids) > 0 else None
+
+    def get_current_gemport_ids_for_onu(self, pon_intf_onu_id):
+        """
+        Get currently configured gemport ids for given pon_intf_onu_id.
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        :return: list of gemport ids, or None when nothing is stored or
+                 the stored list is empty
+        """
+        path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id))
+        stored = self._kv_store.get_from_kv_store(path)
+        if stored is None:
+            return None
+
+        gemport_ids = json.loads(stored)
+        return gemport_ids if len(gemport_ids) > 0 else None
+
+    def update_alloc_ids_for_onu(self, pon_intf_onu_id, alloc_ids):
+        """
+        Update currently configured alloc ids for given pon_intf_onu_id.
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        :param alloc_ids: alloc ids to store (serialized as JSON)
+        """
+        map_path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id))
+        serialized = json.dumps(alloc_ids)
+        self._kv_store.update_to_kv_store(map_path, serialized)
+
+    def update_gemport_ids_for_onu(self, pon_intf_onu_id, gemport_ids):
+        """
+        Update currently configured gemport ids for given pon_intf_onu_id.
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        :param gemport_ids: gemport ids to store (serialized as JSON)
+        """
+        map_path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id))
+        serialized = json.dumps(gemport_ids)
+        self._kv_store.update_to_kv_store(map_path, serialized)
+
+    def _get_olt_vendor(self):
+        """
+        Get olt vendor variant from extra_args.
+
+        :return: value of --olt_vendor/-o ('default' when the option is
+                 not given), or None when extra_args is empty or cannot
+                 be parsed
+        """
+        if not (self.extra_args and len(self.extra_args) > 0):
+            return None
+
+        vendor = None
+        arg_parser = OltVendorArgumentParser(add_help=False)
+        arg_parser.add_argument('--olt_vendor', '-o', action='store',
+                                choices=['default', 'asfvolt16', 'cigolt24'],
+                                default='default')
+        try:
+            parsed = arg_parser.parse_args(shlex.split(self.extra_args))
+            self._log.debug('parsing-extra-arguments', args=parsed)
+            vendor = parsed.olt_vendor
+        # NOTE(review): the '{}' in the two event names below is never
+        # formatted; it is emitted literally.
+        except ArgumentError as err:
+            self._log.exception('invalid-arguments: {}', e=err)
+        except Exception as err:
+            self._log.exception('option-parsing-error: {}', e=err)
+
+        return vendor
+
+    def _generate_next_id(self, resource):
+        """
+        Generate unique id having OFFSET as start index.
+
+        The first cleared bit of the pool is marked as used and its
+        position, offset by the pool's start index, is the new id.
+
+        :param resource: resource used to generate ID
+        :return int: generated id
+        :raises Exception: when the pool has no free id left, or the next
+                           free id lies beyond the pool's end index
+        """
+        pool = resource[PONResourceManager.POOL]
+        # find() yields a 1-tuple with the bit position, or an empty
+        # tuple when no cleared bit exists.
+        found = pool.find('0b0')
+        if not found:
+            raise Exception("no-free-id-available-in-pool")
+
+        bit_pos = found[0]
+        unique_id = bit_pos + resource[PONResourceManager.START_IDX]
+
+        # Pools may hold more bits than the configured range; never hand
+        # out an id past the declared end index.
+        end_idx = resource.get(PONResourceManager.END_IDX)
+        if end_idx is not None and unique_id > end_idx:
+            raise Exception("no-free-id-available-in-pool")
+
+        pool.set(1, bit_pos)
+        return unique_id
+
+    def _release_id(self, resource, unique_id):
+        """
+        Release unique id having OFFSET as start index.
+
+        :param resource: resource used to release ID
+        :param unique_id: id need to be released
+        :raises Exception: when unique_id does not belong to the pool
+        """
+        pool = resource[PONResourceManager.POOL]
+        pos = int(unique_id) - resource[PONResourceManager.START_IDX]
+        # A negative position would address the pool from its end and
+        # silently free an unrelated id, so reject anything out of range.
+        if pos < 0 or pos >= len(pool):
+            raise Exception("release-id-out-of-range")
+        pool.set(0, pos)
+
+    def _get_path(self, pon_intf_id, resource_type):
+        """
+        Get path for given resource type.
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_type: String to identify type of resource
+        :return: path for given resource type, None for an unknown type
+        """
+        if resource_type == PONResourceManager.ONU_ID:
+            return self._get_onu_id_resource_path(pon_intf_id)
+        if resource_type == PONResourceManager.ALLOC_ID:
+            return self._get_alloc_id_resource_path(pon_intf_id)
+        if resource_type == PONResourceManager.GEMPORT_ID:
+            return self._get_gemport_id_resource_path(pon_intf_id)
+
+        self._log.error("invalid-resource-pool-identifier")
+        return None
+
+    def _get_alloc_id_resource_path(self, pon_intf_id):
+        """
+        Build the kv store path of an alloc id pool.
+
+        :param pon_intf_id: OLT PON interface id
+        :return: alloc id resource path
+        """
+        template = PONResourceManager.ALLOC_ID_POOL_PATH
+        return template.format(self.device_id, pon_intf_id)
+
+    def _get_gemport_id_resource_path(self, pon_intf_id):
+        """
+        Build the kv store path of a gemport id pool.
+
+        :param pon_intf_id: OLT PON interface id
+        :return: gemport id resource path
+        """
+        template = PONResourceManager.GEMPORT_ID_POOL_PATH
+        return template.format(self.device_id, pon_intf_id)
+
+    def _get_onu_id_resource_path(self, pon_intf_id):
+        """
+        Build the kv store path of an onu id pool.
+
+        :param pon_intf_id: OLT PON interface id
+        :return: onu id resource path
+        """
+        template = PONResourceManager.ONU_ID_POOL_PATH
+        return template.format(self.device_id, pon_intf_id)
+
+    def _update_resource(self, path, resource):
+        """
+        Update resource in resource kv store.
+
+        The pool is serialized as a binary string. A copy is serialized
+        so that the caller's resource keeps its pool object and stays
+        usable after the call.
+
+        :param path: path to update resource
+        :param resource: resource need to be updated
+        :return boolean: True if resource updated in kv store else False
+        """
+        serializable = dict(resource)
+        serializable[PONResourceManager.POOL] = \
+            resource[PONResourceManager.POOL].bin
+        result = self._kv_store.update_to_kv_store(
+            path, json.dumps(serializable))
+        return result is True
+
+    def _get_resource(self, path):
+        """
+        Get resource from kv store.
+
+        :param path: path to get resource
+        :return: resource dictionary with its pool converted to a
+                 BitArray if present in kv store else None
+        """
+        raw = self._kv_store.get_from_kv_store(path)
+        if raw is None:
+            return raw
+        self._log.info("dumping resource", result=raw)
+
+        # decode resource fetched from backend store to dictionary
+        decoded = json.loads(raw)
+
+        # The pool is kept in the store as a binary string; generating
+        # and releasing IDs works on a BitArray instead.
+        pool_bits = decoded[PONResourceManager.POOL]
+        decoded[PONResourceManager.POOL] = BitArray('0b' + pool_bits)
+
+        return decoded
+
+    def _format_resource(self, pon_intf_id, start_idx, end_idx):
+        """
+        Format resource as json.
+
+        The pool holds one bit per id in the range start_idx..end_idx
+        (both ends included), all initially cleared. Sizing it by
+        end_idx alone would allow ids beyond end_idx to be generated.
+
+        :param pon_intf_id: OLT PON interface id
+        :param start_idx: start index for id pool
+        :param end_idx: end index for id pool
+        :return string: resource formatted as a JSON document
+        """
+        # Format resource as json to be stored in backend store
+        resource = dict()
+        resource[PONResourceManager.PON_INTF_ID] = pon_intf_id
+        resource[PONResourceManager.START_IDX] = start_idx
+        resource[PONResourceManager.END_IDX] = end_idx
+
+        # An inverted range yields an empty pool rather than an error.
+        pool_size = max(end_idx - start_idx + 1, 0)
+
+        # resource pool stored in backend store as binary string
+        resource[PONResourceManager.POOL] = BitArray(length=pool_size).bin
+
+        return json.dumps(resource)
diff --git a/python/common/structlog_setup.py b/python/common/structlog_setup.py
new file mode 100644
index 0000000..cbbda89
--- /dev/null
+++ b/python/common/structlog_setup.py
@@ -0,0 +1,132 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Setting up proper logging for Voltha"""
+
+import logging
+import logging.config
+from collections import OrderedDict
+
+import structlog
+from structlog.stdlib import BoundLogger, INFO
+
+try:
+ from thread import get_ident as _get_ident
+except ImportError:
+ from dummy_thread import get_ident as _get_ident
+
+
+class StructuredLogRenderer(object):
+    """Final processor that forwards the event dict unrendered."""
+
+    def __call__(self, logger, name, event_dict):
+        """
+        Wrap event_dict for the wrapped logger call.
+
+        To keep the structured data intact, the dict itself is passed
+        on as the single positional argument, with no keyword arguments.
+
+        :return: (args, kwargs) tuple
+        """
+        return (event_dict,), {}
+
+
+class PlainRenderedOrderedDict(OrderedDict):
+    """OrderedDict whose repr looks like a plain '{key: value}' listing,
+    to make the log stream output cleaner.
+    """
+    def __repr__(self, _repr_running={}):
+        'od.__repr__() <==> repr(od)'
+        # The shared default dict is deliberate: it records which
+        # (object, thread) pairs are currently being rendered so that a
+        # self-referencing dict prints '...' instead of recursing.
+        guard_key = id(self), _get_ident()
+        if guard_key in _repr_running:
+            return '...'
+        _repr_running[guard_key] = 1
+        try:
+            if not self:
+                return '{}'
+            rendered = ["%s: %s" % (k, v) for k, v in self.items()]
+            return '{%s}' % ", ".join(rendered)
+        finally:
+            del _repr_running[guard_key]
+
+
+def setup_logging(log_config, instance_id, verbosity_adjust=0):
+    """
+    Set up logging such that:
+    - The primary logging entry method is structlog
+      (see http://structlog.readthedocs.io/en/stable/index.html)
+    - By default, the logging backend is Python standard lib logger
+
+    :param log_config: dict handed to logging.config.dictConfig
+    :param instance_id: value added to every event as 'instance_id'
+    :param verbosity_adjust: each unit lowers the root level by 10
+    :return: a structlog logger
+    """
+
+    def add_exc_info_flag_for_exception(_, name, event_dict):
+        # log.exception(...) calls get the traceback attached
+        if name == 'exception':
+            event_dict['exc_info'] = True
+        return event_dict
+
+    def add_instance_id(_, __, event_dict):
+        event_dict['instance_id'] = instance_id
+        return event_dict
+
+    # Configure standard logging
+    logging.config.dictConfig(log_config)
+    logging.root.level = logging.root.level - 10 * verbosity_adjust
+
+    structlog.configure(
+        logger_factory=structlog.stdlib.LoggerFactory(),
+        context_class=PlainRenderedOrderedDict,
+        wrapper_class=BoundLogger,
+        processors=[
+            add_exc_info_flag_for_exception,
+            structlog.processors.StackInfoRenderer(),
+            structlog.processors.format_exc_info,
+            add_instance_id,
+            StructuredLogRenderer(),
+        ])
+
+    # Mark first line of log
+    first_logger = structlog.get_logger()
+    first_logger.info("first-line")
+    return first_logger
+
+
+def update_logging(instance_id, vcore_id):
+    """
+    Add the vcore id to the structured logger
+    :param instance_id: value added to every event as 'instance_id'
+    :param vcore_id: The assigned vcore id
+    :return: structure logger
+    """
+    def add_exc_info_flag_for_exception(_, name, event_dict):
+        # log.exception(...) calls get the traceback attached
+        if name == 'exception':
+            event_dict['exc_info'] = True
+        return event_dict
+
+    def add_instance_id(_, __, event_dict):
+        event_dict['instance_id'] = instance_id
+        return event_dict
+
+    def add_vcore_id(_, __, event_dict):
+        event_dict['vcore_id'] = vcore_id
+        return event_dict
+
+    # Only the processor chain is replaced here.
+    structlog.configure(processors=[
+        add_exc_info_flag_for_exception,
+        structlog.processors.StackInfoRenderer(),
+        structlog.processors.format_exc_info,
+        add_instance_id,
+        add_vcore_id,
+        StructuredLogRenderer(),
+    ])
+
+    # Mark first line of log
+    updated_logger = structlog.get_logger()
+    updated_logger.info("updated-logger")
+    return updated_logger
diff --git a/python/common/utils/__init__.py b/python/common/utils/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/common/utils/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/common/utils/asleep.py b/python/common/utils/asleep.py
new file mode 100644
index 0000000..10d1ce3
--- /dev/null
+++ b/python/common/utils/asleep.py
@@ -0,0 +1,31 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Async sleep (asleep) method and other twisted goodies """
+
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+
+
+def asleep(dt):
+    """
+    Async (event driven) wait for given time period (in seconds)
+    :param dt: Delay in seconds
+    :return: Deferred to be fired with value None when time expires.
+    """
+    timer_done = Deferred()
+    # callLater passes the extra argument on, so no lambda is needed
+    reactor.callLater(dt, timer_done.callback, None)
+    return timer_done
diff --git a/python/common/utils/consulhelpers.py b/python/common/utils/consulhelpers.py
new file mode 100644
index 0000000..df4dd58
--- /dev/null
+++ b/python/common/utils/consulhelpers.py
@@ -0,0 +1,178 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Some consul related convenience functions
+"""
+
+from structlog import get_logger
+from consul import Consul
+from random import randint
+from common.utils.nethelpers import get_my_primary_local_ipv4
+
+log = get_logger()
+
+
+def connect_to_consult(consul_endpoint):
+    """Build a Consul client from a '<host>:<port>' endpoint string."""
+    log.debug('getting-service-endpoint', consul=consul_endpoint)
+    # Only the first two colon-separated fields of the endpoint are used.
+    fields = consul_endpoint.split(':')
+    address, port_number = fields[0].strip(), int(fields[1].strip())
+    return Consul(host=address, port=port_number)
+
+
+def verify_all_services_healthy(consul_endpoint, service_name=None,
+                                number_of_expected_services=None):
+    """
+    Check service health in consul: either the one named service, or every
+    service listed in the catalog.
+    :param consul_endpoint: a <host>:<port> string
+    :param service_name: name of service to check, optional
+    :param number_of_expected_services: exact number of catalog services
+        required when all services are checked, optional
+    :return: True if healthy, False otherwise
+    """
+
+    consul = connect_to_consult(consul_endpoint)
+
+    def has_passing_instance(name):
+        # healthy means the passing-only query returns a non-empty list
+        _, passing = consul.health.service(name, passing=True)
+        return passing != []
+
+    # A single named service was requested: check just that one.
+    if service_name is not None:
+        return has_passing_instance(service_name)
+
+    # Otherwise examine every service name found in the catalog.
+    names = get_all_services(consul_endpoint).keys()
+
+    if number_of_expected_services is not None:
+        if len(names) != number_of_expected_services:
+            return False
+
+    # all() stops at the first unhealthy service, as an explicit loop would
+    return all(has_passing_instance(name) for name in names)
+
+
+def get_all_services(consul_endpoint):
+    """Return the services part of consul's catalog.services() reply."""
+    log.debug('getting-service-verify-health')
+    # The reply unpacks into two values; only the second one is returned.
+    client = connect_to_consult(consul_endpoint)
+    _index, listing = client.catalog.services()
+    return listing
+
+
+def get_all_instances_of_service(consul_endpoint, service_name):
+    """Fetch, debug-log and return consul's catalog entries for a service."""
+    log.debug('getting-all-instances-of-service', service=service_name)
+
+    client = connect_to_consult(consul_endpoint)
+    _index, instances = client.catalog.service(service_name)
+
+    for entry in instances:
+        details = dict(name=entry['ServiceName'],
+                       serviceid=entry['ServiceID'],
+                       serviceport=entry['ServicePort'],
+                       createindex=entry['CreateIndex'])
+        log.debug('service', **details)
+    return instances
+
+
+def get_endpoint_from_consul(consul_endpoint, service_name):
+    """
+    Get endpoint of service_name from consul.
+    :param consul_endpoint: a <host>:<port> string
+    :param service_name: name of service for which endpoint
+                         needs to be found.
+    :return: service endpoint as an '<address>:<port>' string
+    :raises Exception: when consul lists no instance of service_name
+    """
+    log.debug('getting-service-info', service=service_name)
+
+    consul = connect_to_consult(consul_endpoint)
+    # The catalog reply unpacks into two values; the first is not needed.
+    _, services = consul.catalog.service(service_name)
+
+    if not services:
+        # Raising is the whole failure path: the os.exit(1) that used to
+        # follow was unreachable (and os is neither imported nor has exit).
+        raise Exception(
+            'Cannot find service {} in consul'.format(service_name))
+
+    # Get host IPV4 address
+    local_ipv4 = get_my_primary_local_ipv4()
+
+    # If host IP address from where the request came in matches
+    # the IP address of the requested service's host IP address,
+    # pick the endpoint
+    for service in services:
+        if service['ServiceAddress'] == local_ipv4:
+            log.debug("picking address locally")
+            endpoint = '{}:{}'.format(service['ServiceAddress'],
+                                      service['ServicePort'])
+            return endpoint
+
+    # If service is not available locally, pick a random
+    # endpoint for the service from the list
+    service = services[randint(0, len(services) - 1)]
+    endpoint = '{}:{}'.format(service['ServiceAddress'],
+                              service['ServicePort'])
+
+    return endpoint
+
+
+def get_healthy_instances(consul_endpoint, service_name=None,
+                          number_of_expected_services=None):
+    """
+    Report whether services registered in consul are healthy. Despite the
+    name, the result is a boolean and not a list of instances.
+    :param consul_endpoint: a <host>:<port> string
+    :param service_name: name of service to check, optional
+    :param number_of_expected_services: number of services to check for,
+        optional
+    :return: true if healthy, false otherwise
+    """
+    consul = connect_to_consult(consul_endpoint)
+
+    def is_passing(candidate):
+        _, healthy_nodes = consul.health.service(candidate, passing=True)
+        return not healthy_nodes == []
+
+    # With an explicit name only that service decides the outcome.
+    if service_name is not None:
+        return is_passing(service_name)
+
+    catalog_names = get_all_services(consul_endpoint).keys()
+
+    # A catalog of unexpected size fails before any health query is made.
+    count_known = number_of_expected_services is not None
+    if count_known and len(catalog_names) != number_of_expected_services:
+        return False
+
+    for candidate in catalog_names:
+        if not is_passing(candidate):
+            return False
+    return True
+
+
+if __name__ == '__main__':
+    # Manual smoke test against a hard-coded consul address. Other probes:
+    #   print get_endpoint_from_consul('10.100.198.220:8500', 'kafka')
+    #   print get_healthy_instances('10.100.198.220:8500', 'voltha-health')
+    get_all_instances_of_service('10.100.198.220:8500', 'voltha-grpc')
diff --git a/python/common/utils/deferred_utils.py b/python/common/utils/deferred_utils.py
new file mode 100644
index 0000000..3c55c1a
--- /dev/null
+++ b/python/common/utils/deferred_utils.py
@@ -0,0 +1,56 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from twisted.internet.error import AlreadyCalled
+
+
+class TimeOutError(Exception): """Errback reason used when a wait times out."""
+
+
+class DeferredWithTimeout(Deferred):
+    """
+    Deferred with a timeout. If neither the callback nor the errback method
+    is called within the given time, the deferred's errback will be called
+    with a TimeOutError() exception.
+
+    All other uses are the same as of Deferred().
+    """
+    def __init__(self, timeout=1.0):
+        Deferred.__init__(self)
+        self._timeout = timeout  # seconds; also reported in the error text
+        self.timer = reactor.callLater(timeout, self.timed_out)
+
+    def timed_out(self):
+        self.errback(
+            TimeOutError('timed out after {} seconds'.format(self._timeout)))
+
+    def callback(self, result):
+        self._cancel_timer()
+        return Deferred.callback(self, result)
+
+    def errback(self, fail):
+        self._cancel_timer()
+        return Deferred.errback(self, fail)
+
+    def cancel(self):
+        self._cancel_timer()  # Deferred.cancel() may re-enter errback()
+        return Deferred.cancel(self)
+
+    def _cancel_timer(self):
+        # The timer may already have fired, or been cancelled by an earlier
+        # call (cancel() followed by errback()); only cancel a live timer.
+        if self.timer.active():
+            self.timer.cancel()
+
diff --git a/python/common/utils/dockerhelpers.py b/python/common/utils/dockerhelpers.py
new file mode 100644
index 0000000..4620aef
--- /dev/null
+++ b/python/common/utils/dockerhelpers.py
@@ -0,0 +1,75 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Some docker related convenience functions
+"""
+from datetime import datetime
+from concurrent.futures import ThreadPoolExecutor
+
+import os
+import socket
+from structlog import get_logger
+
+from docker import Client, errors
+
+
+docker_socket = os.environ.get('DOCKER_SOCK', 'unix://tmp/docker.sock')
+log = get_logger()
+
+def get_my_containers_name():
+    """
+    Return the docker containers name in which this process is running.
+    To look up the container name, we use the container ID extracted from the
+    $HOSTNAME environment variable (which is set by docker conventions).
+    :return: String with the docker container name, leading '/' removed
+    :raises Exception: any lookup failure is logged and then re-raised
+    """
+    container_id = os.environ.get('HOSTNAME', None)
+
+    try:
+        client = Client(base_url=docker_socket)
+        details = client.inspect_container(container_id)
+
+    except Exception as e:
+        log.exception('failed', my_container_id=container_id, e=e)
+        raise
+
+    # drop any leading '/' characters from the reported name
+    stripped_name = details['Name'].lstrip('/')
+    return stripped_name
+
+def get_all_running_containers():
+    """List containers via the docker client's containers() call."""
+    try:
+        listing = Client(base_url=docker_socket).containers()
+    except Exception as e:
+        # Log with traceback, then let the caller see the same error.
+        log.exception('failed', e=e)
+        raise
+    else:
+        return listing
+
+def inspect_container(id):
+    """Return the docker client's inspect_container() data for ``id``."""
+    try:
+        client = Client(base_url=docker_socket)
+        details = client.inspect_container(id)
+    except Exception as e:
+        log.exception('failed-inspect-container', id=id, e=e)
+        raise
+    return details
+
diff --git a/python/common/utils/grpc_utils.py b/python/common/utils/grpc_utils.py
new file mode 100644
index 0000000..8df630e
--- /dev/null
+++ b/python/common/utils/grpc_utils.py
@@ -0,0 +1,109 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Utilities to handle gRPC server and client side code in a Twisted environment
+"""
+import structlog
+from concurrent.futures import Future
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from twisted.python.threadable import isInIOThread
+
+
+log = structlog.get_logger()
+
+
+def twisted_async(func):
+    """
+    This decorator can be used to implement a gRPC method on the twisted
+    thread, allowing asynchronous programming in Twisted while serving
+    a gRPC call.
+
+    gRPC methods normally are called on the futures.ThreadPool threads,
+    so these methods cannot directly use Twisted protocol constructs.
+    If the implementation of the methods needs to touch Twisted, it is
+    safer (or mandatory) to wrap the method with this decorator, which will
+    schedule the inner method on the Twisted reactor thread and ensure that
+    the result is passed back to the calling (foreign) thread.
+
+    Example usage:
+
+    When implementing a gRPC server, typical pattern is:
+
+    class SpamService(SpamServicer):
+
+        def GetBadSpam(self, request, context):
+            '''this is called from a ThreadPoolExecutor thread'''
+            # generally unsafe to make Twisted calls
+
+        @twisted_async
+        def GetSpamSafely(self, request, context):
+            '''this method now is executed on the Twisted main thread'''
+            # safe to call any Twisted protocol functions
+
+        @twisted_async
+        @inlineCallbacks
+        def GetAsyncSpam(self, request, context):
+            '''this generator can use inlineCallbacks Twisted style'''
+            result = yield some_async_twisted_call(request)
+            returnValue(result)
+
+    """
+    def in_thread_wrapper(*args, **kw):
+        # Already on the Twisted I/O thread: call func directly and return
+        if isInIOThread():
+            # whatever it returns (possibly a Deferred), without waiting.
+            return func(*args, **kw)
+        # Otherwise func runs on the reactor and this thread waits on f.
+        f = Future()
+
+        def twisted_wrapper():
+            try:
+                d = func(*args, **kw)
+                if isinstance(d, Deferred):
+                    # Asynchronous result: complete f when d fires.
+                    def _done(result):
+                        f.set_result(result)
+                        f.done()  # done() only queries state; no effect
+
+                    def _error(e):
+                        f.set_exception(e)  # NOTE(review): e is a Failure?
+                        f.done()  # done() only queries state; no effect
+
+                    d.addCallback(_done)
+                    d.addErrback(_error)
+
+                else:
+                    f.set_result(d)
+                    f.done()  # done() only queries state; no effect
+
+            except Exception, e:
+                f.set_exception(e)
+                f.done()  # done() only queries state; no effect
+        # Schedule the wrapper on the reactor thread from this thread.
+        reactor.callFromThread(twisted_wrapper)
+        try:
+            result = f.result()  # blocks until twisted_wrapper completes f
+        except Exception, e:
+            log.exception(e=e, func=func, args=args, kw=kw)
+            raise
+
+        return result
+
+    return in_thread_wrapper
+
+
diff --git a/python/common/utils/id_generation.py b/python/common/utils/id_generation.py
new file mode 100644
index 0000000..e0fea1c
--- /dev/null
+++ b/python/common/utils/id_generation.py
@@ -0,0 +1,116 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# """ ID generation utils """
+
+from uuid import uuid4
+
+
+BROADCAST_CORE_ID=hex(0xFFFF)[2:]  # 'ffff': core id part of broadcast ids
+
+def get_next_core_id(current_id_in_hex_str):
+    """
+    Compute the core id that follows the highest one assigned so far.
+    :param current_id_in_hex_str: hex string (no leading 0x) of the maximum
+    core id assigned; None or '' when no id has been assigned yet
+    :return: the next id as a zero-padded hex string of at least 4 digits
+    """
+    if current_id_in_hex_str:
+        return '{:04x}'.format(int(current_id_in_hex_str, 16) + 1)
+    return '0001'
+
+
+def create_cluster_logical_device_ids(core_id, switch_id):
+    """
+    Build the logical device id and the OpenFlow datapath id of a switch;
+    both are unique across the Voltha cluster.
+    The logical device id is a 16 character hex string: 4 characters of
+    core id followed by the low 12 hex digits (48 bits) of the switch id.
+    The datapath id is the integer value of '0000' plus those same switch
+    digits, because the core id is not used for voltha core routing.
+    :param core_id: string, core id in hex
+    :param switch_id: int
+    :return: cluster logical device id and OpenFlow datapath id
+    """
+    switch_tail = '{:012x}'.format(switch_id)[-12:]
+    core_tail = '{:04x}'.format(int(core_id, 16))[-4:]
+    logical_device_id = core_tail + switch_tail
+    datapath_id = int('0000' + switch_tail, 16)
+    return logical_device_id, datapath_id
+
+def is_broadcast_core_id(id):
+    assert id and len(id) == 16  # ids are 16 characters long
+    return id.startswith(BROADCAST_CORE_ID)
+
+def create_empty_broadcast_id():
+    """
+    Returns an empty broadcast id (ffff000000000000): the broadcast core id
+    padded with twelve zeros. Used to dispatch xPON objects to all instances.
+    :return: An empty broadcast id
+    """
+    return BROADCAST_CORE_ID + '0' * 12
+
+def create_cluster_id():
+    """
+    Returns an id that is common across all voltha instances: a 16 hex
+    character (64 bit) str whose upper 16 bits are the broadcast core_id and
+    whose lower 48 bits are taken from a fresh uuid4 for that object.
+    :return: An id common across all Voltha instances
+    """
+    return BROADCAST_CORE_ID + uuid4().hex[:12]
+
+def create_cluster_device_id(core_id):
+    """
+    Creates a device id that is unique across the Voltha cluster.
+    The device id is a str of 64 bits. The lower 48 bits refers to the
+    device id while the upper 16 bits refers to the core id.
+    :param core_id: string holding the core id in hex, e.g. '000a'
+    :return: cluster device id
+    """
+    return '{}{}'.format(format(int(core_id, 16), '04x'), uuid4().hex[:12])
+
+
+def get_core_id_from_device_id(device_id):
+    """Return the core id: the first 4 of the 16 characters of device_id."""
+    assert device_id and len(device_id) == 16
+    core_id = device_id[0:4]  # leading zeros are kept
+    return core_id
+
+
+def get_core_id_from_logical_device_id(logical_device_id):
+    """
+    Extract the core id from a logical device id, which is a 16 character
+    string whose first 4 characters are the core id.
+    :param logical_device_id: 16 character id string
+    :return: core_id string (4 characters, leading zeros kept)
+    """
+    assert logical_device_id and len(logical_device_id) == 16
+    core_id, _switch_part = logical_device_id[:4], logical_device_id[4:]
+    return core_id
+
+
+def get_core_id_from_datapath_id(datapath_id):
+    """
+    datapath id is a uint64 where:
+        - low 48 bits -> switch_id
+        - high 16 bits -> core id
+    :param datapath_id: integer whose core id part is non-zero
+    :return: core_id as a hex string without leading zeros
+    """
+    assert datapath_id
+    # format() emits bare digits; hex() adds an 'L' suffix to Python 2 longs
+    id_in_hex_str = format(datapath_id, 'x')
+    assert len(id_in_hex_str) > 12
+    return id_in_hex_str[:-12]
diff --git a/python/common/utils/indexpool.py b/python/common/utils/indexpool.py
new file mode 100644
index 0000000..858cb3a
--- /dev/null
+++ b/python/common/utils/indexpool.py
@@ -0,0 +1,64 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from bitstring import BitArray
+import structlog
+
+log = structlog.get_logger()
+
+class IndexPool(object):
+    """Pool of max_entries integer ids starting at offset, kept as a bitmap."""
+    def __init__(self, max_entries, offset):
+        self.max_entries = max_entries
+        self.offset = offset
+        self.indices = BitArray(self.max_entries)  # bit set => id in use
+
+    def get_next(self):
+        """Allocate the lowest free id; return None when all are in use."""
+        try:
+            _pos = self.indices.find('0b0')  # empty tuple if nothing is free
+            self.indices.set(1, _pos)
+            return self.offset + _pos[0]
+        except IndexError:
+            log.info("exception-fail-to-allocate-id-all-bits-in-use")
+            return None
+
+    def allocate(self, index):
+        """Reserve id index; return it, or None if out of range or taken."""
+        try:
+            _pos = index - self.offset
+            if not (0 <= _pos < self.max_entries):
+                log.info("{}-out-of-range".format(index))
+                return None
+            if self.indices[_pos]:
+                log.info("{}-is-already-allocated".format(index))
+                return None
+            self.indices.set(1, _pos)
+            return index
+        except IndexError:
+            return None
+
+    def release(self, index):
+        """Free id index; an id outside the pool is logged and ignored."""
+        _pos = index - self.offset
+        # Reject negatives explicitly: BitArray would count them from the
+        # end and silently free an unrelated id.
+        if not (0 <= _pos < self.max_entries):
+            log.info("bit-position-{}-out-of-range".format(_pos))
+            return
+        self.indices.set(0, _pos)
+
+    def pre_allocate(self, index):
+        """Mark a tuple of ids as in use; a non-tuple argument is ignored."""
+        if isinstance(index, tuple):
+            self.indices.set(1, tuple(i - self.offset for i in index))
diff --git a/python/common/utils/json_format.py b/python/common/utils/json_format.py
new file mode 100644
index 0000000..c18d013
--- /dev/null
+++ b/python/common/utils/json_format.py
@@ -0,0 +1,105 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Monkey patched json_format to allow best effort decoding of Any fields.
+Use the additional flag (strict_any_handling=False) to trigger the
+best-effort behavior. Omit the flag, or just use the original json_format
+module for the strict behavior.
+"""
+
+from google.protobuf import json_format
+
+class _PatchedPrinter(json_format._Printer):
+    """json_format._Printer that can tolerate Any fields of unknown type."""
+    def __init__(self, including_default_value_fields=False,
+                 preserving_proto_field_name=False, strict_any_handling=False):
+        super(_PatchedPrinter, self).__init__(including_default_value_fields,
+                                              preserving_proto_field_name)
+        self.strict_any_handling = strict_any_handling
+
+    def _BestEffortAnyMessageToJsonObject(self, msg):
+        try:
+            return self._AnyMessageToJsonObject(msg)
+        except TypeError:
+            if self.strict_any_handling:
+                raise  # strict mode: fail like the stock json_format does
+            return self._RegularMessageToJsonObject(msg, {})
+
+
+def MessageToDict(message,
+                  including_default_value_fields=False,
+                  preserving_proto_field_name=False,
+                  strict_any_handling=False):
+    """Serialize a protobuf message into a JSON-style dictionary.
+
+    Args:
+      message: The protocol buffers message instance to serialize.
+      including_default_value_fields: If True, singular primitive fields,
+          repeated fields, and map fields are always serialized. If False,
+          only non-empty fields are. Singular message fields and oneof
+          fields are not affected by this option.
+      preserving_proto_field_name: If True, keep the field names exactly
+          as defined in the .proto file. If False, convert the field
+          names to lowerCamelCase.
+      strict_any_handling: Handed to the printer. Intended meaning: True
+          makes conversion fail (as in the original json_format) on an Any
+          field whose type is not loaded; False leaves such a field
+          un-packed and continues.
+
+    Returns:
+      A dict representation of the JSON formatted protocol buffer message.
+    """
+    # pylint: disable=protected-access
+    return _PatchedPrinter(
+        including_default_value_fields,
+        preserving_proto_field_name,
+        strict_any_handling=strict_any_handling)._MessageToJsonObject(message)
+
+
+def MessageToJson(message,
+                  including_default_value_fields=False,
+                  preserving_proto_field_name=False,
+                  strict_any_handling=False):
+    """Serialize a protobuf message into a JSON formatted string.
+
+    Args:
+      message: The protocol buffers message instance to serialize.
+      including_default_value_fields: If True, singular primitive fields,
+          repeated fields, and map fields are always serialized. If False,
+          only non-empty fields are. Singular message fields and oneof
+          fields are not affected by this option.
+      preserving_proto_field_name: If True, keep the field names exactly
+          as defined in the .proto file. If False, convert the field
+          names to lowerCamelCase.
+      strict_any_handling: Handed to the printer. Intended meaning: True
+          makes conversion fail (as in the original json_format) on an Any
+          field whose type is not loaded; False leaves such a field
+          un-packed and continues.
+
+    Returns:
+      A string containing the JSON formatted protocol buffer message.
+    """
+    flags = (including_default_value_fields, preserving_proto_field_name)
+    serializer = _PatchedPrinter(*flags,
+                                 strict_any_handling=strict_any_handling)
+    return serializer.ToJsonString(message)
+
+
+json_format._WKTJSONMETHODS['google.protobuf.Any'] = [
+    '_BestEffortAnyMessageToJsonObject',  # presumably the to-JSON method
+    '_ConvertAnyMessage'
+]
+# Give the stock printer that method name too, aliased to its own Any code.
+json_format._Printer._BestEffortAnyMessageToJsonObject = \
+    json_format._Printer._AnyMessageToJsonObject
diff --git a/python/common/utils/message_queue.py b/python/common/utils/message_queue.py
new file mode 100644
index 0000000..2b4257a
--- /dev/null
+++ b/python/common/utils/message_queue.py
@@ -0,0 +1,89 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from twisted.internet.defer import Deferred
+from twisted.internet.defer import succeed
+
+
+class MessageQueue(object):
+    """
+    An event driven queue, similar to twisted.internet.defer.DeferredQueue
+    but which allows selective dequeing based on a predicate function.
+    Unlike DeferredQueue, there is no limit on backlog, and there is no queue
+    limit.
+    """
+
+    def __init__(self):
+        self.waiting = []  # tuples of (d, predicate)
+        self.queue = []  # messages piling up here if no one is waiting
+
+    def reset(self):
+        """
+        Purge all content as well as waiters (by errback-ing their entries).
+        :return: None
+        """
+        for d, _ in self.waiting:
+            d.errback(Exception('mesage queue reset() was called'))
+        self.waiting = []
+        self.queue = []
+
+    def _cancelGet(self, d):
+        """
+        Remove a deferred from our waiting list.
+        :param d: The deferred that has been canceled.
+        :return: None
+        """
+        # Rebuild the list: popping inside an index loop ran past the end
+        # of the shortened list and raised IndexError.
+        self.waiting = [w for w in self.waiting if w[0] is not d]
+
+    def put(self, obj):
+        """
+        Add an object to this queue
+        :param obj: arbitrary object that will be added to the queue
+        :return: None
+        """
+
+        # if someone is waiting for this, return right away; waiters are
+        # tried oldest first and only the first match receives obj
+        for i, (d, predicate) in enumerate(self.waiting):
+            if predicate is None or predicate(obj):
+                self.waiting.pop(i)
+                d.callback(obj)
+                return
+
+        # otherwise...
+        self.queue.append(obj)
+
+    def get(self, predicate=None):
+        """
+        Attempt to retrieve and remove an object from the queue that
+        matches the optional predicate.
+        :return: Deferred which fires with the next object available.
+        If predicate was provided, only objects for which
+        predicate(obj) is True will be considered.
+        """
+        # queued messages are examined oldest first
+        for i, msg in enumerate(self.queue):
+            if predicate is None or predicate(msg):
+                self.queue.pop(i)
+                return succeed(msg)
+
+        # there were no matching entries if we got here, so we wait
+        d = Deferred(canceller=self._cancelGet)
+        self.waiting.append((d, predicate))
+        return d
+
+
diff --git a/python/common/utils/nethelpers.py b/python/common/utils/nethelpers.py
new file mode 100644
index 0000000..b17aced
--- /dev/null
+++ b/python/common/utils/nethelpers.py
@@ -0,0 +1,86 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Some network related convenience functions
+"""
+
+from netifaces import AF_INET
+
+import netifaces as ni
+import netaddr
+
+
+def _get_all_interfaces():
+    """Pair every interface name with its ni.ifaddresses() result."""
+    names = ni.interfaces()
+    pairs = [(name, ni.ifaddresses(name)) for name in names]
+    return pairs
+
+
+def _get_my_primary_interface():
+    """Name the interface used by the host's default gateway."""
+    gateways = ni.gateways()
+    assert 'default' in gateways, \
+        ("No default gateway on host/container, "
+         "cannot determine primary interface")
+    family = list(gateways['default'])[0]
+    # gateways[family] has the format (example):
+    # [('10.15.32.1', 'en0', True)] -- take the name in its first entry
+    return gateways[family][0][1]
+
+
+def get_my_primary_local_ipv4(inter_core_subnet=None, ifname=None):
+    """Return a local IPv4 address, optionally one in inter_core_subnet."""
+    if not inter_core_subnet:
+        return _get_my_primary_local_ipv4(ifname)
+    for name in ni.interfaces():
+        by_family = ni.ifaddresses(name)
+        if AF_INET in by_family:
+            candidate = by_family[AF_INET][0]['addr']  # first IPv4 only
+            numeric = netaddr.IPAddress(candidate).value
+            subnet = netaddr.IPNetwork(inter_core_subnet)
+            if subnet.first <= numeric <= subnet.last:
+                return candidate
+    return None
+
+
+def get_my_primary_interface(pon_subnet=None):
+    """Name of the primary interface, or of one with an IP in pon_subnet."""
+    if not pon_subnet:
+        return _get_my_primary_interface()
+    for name in ni.interfaces():
+        by_family = ni.ifaddresses(name)
+        if AF_INET in by_family:
+            first_ipv4 = by_family[AF_INET][0]['addr']
+            numeric = netaddr.IPAddress(first_ipv4).value
+            subnet = netaddr.IPNetwork(pon_subnet)
+            if subnet.first <= numeric <= subnet.last:
+                return name
+    return None
+
+
+def _get_my_primary_local_ipv4(ifname=None):
+    """Best-effort first IPv4 address of ifname (default: primary one)."""
+    try:
+        if ifname is None:
+            ifname = get_my_primary_interface()
+        return ni.ifaddresses(ifname)[AF_INET][0]['addr']
+    except Exception:
+        return None  # any lookup failure yields None
+
+if __name__ == '__main__':
+    print(get_my_primary_local_ipv4())
diff --git a/python/common/utils/ordered_weakvalue_dict.py b/python/common/utils/ordered_weakvalue_dict.py
new file mode 100644
index 0000000..9ea739a
--- /dev/null
+++ b/python/common/utils/ordered_weakvalue_dict.py
@@ -0,0 +1,48 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from _weakref import ref
+from weakref import KeyedRef
+from collections import OrderedDict
+
+
+class OrderedWeakValueDict(OrderedDict):
+    """
+    Modified OrderedDict to use weak references as values. Entries disappear
+    automatically if the referred value has no more strong reference pointing
+    to it.
+
+    Warning, this is not a complete implementation, only what is needed for
+    now. See test_ordered_wealvalue_dict.py to see what is tested behavior.
+    """
+    def __init__(self, *args, **kw):
+        def remove(wr, selfref=ref(self)):
+            owner = selfref()
+            if owner is not None:
+                super(OrderedWeakValueDict, owner).__delitem__(wr.key)
+        self._remove = remove
+        super(OrderedWeakValueDict, self).__init__(*args, **kw)
+
+    def __setitem__(self, key, value):
+        super(OrderedWeakValueDict, self).__setitem__(
+            key, KeyedRef(value, self._remove, key))
+
+    def __getitem__(self, key):
+        o = super(OrderedWeakValueDict, self).__getitem__(key)()
+        # a dead reference (value already collected) counts as a missing key
+        if o is None:
+            raise KeyError(key)
+        return o
+