VOL-1097 : Ofagent integration for voltha 2.0

- Created a common location for python based components
- Adjusted the ofagent component to interact with voltha 2.0
- Added streaming rpc methods for rcv/send of packets to voltha api
- Adjusted voltha.proto

Change-Id: I47fb7b80878ead060b4b42bd16cb4f8aa384fdb6
diff --git a/python/common/__init__.py b/python/common/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/common/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/common/event_bus.py b/python/common/event_bus.py
new file mode 100644
index 0000000..e717c16
--- /dev/null
+++ b/python/common/event_bus.py
@@ -0,0 +1,194 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple internal pub/sub event bus with topics and filter-based registration.
+"""
+import re
+
+import structlog
+
+
+log = structlog.get_logger()
+
+
+class _Subscription(object):
+    """Record of one registration: owning bus, predicate, callback, topic."""
+    __slots__ = ('bus', 'predicate', 'callback', 'topic')
+    def __init__(self, bus, predicate, callback, topic=None):
+        self.bus = bus
+        self.predicate = predicate
+        self.callback = callback
+        self.topic = topic
+
+
+class EventBus(object):
+    """In-process pub/sub bus keyed by explicit topic or compiled regexp."""
+
+    def __init__(self):
+        self.subscriptions = {}  # topic -> list of _Subscription objects
+                                 # topic None holds regexp based topic subs.
+        self.subs_topic_map = {} # to aid fast lookup when unsubscribing
+
+    def list_subscribers(self, topic=None):
+        """Return subscriptions for topic, or all of them if topic is None."""
+        if topic is None:
+            return sum(self.subscriptions.values(), [])
+        return list(self.subscriptions.get(topic, []))
+
+    @staticmethod
+    def _get_topic_key(topic):
+        if isinstance(topic, str):
+            return topic
+        elif hasattr(topic, 'match'):
+            return None
+        else:
+            raise AttributeError('topic not a string nor a compiled regex')
+
+    def subscribe(self, topic, callback, predicate=None):
+        """
+        Subscribe to given topic with predicate and register the callback
+        :param topic: String topic (explicit) or regexp based topic filter.
+        :param callback: Callback method with signature def func(topic, msg)
+        :param predicate: Optional method/function signature def predicate(msg)
+        :return: Subscription object which can be used to unsubscribe
+        """
+        subscription = _Subscription(self, predicate, callback, topic)
+        topic_key = self._get_topic_key(topic)
+        self.subscriptions.setdefault(topic_key, []).append(subscription)
+        self.subs_topic_map[subscription] = topic_key
+        return subscription
+
+    def unsubscribe(self, subscription):
+        """
+        Remove given subscription
+        :param subscription: subscription object as was returned by subscribe
+        :return: None
+        :raises KeyError: if the subscription is not currently registered
+        """
+        topic_key = self.subs_topic_map.pop(subscription)
+        self.subscriptions[topic_key].remove(subscription)
+
+    def publish(self, topic, msg):
+        """
+        Publish given message to all subscribers registered with topic taking
+        the predicate functions into account.
+        :param topic: String topic
+        :param msg: Arbitrary python data as message
+        :return: None
+        """
+
+        def passes(msg, predicate):
+            try:
+                return predicate(msg)
+            except Exception:
+                return False  # failed predicate function treated as no match
+
+        # Copy the explicit-topic list: extending the stored list in place
+        # would permanently file regexp subscribers under this topic and
+        # deliver every later message to them more than once.
+        subscribers = list(self.subscriptions.get(topic, []))
+
+        # add matching regexp topic subscribers
+        subscribers.extend(s for s in self.subscriptions.get(None, [])
+                           if s.topic.match(topic))
+
+        # subscribers is private to this call, so callbacks may (un)subscribe
+        for candidate in subscribers:
+            predicate = candidate.predicate
+            if predicate is None or passes(msg, predicate):
+                try:
+                    candidate.callback(topic, msg)
+                except Exception as e:
+                    log.exception('callback-failed', e=repr(e), topic=topic)
+
+
+
+default_bus = EventBus()  # bus an EventBusClient falls back to when given none
+
+
+class EventBusClient(object):
+    """
+    Primary interface to the EventBus. Usage:
+
+    Publish:
+    >>> events = EventBusClient()
+    >>> msg = dict(a=1, b='foo')
+    >>> events.publish('a.topic', msg)
+
+    Subscribe to get all messages on specific topic:
+    >>> def got_event(topic, msg):
+    ...     print topic, ':', msg
+    >>> events = EventBusClient()
+    >>> events.subscribe('a.topic', got_event)
+
+    Subscribe to get messages matching predicate on specific topic:
+    >>> def got_event(topic, msg):
+    ...     print topic, ':', msg
+    >>> events = EventBusClient()
+    >>> events.subscribe('a.topic', got_event, lambda msg: len(msg) < 100)
+
+    Use a DeferredQueue to buffer incoming messages
+    >>> queue = DeferredQueue()
+    >>> events = EventBusClient()
+    >>> events.subscribe('a.topic', lambda _, msg: queue.put(msg))
+
+    """
+    def __init__(self, bus=None):
+        """
+        Obtain a client interface for the pub/sub event bus.
+        :param bus: An optional specific event bus. Intended mainly for test
+        use. If not provided, the process default bus will be used, which is
+        the preferred use (a process shall not need more than one bus).
+        """
+        self.bus = bus or default_bus
+
+    def publish(self, topic, msg):
+        """
+        Publish given msg to given topic.
+        :param topic: String topic
+        :param msg: Arbitrary python data as message
+        :return: None
+        """
+        self.bus.publish(topic, msg)
+
+    def subscribe(self, topic, callback, predicate=None):
+        """
+        Subscribe to given topic with predicate and register the callback
+        :param topic: String topic (explicit) or regexp based topic filter.
+        :param callback: Callback method with signature def func(topic, msg)
+        :param predicate: Optional method/function with signature
+        def predicate(msg)
+        :return: Subscription object which can be used to unsubscribe
+        """
+        return self.bus.subscribe(topic, callback, predicate)
+
+    def unsubscribe(self, subscription):
+        """
+        Remove given subscription
+        :param subscription: subscription object as was returned by subscribe
+        :return: None
+        """
+        return self.bus.unsubscribe(subscription)
+
+    def list_subscribers(self, topic=None):
+        """
+        Return list of subscribers. If topic is provided, it is filtered for
+        those subscribing to the topic.
+        :param topic: Optional topic
+        :return: List of subscriptions
+        """
+        return self.bus.list_subscribers(topic)
diff --git a/python/common/frameio/__init__.py b/python/common/frameio/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/common/frameio/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/common/frameio/frameio.py b/python/common/frameio/frameio.py
new file mode 100644
index 0000000..3f5bcf6
--- /dev/null
+++ b/python/common/frameio/frameio.py
@@ -0,0 +1,437 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A module that can send and receive raw ethernet frames on a set of interfaces
+and it can manage a set of vlan interfaces on top of existing
+interfaces. Due to reliance on raw sockets, this module requires
+root access. Also, because raw sockets are hard to deal with in Twisted (not
+directly supported), we run the receiver select loop on a dedicated
+thread.
+"""
+
+import os
+import socket
+import struct
+import uuid
+from pcapy import BPFProgram
+from threading import Thread, Condition
+
+import fcntl
+
+import select
+import structlog
+import sys
+
+from scapy.data import ETH_P_ALL
+from twisted.internet import reactor
+from zope.interface import implementer
+
+from voltha.registry import IComponent
+
+if sys.platform.startswith('linux'):
+    from common.frameio.third_party.oftest import afpacket, netutils
+elif sys.platform == 'darwin':
+    from scapy.arch import pcapdnet, BIOCIMMEDIATE, dnet
+
+log = structlog.get_logger()
+
+
+def hexify(buffer):
+    """
+    Encode every character of the input buffer as two lowercase hex digits
+    """
+    return ''.join(['%02x' % ord(octet) for octet in buffer])
+
+
+class _SelectWakerDescriptor(object):
+    """
+    Pipe-backed object for select(): fileno() is the read end, notify() writes.
+    """
+    def __init__(self):
+        self.pipe_read, self.pipe_write = os.pipe()
+        fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)  # write end only
+
+    def __del__(self):
+        os.close(self.pipe_read)
+        os.close(self.pipe_write)
+
+    def fileno(self):
+        return self.pipe_read  # what select() polls for readability
+
+    def wait(self):
+        os.read(self.pipe_read, 1)  # consume one byte written by notify()
+
+    def notify(self):
+        """Make the read end readable so a select() watching it returns"""
+        os.write(self.pipe_write, '\x00')
+
+
+class BpfProgramFilter(object):
+    """
+    Convenience packet filter based on the well-tried Berkeley Packet Filter,
+    used by many well known open source tools such as pcap and tcpdump.
+    """
+    def __init__(self, program_string):
+        """
+        Create a filter using the BPF command syntax. To learn more,
+        consult 'man pcap-filter'.
+        :param program_string: The textual definition of the filter. Examples:
+        'vlan 1000'
+        'vlan 1000 and ip src host 10.10.10.10'
+        """
+        self.bpf = BPFProgram(program_string)  # BPFProgram is imported from pcapy
+
+    def __call__(self, frame):
+        """
+        Run the filter on one frame, so an instance can serve as a predicate.
+        :param frame: Raw frame provided as Python string
+        :return: 1 if frame satisfies filter, 0 otherwise.
+        """
+        return self.bpf.filter(frame)
+
+
+class FrameIOPort(object):
+    """
+    Represents a network interface which we can send/receive raw
+    Ethernet frames.
+    """
+
+    RCV_SIZE_DEFAULT = 4096
+    ETH_P_ALL = 0x03
+    RCV_TIMEOUT = 10000
+    MIN_PKT_SIZE = 60
+
+    def __init__(self, iface_name):
+        self.iface_name = iface_name
+        self.proxies = []
+        self.socket = self.open_socket(self.iface_name)
+        log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
+        self.received = 0
+        self.discarded = 0
+
+    def add_proxy(self, proxy):
+        self.proxies.append(proxy)
+
+    def del_proxy(self, proxy):
+        self.proxies = [p for p in self.proxies if p.name != proxy.name]
+
+    def open_socket(self, iface_name):
+        raise NotImplementedError('to be implemented by derived class')
+
+    def rcv_frame(self):
+        raise NotImplementedError('to be implemented by derived class')
+
+    def __del__(self):
+        # self.socket is missing when open_socket() raised inside __init__
+        if getattr(self, 'socket', None):
+            self.socket.close()
+            self.socket = None
+        log.debug('socket-closed', iface=self.iface_name)
+
+    def fileno(self):
+        return self.socket.fileno()
+
+    def _dispatch(self, proxy, frame):
+        log.debug('calling-publisher', proxy=proxy.name, frame=hexify(frame))
+        try:
+            proxy.callback(proxy, frame)
+        except Exception as e:
+            log.exception('callback-error',
+                          explanation='Callback failed while processing frame',
+                          e=e)
+
+    def recv(self):
+        """Called on the select thread when a packet arrives"""
+        try:
+            frame = self.rcv_frame()
+        except RuntimeError:
+            # we observed this happens sometimes right after the socket was
+            # attached to a newly created veth interface. So we log it, but
+            # allow to continue.
+            log.warn('afpacket-recv-error', code=-1)
+            return
+        if frame is None:
+            return  # the Darwin rcv_frame can return None: nothing to dispatch
+        log.debug('frame-received', iface=self.iface_name, len=len(frame),
+                  hex=hexify(frame))
+        self.received += 1
+        dispatched = False
+        for proxy in self.proxies:
+            if proxy.filter is None or proxy.filter(frame):
+                log.debug('frame-dispatched')
+                dispatched = True
+                reactor.callFromThread(self._dispatch, proxy, frame)
+
+        if not dispatched:
+            self.discarded += 1
+            log.debug('frame-discarded')
+
+    def send(self, frame):
+        log.debug('sending', len=len(frame), iface=self.iface_name)
+        sent_bytes = self.send_frame(frame)
+        if sent_bytes != len(frame):
+            log.error('send-error', iface=self.iface_name,
+                      wanted_to_send=len(frame), actually_sent=sent_bytes)
+        return sent_bytes
+
+    def send_frame(self, frame):
+        """Send frame on the socket, padding runt frames rejected as EINVAL"""
+        import errno  # local import: avoids relying on os.errno
+        try:
+            return self.socket.send(frame)
+        except socket.error as err:
+            too_short = len(frame) < self.MIN_PKT_SIZE
+            if err.args[0] != errno.EINVAL or not too_short:
+                raise  # only an EINVAL on a runt frame is recoverable
+            # zero-pad up to the minimum frame size and retry once
+            padding = '\x00' * (self.MIN_PKT_SIZE - len(frame))
+            return self.socket.send(frame + padding)
+
+    def up(self):
+        """Bring the interface up via 'ip link' (no-op on darwin)"""
+        if not sys.platform.startswith('darwin'):
+            os.system('ip link set {} up'.format(self.iface_name))
+        return self
+
+    def down(self):
+        """Bring the interface down via 'ip link' (no-op on darwin)"""
+        if not sys.platform.startswith('darwin'):
+            os.system('ip link set {} down'.format(self.iface_name))
+        return self
+
+    def statistics(self):
+        return self.received, self.discarded
+
+
+class LinuxFrameIOPort(FrameIOPort):
+    """FrameIOPort backed by a raw AF_PACKET socket bound to the interface"""
+    def open_socket(self, iface_name):
+        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
+        afpacket.enable_auxdata(s)  # must precede afpacket.recv on this socket
+        s.bind((self.iface_name, self.ETH_P_ALL))
+        netutils.set_promisc(s, iface_name)
+        s.settimeout(self.RCV_TIMEOUT)
+        return s
+
+    def rcv_frame(self):
+        return afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+
+
+class DarwinFrameIOPort(FrameIOPort):
+    """FrameIOPort for darwin: receives via pcap, sends via a dnet handle"""
+    def open_socket(self, iface_name):
+        sin = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
+        try:
+            fcntl.ioctl(sin.fileno(), BIOCIMMEDIATE, struct.pack("I",1))
+        except Exception:
+            pass  # best effort, but no longer hides KeyboardInterrupt
+
+        # need a different kind of socket for sending out
+        self.sout = dnet.eth(iface_name)
+
+        return sin
+
+    def send_frame(self, frame):
+        return self.sout.send(frame)
+
+    def rcv_frame(self):
+        pkt = self.socket.next()
+        if pkt is not None:
+            ts, pkt = pkt
+        return pkt
+
+
+if sys.platform == 'darwin':
+    _FrameIOPort = DarwinFrameIOPort
+elif sys.platform.startswith('linux'):
+    _FrameIOPort = LinuxFrameIOPort
+else:
+    # no FrameIOPort implementation exists for this platform: fail the import
+    raise Exception('Unsupported platform {}'.format(sys.platform))
+
+
+class FrameIOPortProxy(object):
+    """Per-user handle onto a FrameIOPort that several users may share"""
+
+    def __init__(self, frame_io_port, callback, filter=None, name=None):
+        self.name = name if name is not None else uuid.uuid4().hex[:12]
+        self.filter = filter
+        self.callback = callback
+        self.frame_io_port = frame_io_port
+
+    @property
+    def iface_name(self):
+        return self.get_iface_name()
+
+    def get_iface_name(self):
+        return self.frame_io_port.iface_name
+
+    def send(self, frame):
+        return self.frame_io_port.send(frame)
+
+    def up(self):
+        self.frame_io_port.up()
+        return self
+
+    def down(self):
+        self.frame_io_port.down()
+        return self
+
+
+@implementer(IComponent)
+class FrameIOManager(Thread):
+    """
+    Packet/Frame IO manager that can be used to send/receive raw frames
+    on a set of network interfaces.
+    """
+    def __init__(self):
+        super(FrameIOManager, self).__init__()
+
+        self.ports = {}  # iface_name -> _FrameIOPort instance
+        self.queue = {}  # iface_name -> TODO
+
+        self.cvar = Condition()
+        self.waker = _SelectWakerDescriptor()
+        self.stopped = False
+        self.ports_changed = False
+
+    # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~
+
+    def start(self):
+        """
+        Start the IO manager and its select loop thread
+        """
+        log.debug('starting')
+        super(FrameIOManager, self).start()
+        log.info('started')
+        return self
+
+    def stop(self):
+        """
+        Stop the IO manager and its thread with the select loop
+        """
+        log.debug('stopping')
+        self.stopped = True
+        self.waker.notify()  # wake select() so the loop sees self.stopped
+        self.join()
+        del self.ports  # NOTE(review): later use of self.ports raises
+        log.info('stopped')
+
+    def list_interfaces(self):
+        """
+        Return the interfaces listened on
+        :return: Dict mapping iface_name to its FrameIOPort object
+        """
+        return self.ports
+
+    def open_port(self, iface_name, callback, filter=None, name=None):
+        """
+        Add a new interface and start receiving on it.
+        :param iface_name: Existing Unix interface name (eth0, en0, etc.)
+        :param callback: Called on each received frame;
+        signature: def callback(proxy, frame) where proxy is the
+        FrameIOPortProxy the frame was dispatched to and frame is the actual
+        frame received (as binary string)
+        :param filter: An optional filter (predicate), with signature:
+        def filter(frame). If provided, only frames for which filter evaluates
+        to True will be forwarded to callback.
+        :param name: Optional proxy name; a random one is generated if None.
+        :return: FrameIOPortProxy instance.
+        """
+
+        port = self.ports.get(iface_name)
+        if port is None:
+            port = _FrameIOPort(iface_name)
+            self.ports[iface_name] = port
+            self.ports_changed = True
+            self.waker.notify()
+
+        proxy = FrameIOPortProxy(port, callback, filter, name)
+        port.add_proxy(proxy)
+
+        return proxy
+
+    def close_port(self, proxy):
+        """
+        Remove the proxy. If this is the last proxy on an interface, stop and
+        remove the named interface as well
+        :param proxy: FrameIOPortProxy reference
+        :return: None
+        """
+        assert isinstance(proxy, FrameIOPortProxy)
+        iface_name = proxy.get_iface_name()
+        assert iface_name in self.ports, "iface_name {} unknown".format(iface_name)
+        port = self.ports[iface_name]
+        port.del_proxy(proxy)
+
+        if not port.proxies:
+            del self.ports[iface_name]
+            # need to exit select loop to reconstruct select fd lists
+            self.ports_changed = True
+            self.waker.notify()
+
+    def send(self, iface_name, frame):
+        """
+        Send frame on given interface
+        :param iface_name: Name of previously registered interface
+        :param frame: frame as string
+        :return: number of bytes sent
+        """
+        return self.ports[iface_name].send(frame)
+
+    # ~~~~~~~~~~~~ Thread methods (running on non-main thread) ~~~~~~~~~~~~~~~~
+
+    def run(self):
+        """
+        Called on the alien thread, this is the core multi-port receive loop
+        """
+
+        log.debug('select-loop-started')
+
+        # outer loop constructs sockets list for select
+        while not self.stopped:
+            sockets = [self.waker] + self.ports.values()
+            self.ports_changed = False
+            empty = []
+            # inner select loop
+
+            while not self.stopped:
+                try:
+                    _in, _out, _err = select.select(sockets, empty, empty, 1)
+                except Exception as e:
+                    log.exception('frame-io-select-error', e=e)
+                    break
+                with self.cvar:
+                    for port in _in:
+                        if port is self.waker:
+                            self.waker.wait()  # drain the wake-up byte
+                            continue
+                        else:
+                            port.recv()
+                    self.cvar.notify_all()
+                if self.ports_changed:
+                    break  # break inner loop so we reconstruct sockets list
+
+        log.debug('select-loop-exited')
+
+    def del_interface(self, iface_name):
+        """Drop iface_name from self.ports (KeyError if it is not registered).
+        NOTE(review): unlike close_port, this neither sets ports_changed nor
+        wakes the select loop -- confirm that is intended."""
+
+        log.info('Delete interface')
+        del self.ports[iface_name]
+        log.info('Interface(port) is deleted')
diff --git a/python/common/frameio/third_party/__init__.py b/python/common/frameio/third_party/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/common/frameio/third_party/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/common/frameio/third_party/oftest/LICENSE b/python/common/frameio/third_party/oftest/LICENSE
new file mode 100644
index 0000000..3216042
--- /dev/null
+++ b/python/common/frameio/third_party/oftest/LICENSE
@@ -0,0 +1,36 @@
+OpenFlow Test Framework
+
+Copyright (c) 2010 The Board of Trustees of The Leland Stanford
+Junior University
+
+Except where otherwise noted, this software is distributed under
+the OpenFlow Software License.  See
+http://www.openflowswitch.org/wp/legal/ for current details.
+
+We are making the OpenFlow specification and associated documentation
+(Software) available for public use and benefit with the expectation
+that others will use, modify and enhance the Software and contribute
+those enhancements back to the community. However, since we would like
+to make the Software available for broadest use, with as few
+restrictions as possible permission is hereby granted, free of charge,
+to any person obtaining a copy of this Software to deal in the
+Software under the copyrights without restriction, including without
+limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+The name and trademarks of copyright holder(s) may NOT be used in
+advertising or publicity pertaining to the Software or any derivatives
+without specific, written prior permission.
diff --git a/python/common/frameio/third_party/oftest/README.md b/python/common/frameio/third_party/oftest/README.md
new file mode 100644
index 0000000..f0cb649
--- /dev/null
+++ b/python/common/frameio/third_party/oftest/README.md
@@ -0,0 +1,6 @@
+Files in this directory are derived from the respective files
+in oftest (http://github.com/floodlight/oftest).
+ 
+For the licensing terms of these files, see LICENSE in this dir.
+ 
+
diff --git a/python/common/frameio/third_party/oftest/__init__.py b/python/common/frameio/third_party/oftest/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/common/frameio/third_party/oftest/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/common/frameio/third_party/oftest/afpacket.py b/python/common/frameio/third_party/oftest/afpacket.py
new file mode 100644
index 0000000..9ae8075
--- /dev/null
+++ b/python/common/frameio/third_party/oftest/afpacket.py
@@ -0,0 +1,124 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+AF_PACKET receive support
+
+When VLAN offload is enabled on the NIC Linux will not deliver the VLAN tag
+in the data returned by recv. Instead, it delivers the VLAN TCI in a control
+message. Python 2.x doesn't have built-in support for recvmsg, so we have to
+use ctypes to call it. The recv function exported by this module reconstructs
+the VLAN tag if it was offloaded.
+"""
+
+import struct
+from ctypes import *
+
+ETH_P_8021Q = 0x8100  # packed ahead of the TCI when recv() rebuilds a VLAN tag
+SOL_PACKET = 263  # socket option level used for PACKET_AUXDATA
+PACKET_AUXDATA = 8  # option set by enable_auxdata(); cmsg type checked in recv()
+TP_STATUS_VLAN_VALID = 1 << 4  # tp_status bit tested in recv()
+
+class struct_iovec(Structure):  # data buffer descriptor referenced by msg_iov
+    _fields_ = [
+        ("iov_base", c_void_p),
+        ("iov_len", c_size_t),
+    ]
+
+class struct_msghdr(Structure):  # argument block handed to libc recvmsg
+    _fields_ = [
+        ("msg_name", c_void_p),
+        ("msg_namelen", c_uint32),
+        ("msg_iov", POINTER(struct_iovec)),
+        ("msg_iovlen", c_size_t),
+        ("msg_control", c_void_p),
+        ("msg_controllen", c_size_t),
+        ("msg_flags", c_int),
+    ]
+
+class struct_cmsghdr(Structure):  # control message header read from ctrl_buf
+    _fields_ = [
+        ("cmsg_len", c_size_t),
+        ("cmsg_level", c_int),
+        ("cmsg_type", c_int),
+    ]
+
+class struct_tpacket_auxdata(Structure):  # recv() reads tp_status, tp_vlan_tci
+    _fields_ = [
+        ("tp_status", c_uint),
+        ("tp_len", c_uint),
+        ("tp_snaplen", c_uint),
+        ("tp_mac", c_ushort),
+        ("tp_net", c_ushort),
+        ("tp_vlan_tci", c_ushort),
+        ("tp_padding", c_ushort),
+    ]
+
+libc = CDLL("libc.so.6")
+recvmsg = libc.recvmsg
+recvmsg.argtypes = [c_int, POINTER(struct_msghdr), c_int]
+recvmsg.restype = c_int  # was misspelled 'retype', which ctypes ignores
+
+def enable_auxdata(sk):
+    """
+    Turn on PACKET_AUXDATA for the socket so that the kernel reports the
+    VLAN tag in a control message; call this before afpacket.recv.
+    """
+    turn_on = 1
+    sk.setsockopt(SOL_PACKET, PACKET_AUXDATA, turn_on)
+
+def recv(sk, bufsize):
+    """
+    Receive a packet from an AF_PACKET socket
+    @sk Socket
+    @bufsize Maximum packet size
+    """
+    buf = create_string_buffer(bufsize)
+
+    ctrl_bufsize = sizeof(struct_cmsghdr) + sizeof(struct_tpacket_auxdata) + sizeof(c_size_t)
+    ctrl_buf = create_string_buffer(ctrl_bufsize)
+
+    iov = struct_iovec()
+    iov.iov_base = cast(buf, c_void_p)
+    iov.iov_len = bufsize
+
+    msghdr = struct_msghdr()
+    msghdr.msg_name = None
+    msghdr.msg_namelen = 0
+    msghdr.msg_iov = pointer(iov)
+    msghdr.msg_iovlen = 1
+    msghdr.msg_control = cast(ctrl_buf, c_void_p)
+    msghdr.msg_controllen = ctrl_bufsize
+    msghdr.msg_flags = 0
+
+    rv = recvmsg(sk.fileno(), byref(msghdr), 0)
+    if rv < 0:
+        raise RuntimeError("recvmsg failed: rv=%d" % rv)
+
+    # The kernel only delivers control messages we ask for. We
+    # only enabled PACKET_AUXDATA, so we can assume it's the
+    # only control message.
+    assert msghdr.msg_controllen >= sizeof(struct_cmsghdr)
+
+    cmsghdr = struct_cmsghdr.from_buffer(ctrl_buf) # pylint: disable=E1101
+    assert cmsghdr.cmsg_level == SOL_PACKET
+    assert cmsghdr.cmsg_type == PACKET_AUXDATA
+
+    auxdata = struct_tpacket_auxdata.from_buffer(ctrl_buf, sizeof(struct_cmsghdr)) # pylint: disable=E1101
+
+    if auxdata.tp_vlan_tci != 0 or auxdata.tp_status & TP_STATUS_VLAN_VALID:
+        # Insert VLAN tag
+        tag = struct.pack("!HH", ETH_P_8021Q, auxdata.tp_vlan_tci)
+        return buf.raw[:12] + tag + buf.raw[12:rv]
+    else:
+        return buf.raw[:rv]
diff --git a/python/common/frameio/third_party/oftest/netutils.py b/python/common/frameio/third_party/oftest/netutils.py
new file mode 100644
index 0000000..092d490
--- /dev/null
+++ b/python/common/frameio/third_party/oftest/netutils.py
@@ -0,0 +1,73 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Network utilities for the OpenFlow test framework
+"""
+
+###########################################################################
+##                                                                         ##
+## Promiscuous mode enable/disable                                         ##
+##                                                                         ##
+## Based on code from Scapy by Philippe Biondi                             ##
+##                                                                         ##
+##                                                                         ##
+## This program is free software; you can redistribute it and/or modify it ##
+## under the terms of the GNU General Public License as published by the   ##
+## Free Software Foundation; either version 2, or (at your option) any     ##
+## later version.                                                          ##
+##                                                                         ##
+## This program is distributed in the hope that it will be useful, but     ##
+## WITHOUT ANY WARRANTY; without even the implied warranty of              ##
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU       ##
+## General Public License for more details.                                ##
+##                                                                         ##
+#############################################################################
+
+import socket
+from fcntl import ioctl
+import struct
+
+# From net/if_arp.h
+ARPHDR_ETHER = 1
+ARPHDR_LOOPBACK = 772
+
+# From bits/ioctls.h
+SIOCGIFHWADDR  = 0x8927          # Get hardware address
+SIOCGIFINDEX   = 0x8933          # name -> if_index mapping
+
+# From netpacket/packet.h
+PACKET_ADD_MEMBERSHIP  = 1
+PACKET_DROP_MEMBERSHIP = 2
+PACKET_MR_PROMISC      = 1
+
+# From bits/socket.h
+SOL_PACKET = 263
+
+def get_if(iff,cmd):
+  """
+  Issue an interface ioctl and return the raw result buffer
+  @iff Interface name
+  @cmd ioctl request code (e.g. SIOCGIFINDEX)
+  """
+  s=socket.socket()
+  try:
+    # Request buffer: the name in a 16-byte field followed by 16 pad bytes
+    return ioctl(s, cmd, struct.pack("16s16x",iff))
+  finally:
+    # Always release the descriptor, even when the ioctl raises
+    s.close()
+
+def get_if_index(iff):
+  """
+  Return the interface index for an interface name
+  @iff Interface name
+  """
+  ifreq = get_if(iff, SIOCGIFINDEX)
+  # The index is decoded as a native unsigned int from bytes 16..19
+  (index,) = struct.unpack("I", ifreq[16:20])
+  return int(index)
+
+def set_promisc(s,iff,val=1):
+  """
+  Add or drop the promiscuous-mode membership of an interface on a socket
+  @s Socket on which the option is set
+  @iff Interface name
+  @val Truthy to add the membership, falsy to drop it
+  """
+  ifindex = get_if_index(iff)
+  mreq = struct.pack("IHH8s", ifindex, PACKET_MR_PROMISC, 0, "")
+  cmd = PACKET_ADD_MEMBERSHIP if val else PACKET_DROP_MEMBERSHIP
+  s.setsockopt(SOL_PACKET, cmd, mreq)
+
diff --git a/python/common/kvstore/__init__.py b/python/common/kvstore/__init__.py
new file mode 100644
index 0000000..4a82628
--- /dev/null
+++ b/python/common/kvstore/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/common/kvstore/consul_client.py b/python/common/kvstore/consul_client.py
new file mode 100644
index 0000000..bc14759
--- /dev/null
+++ b/python/common/kvstore/consul_client.py
@@ -0,0 +1,304 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from common.kvstore.kv_client import DEFAULT_TIMEOUT, Event, KVClient, KVPair, RETRY_BACKOFF
+from common.utils.asleep import asleep
+from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+from consul import ConsulException
+from consul.twisted import Consul
+from structlog import get_logger
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+
+log = get_logger()
+
+class ConsulClient(KVClient):
+    '''
+    KVClient implementation that talks to Consul through the Twisted
+    variant of the consul client library.
+    '''
+
+    def __init__(self, kv_host, kv_port):
+        KVClient.__init__(self, kv_host, kv_port)
+        # ID of the session created by the most recent _reserve call
+        self.session_id = None
+        self.client = Consul(kv_host, kv_port)
+
+    def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
+        '''
+        Start a watch on the given key. Key change events are passed to
+        key_change_callback until close_watch is called for the key.
+        '''
+        self._retriggering_watch(key, key_change_callback, timeout)
+
+    @inlineCallbacks
+    def _retriggering_watch(self, key, key_change_callback, timeout):
+        # Register the watch before starting it so close_watch can find it
+        self.key_watches[key] = ConsulWatch(self.client, key, key_change_callback, timeout)
+        yield self.key_watches[key].start()
+
+    def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
+        '''
+        Stop the watch on the given key, if any, and forget it.
+        '''
+        # Drop the entry as well so stopped watches do not accumulate
+        watch = self.key_watches.pop(key, None)
+        if watch is not None:
+            watch.stop()
+
+    @inlineCallbacks
+    def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
+        '''
+        Run one KV operation, backing off and retrying while Consul
+        refuses connections.
+
+        :param operation: One of 'GET', 'LIST', 'PUT', 'DELETE', 'RESERVE',
+        'RENEW', 'RELEASE' or 'RELEASE-ALL'
+        :param timeout: If positive, give up once the accumulated backoff
+        delay exceeds this many seconds
+        :return: (result, err) where err is None on success
+        '''
+        log.debug('kv-op', operation=operation, key=key, timeout=timeout, args=args, kw=kw)
+        err = None
+        result = None
+        while True:
+            try:
+                if operation == 'GET':
+                    result = yield self._get(key, **kw)
+                elif operation == 'LIST':
+                    result, err = yield self._list(key)
+                elif operation == 'PUT':
+                    # Put returns a boolean response
+                    result = yield self.client.kv.put(key, value)
+                    if not result:
+                        err = 'put-failed'
+                elif operation == 'DELETE':
+                    # Delete returns a boolean response
+                    result = yield self.client.kv.delete(key)
+                    if not result:
+                        err = 'delete-failed'
+                elif operation == 'RESERVE':
+                    result, err = yield self._reserve(key, value, **kw)
+                elif operation == 'RENEW':
+                    result, err = yield self._renew_reservation(key)
+                elif operation == 'RELEASE':
+                    result, err = yield self._release_reservation(key)
+                elif operation == 'RELEASE-ALL':
+                    err = yield self._release_all_reservations()
+                self._clear_backoff()
+                break
+            except ConsulException as ex:
+                if 'ConnectionRefusedError' in ex.message:
+                    # Consul is unreachable: wait, then go around again
+                    log.exception('comms-exception', ex=ex)
+                    yield self._backoff('consul-not-up')
+                else:
+                    log.error('consul-specific-exception', ex=ex)
+                    err = ex
+            except Exception as ex:
+                log.error('consul-exception', ex=ex)
+                err = ex
+
+            if timeout > 0 and self.retry_time > timeout:
+                err = 'operation-timed-out'
+            if err is not None:
+                self._clear_backoff()
+                break
+
+        returnValue((result,err))
+
+    @inlineCallbacks
+    def _get(self, key, **kw):
+        '''
+        Fetch a single key; yields a KVPair, or None if Consul returned
+        no record for it.
+        '''
+        kvp = None
+        index, rec = yield self.client.kv.get(key, **kw)
+        if rec is not None:
+            kvp = KVPair(rec['Key'], rec['Value'], index)
+        returnValue(kvp)
+
+    @inlineCallbacks
+    def _list(self, key):
+        '''
+        Fetch every key under the given prefix; yields ([KVPair], err).
+        '''
+        err = None
+        index, recs = yield self.client.kv.get(key, recurse=True)
+        # No records come back as None rather than an empty list; treat
+        # that as an empty result, as EtcdClient._list does.
+        kv_pairs = [KVPair(rec['Key'], rec['Value'], rec['ModifyIndex'])
+                    for rec in recs or []]
+        returnValue((kv_pairs, err))
+
+    @inlineCallbacks
+    def _reserve(self, key, value, **kw):
+        '''
+        Try to reserve the key for the given owner value using a Consul
+        session. Requires a 'ttl' keyword argument.
+
+        Yields (owner, err): owner is the value currently stored under the
+        key (None if it has no record); err is None only if this client
+        holds the reservation.
+        '''
+        # Raises KeyError, reported as err by _op_with_retry, if absent
+        ttl = kw['ttl']
+        reserved = False
+        err = 'reservation-failed'
+        owner = None
+
+        # Create a session
+        self.session_id = yield self.client.session.create(behavior='delete',
+                                                           ttl=ttl) # lock_delay=1)
+        log.debug('create-session', id=self.session_id)
+        # Try to acquire the key
+        result = yield self.client.kv.put(key, value, acquire=self.session_id)
+        log.debug('key-acquire', key=key, value=value, sess=self.session_id, result=result)
+
+        # Check if reservation succeeded
+        index, record = yield self.client.kv.get(key)
+        if record is not None and 'Value' in record:
+            owner = record['Value']
+            # A record that is not held by any session may lack this field
+            holder = record.get('Session')
+            log.debug('get-key', session=holder, owner=owner)
+            if holder == self.session_id and owner == value:
+                reserved = True
+                log.debug('key-reserved', key=key, value=value, ttl=ttl)
+                # Add key to reservation list
+                self.key_reservations[key] = self.session_id
+            else:
+                log.debug('reservation-held-by-another', owner=owner)
+
+        if reserved:
+            err = None
+        returnValue((owner, err))
+
+    @inlineCallbacks
+    def _renew_reservation(self, key):
+        '''
+        Renew the session backing the reservation of the given key.
+        Yields (result, err).
+        '''
+        result = None
+        err = None
+        if key not in self.key_reservations:
+            err = 'key-not-reserved'
+        else:
+            session_id = self.key_reservations[key]
+            # A successfully renewed session returns an object with fields:
+            # Node, CreateIndex, Name, ModifyIndex, ID, Behavior, TTL,
+            # LockDelay, and Checks
+            result = yield self.client.session.renew(session_id=session_id)
+            log.debug('session-renew', result=result)
+            # Checked only here so 'key-not-reserved' is not overwritten
+            if result is None:
+                err = 'session-renewal-failed'
+        returnValue((result, err))
+
+    @inlineCallbacks
+    def _release_reservation(self, key):
+        '''
+        Destroy the session backing the reservation of the given key.
+        Yields (success, err).
+        '''
+        err = None
+        # Defined up front so the not-reserved path has a value to return
+        success = False
+        if key not in self.key_reservations:
+            err = 'key-not-reserved'
+        else:
+            session_id = self.key_reservations[key]
+            # A successfully destroyed session returns a boolean result
+            success = yield self.client.session.destroy(session_id)
+            log.debug('session-destroy', result=success)
+            if not success:
+                err = 'session-destroy-failed'
+            self.session_id = None
+            self.key_reservations.pop(key)
+        returnValue((success, err))
+
+    @inlineCallbacks
+    def _release_all_reservations(self):
+        '''
+        Destroy the sessions of all reservations made through this client.
+        Yields err, which is None if every session was destroyed.
+        '''
+        err = None
+        # Work on a snapshot: the dict may change while we wait on Consul
+        for key, session_id in list(self.key_reservations.items()):
+            # A successfully destroyed session returns a boolean result
+            success = yield self.client.session.destroy(session_id)
+            if not success:
+                err = 'session-destroy-failed'
+                log.debug('session-destroy', id=session_id, result=success)
+            self.session_id = None
+            self.key_reservations.pop(key, None)
+        returnValue(err)
+
+
+class ConsulWatch():
+    '''
+    Watch on a single Consul key. While running, it repeatedly queries the
+    key with the last index seen and passes PUT, DELETE and CONNECTION_DOWN
+    events to the callback.
+    '''
+
+    def __init__(self, consul, key, callback, timeout):
+        self.client = consul
+        self.key = key
+        # Index returned by the latest successful query, as a string
+        self.index = None
+        self.callback = callback
+        self.timeout = timeout
+        # Seconds to wait for a change before issuing a fresh query
+        self.period = 60
+        self.running = True
+        self.retries = 0
+        self.retry_time = 0
+
+    @inlineCallbacks
+    def start(self):
+        '''
+        Run the watch loop until stop() is called.
+        '''
+        self.running = True
+        # Prime the index with an initial query
+        initial = yield self._get_with_retry(self.key, None,
+                                             timeout=self.timeout)
+        if initial is None:
+            # The query failed. Keep going rather than abort the watch;
+            # the loop below queries again on every period.
+            log.error('watch-initial-get-failed', key=self.key)
+        else:
+            self.index = str(initial[0])
+
+        @inlineCallbacks
+        def _get(key, deferred):
+            try:
+                fetched = yield self._get_with_retry(key, None,
+                                                     timeout=self.timeout,
+                                                     index=self.index)
+                if fetched is None:
+                    # Failure already logged; let the deferred time out
+                    log.debug('watch-get-failed', key=key)
+                else:
+                    index, rec = fetched
+                    self.index = str(index)
+                    if not deferred.called:
+                        log.debug('got-result-cancelling-deferred')
+                        deferred.callback((self.index, rec))
+            except Exception as e:
+                log.exception('got-exception', e=e)
+
+        while self.running:
+            try:
+                rcvd = DeferredWithTimeout(timeout=self.period)
+                # Not yielded: the query runs while we wait on rcvd
+                _get(self.key, rcvd)
+                try:
+                    # Update index for next watch iteration
+                    index, rec = yield rcvd
+                    log.debug('event-received', index=index, rec=rec)
+                    # Notify client of key change event
+                    if rec is None:
+                        # Key has been deleted
+                        self._send_event(Event(Event.DELETE, self.key, None))
+                    else:
+                        self._send_event(Event(Event.PUT, rec['Key'], rec['Value']))
+                except TimeOutError as e:
+                    log.debug('no-events-over-watch-period', key=self.key)
+                except Exception as e:
+                    log.exception('exception', e=e)
+            except Exception as e:
+                log.exception('exception', e=e)
+
+        log.debug('close-watch', key=self.key)
+
+    def stop(self):
+        '''
+        Ask the watch loop to exit and stop delivering events.
+        '''
+        self.running = False
+        self.callback = None
+
+    @inlineCallbacks
+    def _get_with_retry(self, key, value, timeout, *args, **kw):
+        '''
+        Query the key, backing off and retrying while Consul refuses
+        connections.
+
+        :param timeout: If positive, give up once the accumulated backoff
+        delay exceeds this many seconds
+        :return: The (index, record) result of the query, or None on failure
+        '''
+        log.debug('watch-period', key=key, period=self.period, timeout=timeout, args=args, kw=kw)
+        err = None
+        result = None
+        notified_down = False
+        while True:
+            try:
+                result = yield self.client.kv.get(key, **kw)
+                self._clear_backoff()
+                break
+            except ConsulException as ex:
+                if 'ConnectionRefusedError' in ex.message:
+                    # Do not set err here: a refused connection must go
+                    # around the loop again, as in
+                    # ConsulClient._op_with_retry.
+                    if not notified_down:
+                        # Report the outage once per call, not per retry
+                        self._send_event(Event(Event.CONNECTION_DOWN, self.key, None))
+                        notified_down = True
+                    log.exception('comms-exception', ex=ex)
+                    yield self._backoff('consul-not-up')
+                else:
+                    err = ex
+                    log.error('consul-specific-exception', ex=ex)
+            except Exception as ex:
+                err = ex
+                log.error('consul-exception', ex=ex)
+
+            if timeout > 0 and self.retry_time > timeout:
+                err = 'operation-timed-out'
+            # A stopped watch has no reason to keep retrying
+            if err is not None or not self.running:
+                self._clear_backoff()
+                break
+
+        returnValue(result)
+
+    def _send_event(self, event):
+        # The callback is cleared by stop(), which silences the watch
+        if self.callback is not None:
+            self.callback(event)
+
+    def _backoff(self, msg):
+        '''
+        Log msg and return a deferred sleep for the next backoff interval.
+        '''
+        # The last RETRY_BACKOFF entry is reused once the table runs out
+        wait_time = RETRY_BACKOFF[min(self.retries, len(RETRY_BACKOFF) - 1)]
+        self.retry_time += wait_time
+        self.retries += 1
+        log.error(msg, next_retry_in_secs=wait_time,
+                  total_delay_in_secs = self.retry_time,
+                  retries=self.retries)
+        return asleep(wait_time)
+
+    def _clear_backoff(self):
+        '''
+        Reset the retry counters after a success or a final failure.
+        '''
+        if self.retries:
+            log.debug('reconnected-to-kv', after_retries=self.retries)
+            self.retries = 0
+            self.retry_time = 0
diff --git a/python/common/kvstore/etcd_client.py b/python/common/kvstore/etcd_client.py
new file mode 100644
index 0000000..a958b71
--- /dev/null
+++ b/python/common/kvstore/etcd_client.py
@@ -0,0 +1,240 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+################################################################################
+#
+# Most of the txaioetcd methods provide a timeout parameter. This parameter
+# is likely intended to limit the amount of time spent by any one method
+# waiting for a response from the etcd server. However, if the server is
+# down, the method immediately throws a ConnectionRefusedError exception;
+# it does not perform any retries. The timeout parameter provided by the
+# methods in EtcdClient covers this contingency.
+#
+################################################################################
+
+from common.kvstore.kv_client import DEFAULT_TIMEOUT, Event, KVClient, KVPair
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from twisted.internet.error import ConnectionRefusedError
+from txaioetcd import Client, CompVersion, Failed, KeySet, OpGet, OpSet, Transaction
+
+log = get_logger()
+
+class EtcdClient(KVClient):
+    '''
+    KVClient implementation that talks to etcd through txaioetcd.
+    '''
+
+    def __init__(self, kv_host, kv_port):
+        KVClient.__init__(self, kv_host, kv_port)
+        self.url = u'http://' + kv_host + u':' + str(kv_port)
+        self.client = Client(reactor, self.url)
+
+    @inlineCallbacks
+    def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
+        '''
+        Register key_change_callback for the key and start an etcd watch.
+        Yields the (result, err) pair of the underlying WATCH operation.
+        '''
+        self.key_watches[key] = key_change_callback
+        result = yield self._op_with_retry('WATCH', key, None, timeout, callback=self.key_changed)
+        returnValue(result)
+
+    def key_changed(self, kv):
+        '''
+        Watch callback: turn an etcd key/value notification into an Event
+        and pass it to the callback registered for that exact key.
+        '''
+        key = kv.key
+        value = kv.value
+        log.debug('key-changed', key=key, value=value)
+        # Notify client of key change event
+        if value is not None:
+            evt = Event(Event.PUT, key, value)
+        else:
+            evt = Event(Event.DELETE, key, None)
+        # NOTE(review): the watch is set up with prefix=True, but dispatch
+        # is by exact key, so events for other keys under the prefix are
+        # dropped here -- confirm this is intended.
+        if key in self.key_watches:
+            self.key_watches[key](evt)
+
+    def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
+        '''
+        Stop delivering events for the key by removing its callback.
+        '''
+        log.debug('close-watch', key=key)
+        self.key_watches.pop(key, None)
+
+    @inlineCallbacks
+    def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
+        '''
+        Run one KV operation, backing off and retrying while etcd refuses
+        connections.
+
+        :param operation: One of 'GET', 'LIST', 'PUT', 'DELETE', 'RESERVE',
+        'RENEW', 'RELEASE', 'RELEASE-ALL' or 'WATCH'
+        :param timeout: If positive, give up once the accumulated backoff
+        delay exceeds this many seconds
+        :return: (result, err) where err is None on success
+        '''
+        log.debug('kv-op', operation=operation, key=key, timeout=timeout, args=args, kw=kw)
+        err = None
+        result = None
+        if isinstance(key, str):
+            key = bytes(key)
+        if value is not None:
+            value = bytes(value)
+        while True:
+            try:
+                if operation == 'GET':
+                    result = yield self._get(key)
+                elif operation == 'LIST':
+                    result, err = yield self._list(key)
+                elif operation == 'PUT':
+                    # Put returns an object of type Revision
+                    result = yield self.client.set(key, value, **kw)
+                elif operation == 'DELETE':
+                    # Delete returns an object of type Deleted
+                    result = yield self.client.delete(key)
+                elif operation == 'RESERVE':
+                    result, err = yield self._reserve(key, value, **kw)
+                elif operation == 'RENEW':
+                    result, err = yield self._renew_reservation(key)
+                elif operation == 'RELEASE':
+                    result, err = yield self._release_reservation(key)
+                elif operation == 'RELEASE-ALL':
+                    err = yield self._release_all_reservations()
+                elif operation == 'WATCH':
+                    # A missing callback raises KeyError, reported as err
+                    callback = kw['callback']
+                    # Not yielded: the watch keeps running in the background
+                    result = self.client.watch([KeySet(key, prefix=True)], callback)
+                self._clear_backoff()
+                break
+            except ConnectionRefusedError as ex:
+                # etcd is unreachable: wait, then go around again
+                log.error('comms-exception', ex=ex)
+                yield self._backoff('etcd-not-up')
+            except Exception as ex:
+                log.error('etcd-exception', ex=ex)
+                err = ex
+
+            if timeout > 0 and self.retry_time > timeout:
+                err = 'operation-timed-out'
+            if err is not None:
+                self._clear_backoff()
+                break
+
+        returnValue((result, err))
+
+    @inlineCallbacks
+    def _get(self, key):
+        '''
+        Fetch a single key; yields a KVPair, or None unless exactly one
+        record came back.
+        '''
+        kvp = None
+        resp = yield self.client.get(key)
+        if resp.kvs is not None and len(resp.kvs) == 1:
+            kv = resp.kvs[0]
+            kvp = KVPair(kv.key, kv.value, kv.mod_revision)
+        returnValue(kvp)
+
+    @inlineCallbacks
+    def _list(self, key):
+        '''
+        Fetch every key under the given prefix; yields ([KVPair], err).
+        '''
+        err = None
+        resp = yield self.client.get(KeySet(key, prefix=True))
+        kv_pairs = [KVPair(kv.key, kv.value, kv.mod_revision)
+                    for kv in resp.kvs or []]
+        returnValue((kv_pairs, err))
+
+    @inlineCallbacks
+    def _reserve(self, key, value, **kw):
+        '''
+        Try to reserve the key for the given owner value using a lease and
+        a test-and-set transaction. Requires a 'ttl' keyword argument.
+
+        Yields (owner, err): owner is the value currently stored under the
+        key (None if it has no record); err is None only if the key is
+        reserved for the given value.
+        '''
+        # Raises KeyError, reported as err by _op_with_retry, if absent
+        ttl = kw['ttl']
+        reserved = False
+        err = 'reservation-failed'
+        owner = None
+
+        # Create a lease
+        lease = yield self.client.lease(ttl)
+
+        # Create a transaction: set the key only if it has never been
+        # written (version 0), otherwise read back the current value
+        txn = Transaction(
+            compare=[ CompVersion(key, '==', 0) ],
+            success=[ OpSet(key, bytes(value), lease=lease) ],
+            failure=[ OpGet(key) ]
+        )
+        newly_acquired = False
+        try:
+            result = yield self.client.submit(txn)
+        except Failed as failed:
+            log.debug('key-already-present', key=key)
+            if len(failed.responses) > 0:
+                response = failed.responses[0]
+                if response.kvs is not None and len(response.kvs) > 0:
+                    kv = response.kvs[0]
+                    log.debug('key-already-present', value=kv.value)
+                    if kv.value == value:
+                        # The key is already held for the same owner
+                        reserved = True
+                        log.debug('key-already-reserved', key = kv.key, value=kv.value)
+        else:
+            newly_acquired = True
+            log.debug('key-was-absent', key=key, result=result)
+
+        # Check if reservation succeeded
+        resp = yield self.client.get(key)
+        if resp.kvs is not None and len(resp.kvs) == 1:
+            owner = resp.kvs[0].value
+            if owner == value:
+                if newly_acquired:
+                    log.debug('key-reserved', key=key, value=value, ttl=ttl,
+                             lease_id=lease.lease_id)
+                    reserved = True
+                    # Add key to reservation list
+                    self.key_reservations[key] = lease
+                else:
+                    log.debug("reservation-still-held")
+            else:
+                log.debug('reservation-held-by-another', value=owner)
+
+        if reserved:
+            err = None
+        returnValue((owner, err))
+
+    @inlineCallbacks
+    def _renew_reservation(self, key):
+        '''
+        Refresh the lease backing the reservation of the given key.
+        Yields (result, err).
+        '''
+        result = None
+        err = None
+        if key not in self.key_reservations:
+            err = 'key-not-reserved'
+        else:
+            lease = self.key_reservations[key]
+            # A successfully refreshed lease returns an object of type Header
+            result = yield lease.refresh()
+            # Checked only here so 'key-not-reserved' is not overwritten
+            if result is None:
+                err = 'lease-refresh-failed'
+        returnValue((result, err))
+
+    @inlineCallbacks
+    def _release_reservation(self, key):
+        '''
+        Revoke the lease backing the reservation of the given key.
+        Yields (result, err).
+        '''
+        err = None
+        # Defined up front so the not-reserved path has a value to return
+        result = None
+        if key not in self.key_reservations:
+            err = 'key-not-reserved'
+        else:
+            lease = self.key_reservations[key]
+            time_left = yield lease.remaining()
+            # A successfully revoked lease returns an object of type Header
+            log.debug('release-reservation', key=key, lease_id=lease.lease_id,
+                      time_left_in_secs=time_left)
+            result = yield lease.revoke()
+            if result is None:
+                err = 'lease-revoke-failed'
+            self.key_reservations.pop(key)
+        returnValue((result, err))
+
+    @inlineCallbacks
+    def _release_all_reservations(self):
+        '''
+        Revoke the leases of all reservations made through this client.
+        Yields err, which is None if every lease was revoked.
+        '''
+        err = None
+        # Work on a snapshot: the dict may change while we wait on etcd
+        for key, lease in list(self.key_reservations.items()):
+            time_left = yield lease.remaining()
+            # A successfully revoked lease returns an object of type Header
+            log.debug('release-reservation', key=key, lease_id=lease.lease_id,
+                      time_left_in_secs=time_left)
+            result = yield lease.revoke()
+            if result is None:
+                err = 'lease-revoke-failed'
+                log.debug('lease-revoke', result=result)
+            self.key_reservations.pop(key, None)
+        returnValue(err)
diff --git a/python/common/kvstore/kv_client.py b/python/common/kvstore/kv_client.py
new file mode 100644
index 0000000..69a6480
--- /dev/null
+++ b/python/common/kvstore/kv_client.py
@@ -0,0 +1,206 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from common.utils.asleep import asleep
+from structlog import get_logger
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+log = get_logger()
+
+class KVPair():
+    '''
+    A key, its value, and the index reported for it by the KV store.
+    '''
+    def __init__(self, key, value, index):
+        self.key, self.value, self.index = key, value, index
+
+class Event():
+    '''
+    Notification passed to a watch callback: what happened, to which key,
+    and the value involved (if any).
+    '''
+    # Event types
+    PUT, DELETE, CONNECTION_DOWN = 0, 1, 2
+
+    def __init__(self, event_type, key, value):
+        self.event_type, self.key, self.value = event_type, key, value
+
+# Delay in seconds before each successive retry; the last entry is reused
+# once the table is exhausted.
+RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+# Default operation timeout: the time taken by one full pass through the
+# backoff table. Summed from 0.0 so the result is always a float.
+DEFAULT_TIMEOUT = sum(RETRY_BACKOFF, 0.0)
+
+class KVClient():
+    '''
+    Common front end for key-value store clients. The public methods
+    delegate to _op_with_retry, which each concrete client must supply,
+    and the retry backoff bookkeeping is shared here.
+    '''
+
+    def __init__(self, kv_host, kv_port):
+        # Retry bookkeeping used by _backoff and _clear_backoff
+        self.retries = 0
+        self.retry_time = 0
+        # Per-key state, filled in by the concrete clients
+        self.key_watches = {}
+        self.key_reservations = {}
+        self.host = kv_host
+        self.port = kv_port
+
+    @inlineCallbacks
+    def get(self, key, timeout=DEFAULT_TIMEOUT):
+        '''
+        Read the value stored under a key.
+
+        :param key: The key whose value is requested
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: (KVPair, error) where KVPair is None if an error occurred
+        '''
+        outcome = yield self._op_with_retry('GET', key, None, timeout)
+        returnValue(outcome)
+
+    @inlineCallbacks
+    def list(self, key, timeout=DEFAULT_TIMEOUT):
+        '''
+        Read all key-value pairs whose keys share the given prefix.
+
+        :param key: The key prefix
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: ([]KVPair, error) where []KVPair is a list of KVPair objects
+        '''
+        outcome = yield self._op_with_retry('LIST', key, None, timeout)
+        returnValue(outcome)
+
+    @inlineCallbacks
+    def put(self, key, value, timeout=DEFAULT_TIMEOUT):
+        '''
+        Write a value under a key.
+
+        Do NOT modify a reserved key in an etcd store; doing so seems
+        to nullify the TTL of the key, so that the key lasts forever.
+
+        :param key: The key to be written to
+        :param value: The value of the key
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: error, which is set to None for a successful write
+        '''
+        _ignored, error = yield self._op_with_retry('PUT', key, value, timeout)
+        returnValue(error)
+
+    @inlineCallbacks
+    def delete(self, key, timeout=DEFAULT_TIMEOUT):
+        '''
+        Remove a key from the KV store.
+
+        :param key: The key to be deleted
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: error, which is set to None for a successful deletion
+        '''
+        _ignored, error = yield self._op_with_retry('DELETE', key, None, timeout)
+        returnValue(error)
+
+    @inlineCallbacks
+    def reserve(self, key, value, ttl, timeout=DEFAULT_TIMEOUT):
+        '''
+        Reserve a key for an owner, much like taking a semaphore. The
+        mechanism depends on the KV store: etcd uses a test-and-set
+        transaction; consul uses an acquire lock. With etcd, do NOT write
+        to the key after the initial reservation; the TTL functionality
+        may become impaired (i.e. the reservation never expires).
+
+        :param key: The key under reservation
+        :param value: The reservation owner
+        :param ttl: The time-to-live (TTL) for the reservation. The key is unreserved
+        by the KV store when the TTL expires.
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: (key_value, error) If the key is acquired, then the value returned will
+        be the value passed in.  If the key is already acquired, then the value assigned
+        to that key will be returned.
+        '''
+        outcome = yield self._op_with_retry('RESERVE', key, value, timeout, ttl=ttl)
+        returnValue(outcome)
+
+    @inlineCallbacks
+    def renew_reservation(self, key, timeout=DEFAULT_TIMEOUT):
+        '''
+        Renew the reservation of a key. A reservation expires after the
+        TTL (Time To Live) period given when the key was reserved.
+
+        :param key: The reserved key
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: error, which is set to None for a successful renewal
+        '''
+        _ignored, error = yield self._op_with_retry('RENEW', key, None, timeout)
+        returnValue(error)
+
+    @inlineCallbacks
+    def release_reservation(self, key, timeout=DEFAULT_TIMEOUT):
+        '''
+        Cancel the reservation of a key.
+
+        :param key: The reserved key
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: error, which is set to None for a successful cancellation
+        '''
+        _ignored, error = yield self._op_with_retry('RELEASE', key, None, timeout)
+        returnValue(error)
+
+    @inlineCallbacks
+    def release_all_reservations(self, timeout=DEFAULT_TIMEOUT):
+        '''
+        Cancel every reservation previously made with the reserve API.
+
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: error, which is set to None for a successful cancellation
+        '''
+        _ignored, error = yield self._op_with_retry('RELEASE-ALL', None, None, timeout)
+        returnValue(error)
+
+    @inlineCallbacks
+    def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
+        '''
+        Watch a key. When the value of the key changes or the key is
+        deleted, an event describing the change is passed to the given
+        callback function. Concrete clients must override this method.
+
+        :param key: The key to be watched
+        :param key_change_callback: The function invoked whenever the key changes
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: There is no return; key change events are passed to the callback function
+        '''
+        raise NotImplementedError('Method not implemented')
+
+    @inlineCallbacks
+    def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
+        '''
+        Close the watch on a key, after which key change events are no
+        longer passed to the key change callback function. Concrete
+        clients must override this method.
+
+        :param key: The key under watch
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: There is no return
+        '''
+        raise NotImplementedError('Method not implemented')
+
+    @inlineCallbacks
+    def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
+        # Supplied by the concrete client
+        raise NotImplementedError('Method not implemented')
+
+    def _backoff(self, msg):
+        '''
+        Log msg, advance the retry counters and return a deferred sleep
+        for the next backoff interval.
+        '''
+        # The last table entry is reused once the retries outnumber it
+        step = min(self.retries, len(RETRY_BACKOFF) - 1)
+        delay = RETRY_BACKOFF[step]
+        self.retries += 1
+        self.retry_time += delay
+        log.error(msg,
+                  next_retry_in_secs=delay,
+                  total_delay_in_secs=self.retry_time,
+                  retries=self.retries)
+        return asleep(delay)
+
+    def _clear_backoff(self):
+        '''
+        Reset the retry counters; does nothing if no retry is pending.
+        '''
+        if not self.retries:
+            return
+        log.debug('reset-backoff', after_retries=self.retries)
+        self.retries = 0
+        self.retry_time = 0
\ No newline at end of file
diff --git a/python/common/kvstore/kvstore.py b/python/common/kvstore/kvstore.py
new file mode 100644
index 0000000..662b34d
--- /dev/null
+++ b/python/common/kvstore/kvstore.py
@@ -0,0 +1,31 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from common.kvstore.consul_client import ConsulClient
+from common.kvstore.etcd_client import EtcdClient
+
+def create_kv_client(kv_store, host, port):
+    '''
+    Factory for creating a client interface to a KV store
+
+    :param kv_store: Specify either 'etcd' or 'consul'
+    :param host: Name or IP address of host serving the KV store
+    :param port: Port number (integer) of the KV service
+    :return: Newly created client, or None when kv_store is any other value
+    '''
+    if kv_store == 'etcd':
+        return EtcdClient(host, port)
+    elif kv_store == 'consul':
+        return ConsulClient(host, port)
+    return None  # unrecognized kv_store: no error is raised here
diff --git a/python/common/manhole.py b/python/common/manhole.py
new file mode 100644
index 0000000..c00c900
--- /dev/null
+++ b/python/common/manhole.py
@@ -0,0 +1,129 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import os
+import rlcompleter
+from pprint import pprint
+
+import structlog
+from twisted.conch import manhole_ssh
+from twisted.conch.manhole import ColoredManhole
+from twisted.conch.ssh import keys
+from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
+from twisted.cred.portal import Portal
+from twisted.internet import reactor
+
+log = structlog.get_logger()
+
+
+MANHOLE_SERVER_RSA_PRIVATE = './manhole_rsa_key'
+MANHOLE_SERVER_RSA_PUBLIC = './manhole_rsa_key.pub'
+
+
+def get_rsa_keys():
+    """Return (public, private) SSH host key text, generating it if absent."""
+    if not (os.path.exists(MANHOLE_SERVER_RSA_PUBLIC) and
+            os.path.exists(MANHOLE_SERVER_RSA_PRIVATE)):
+        log.info('generate-rsa-keypair')
+        from Crypto.PublicKey import RSA
+        rsa_key = RSA.generate(2048)  # 1024-bit RSA is too weak for a host key
+        # save keys for next time; 'with' closes every handle deterministically
+        with open(MANHOLE_SERVER_RSA_PUBLIC, 'w+b') as key_file:
+            key_file.write(rsa_key.publickey().exportKey(format='OpenSSH'))
+        with open(MANHOLE_SERVER_RSA_PRIVATE, 'w+b') as key_file:
+            key_file.write(rsa_key.exportKey())
+        log.debug('saved-rsa-keypair', public=MANHOLE_SERVER_RSA_PUBLIC,
+                  private=MANHOLE_SERVER_RSA_PRIVATE)
+    with open(MANHOLE_SERVER_RSA_PUBLIC) as key_file:
+        public_key_str = key_file.read()
+    with open(MANHOLE_SERVER_RSA_PRIVATE) as key_file:
+        private_key_str = key_file.read()
+    return public_key_str, private_key_str
+
+
+class ManholeWithCompleter(ColoredManhole):
+
+    def __init__(self, namespace):
+        namespace['manhole'] = self
+        super(ManholeWithCompleter, self).__init__(namespace)
+        self.last_tab = None
+        self.completer = rlcompleter.Completer(self.namespace)
+
+    def handle_TAB(self):
+        if self.last_tab != self.lineBuffer:
+            self.last_tab = self.lineBuffer
+            return
+
+        buffer = ''.join(self.lineBuffer)
+        completions = []
+        maxlen = 3
+        for c in xrange(1000):
+            candidate = self.completer.complete(buffer, c)
+            if not candidate:
+                break
+
+            if len(candidate) > maxlen:
+                maxlen = len(candidate)
+
+            completions.append(candidate)
+
+        if len(completions) == 1:
+            rest = completions[0][len(buffer):]
+            self.terminal.write(rest)
+            self.lineBufferIndex += len(rest)
+            self.lineBuffer.extend(rest)
+        # A candidate wider than the terminal must still get one column per
+        # row; a column count of 0 would make the modulo below divide by zero.
+        elif len(completions):
+            maxlen += 3
+            numcols = max(1, self.width // maxlen)
+            self.terminal.nextLine()
+            for idx, candidate in enumerate(completions):
+                self.terminal.write('%%-%ss' % maxlen % candidate)
+                if not ((idx + 1) % numcols):
+                    self.terminal.nextLine()
+            self.terminal.nextLine()
+            self.drawInputLine()
+
+
+class Manhole(object):
+
+    def __init__(self, port, pws, **kw):
+        kw.update(globals())
+        kw['pp'] = pprint
+
+        realm = manhole_ssh.TerminalRealm()
+        manhole = ManholeWithCompleter(kw)
+
+        def windowChanged(_, win_size):
+            manhole.terminalSize(*reversed(win_size[:2]))
+
+        realm.sessionFactory.windowChanged = windowChanged
+        realm.chainedProtocolFactory.protocolFactory = lambda _: manhole
+        portal = Portal(realm)
+        portal.registerChecker(InMemoryUsernamePasswordDatabaseDontUse(**pws))
+        factory = manhole_ssh.ConchFactory(portal)
+        public_key_str, private_key_str = get_rsa_keys()
+        factory.publicKeys = {
+            'ssh-rsa': keys.Key.fromString(public_key_str)
+        }
+        factory.privateKeys = {
+            'ssh-rsa': keys.Key.fromString(private_key_str)
+        }
+        reactor.listenTCP(port, factory, interface='localhost')
+
+
+if __name__ == '__main__':
+    Manhole(12222, dict(admin='admin'))
+    reactor.run()
diff --git a/python/common/pon_resource_manager/__init__.py b/python/common/pon_resource_manager/__init__.py
new file mode 100644
index 0000000..2d104e0
--- /dev/null
+++ b/python/common/pon_resource_manager/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation

+#

+# Licensed under the Apache License, Version 2.0 (the "License");

+# you may not use this file except in compliance with the License.

+# You may obtain a copy of the License at

+#

+# http://www.apache.org/licenses/LICENSE-2.0

+#

+# Unless required by applicable law or agreed to in writing, software

+# distributed under the License is distributed on an "AS IS" BASIS,

+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+# See the License for the specific language governing permissions and

+# limitations under the License.

diff --git a/python/common/pon_resource_manager/resource_kv_store.py b/python/common/pon_resource_manager/resource_kv_store.py
new file mode 100644
index 0000000..a1a5c14
--- /dev/null
+++ b/python/common/pon_resource_manager/resource_kv_store.py
@@ -0,0 +1,107 @@
+#

+# Copyright 2018 the original author or authors.

+#

+# Licensed under the Apache License, Version 2.0 (the "License");

+# you may not use this file except in compliance with the License.

+# You may obtain a copy of the License at

+#

+# http://www.apache.org/licenses/LICENSE-2.0

+#

+# Unless required by applicable law or agreed to in writing, software

+# distributed under the License is distributed on an "AS IS" BASIS,

+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+# See the License for the specific language governing permissions and

+# limitations under the License.

+#

+

+"""Resource KV store - interface between Resource Manager and backend store."""

+import structlog

+

+from voltha.core.config.config_backend import ConsulStore

+from voltha.core.config.config_backend import EtcdStore

+

+# KV store uses this prefix to store resource info

+PATH_PREFIX = 'resource_manager/{}'

+

+

+class ResourceKvStore(object):

+    """Implements apis to store/get/remove resource in backend store."""

+

+    def __init__(self, technology, device_id, backend, host, port):

+        """

+        Create ResourceKvStore object.

+

+        Based on backend ('consul' or 'etcd'), use the host and port

+        to create the respective store object.

+

+        :param technology: PON technology

+        :param device_id: OLT device id

+        :param backend: Type of backend storage (etcd or consul)

+        :param host: host ip info for backend storage

+        :param port: port for the backend storage

+        :raises exception when invalid backend store passed as an argument

+        """

+        # logger

+        self._log = structlog.get_logger()

+

+        path = PATH_PREFIX.format(technology)

+        try:

+            if backend == 'consul':

+                self._kv_store = ConsulStore(host, port, path)

+            elif backend == 'etcd':

+                self._kv_store = EtcdStore(host, port, path)

+            else:

+                self._log.error('Invalid-backend')

+                raise Exception("Invalid-backend-for-kv-store")

+        except Exception as e:

+            self._log.exception("exception-in-init")

+            raise Exception(e)

+

+    def update_to_kv_store(self, path, resource):

+        """

+        Update resource.

+

+        :param path: path to update the resource

+        :param resource: updated resource

+        """

+        try:

+            self._kv_store[path] = str(resource)

+            self._log.debug("Resource-updated-in-kv-store", path=path)

+            return True

+        except Exception:  # not BaseException: let SystemExit etc. propagate

+            self._log.exception("Resource-update-in-kv-store-failed",

+                                path=path, resource=resource)

+        return False

+

+    def get_from_kv_store(self, path):

+        """

+        Get resource.

+

+        :param path: path to get the resource

+        """

+        resource = None

+        try:

+            resource = self._kv_store[path]

+            self._log.debug("Got-resource-from-kv-store", path=path)

+        except KeyError:

+            self._log.info("Resource-not-found-updating-resource",

+                           path=path)

+        except Exception:  # not BaseException: let SystemExit etc. propagate

+            self._log.exception("Getting-resource-from-kv-store-failed",

+                                path=path)

+        return resource

+

+    def remove_from_kv_store(self, path):

+        """

+        Remove resource.

+

+        :param path: path to remove the resource

+        """

+        try:

+            del self._kv_store[path]

+            self._log.debug("Resource-deleted-in-kv-store", path=path)

+            return True

+        except Exception:  # not BaseException: let SystemExit etc. propagate

+            self._log.exception("Resource-delete-in-kv-store-failed",

+                                path=path)

+        return False

diff --git a/python/common/pon_resource_manager/resource_manager.py b/python/common/pon_resource_manager/resource_manager.py
new file mode 100644
index 0000000..17b2871
--- /dev/null
+++ b/python/common/pon_resource_manager/resource_manager.py
@@ -0,0 +1,677 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Resource Manager will be unique for each OLT device.
+
+It exposes APIs to create/free alloc_ids/onu_ids/gemport_ids. Resource Manager
+uses a KV store in backend to ensure resiliency of the data.
+"""
+import json
+import structlog
+from bitstring import BitArray
+from ast import literal_eval
+import shlex
+from argparse import ArgumentParser, ArgumentError
+
+from common.pon_resource_manager.resource_kv_store import ResourceKvStore
+
+
+# Used to parse extra arguments to OpenOlt adapter from the NBI
+class OltVendorArgumentParser(ArgumentParser):
+    # Must override the exit command to prevent it from
+    # calling sys.exit().  Return exception instead.
+    def exit(self, status=0, message=None):
+        raise Exception(message)
+
+
+class PONResourceManager(object):
+    """Implements APIs to initialize/allocate/release alloc/gemport/onu IDs."""
+
+    # Constants to identify resource pool
+    ONU_ID = 'ONU_ID'
+    ALLOC_ID = 'ALLOC_ID'
+    GEMPORT_ID = 'GEMPORT_ID'
+
+    # The resource ranges for a given device vendor_type should be placed
+    # at 'resource_manager/<technology>/resource_ranges/<olt_vendor_type>'
+    # path on the KV store.
+    # If Resource Range parameters are to be read from the external KV store,
+    # they are expected to be stored in the following format.
+    # Note: All parameters are MANDATORY for now.
+    '''
+    {
+        "onu_id_start": 1,
+        "onu_id_end": 127,
+        "alloc_id_start": 1024,
+        "alloc_id_end": 2816,
+        "gemport_id_start": 1024,
+        "gemport_id_end": 8960,
+        "pon_ports": 16
+    }
+
+    '''
+    # constants used as keys to reference the resource range parameters from
+    # and external KV store.
+    ONU_START_IDX = "onu_id_start"
+    ONU_END_IDX = "onu_id_end"
+    ALLOC_ID_START_IDX = "alloc_id_start"
+    ALLOC_ID_END_IDX = "alloc_id_end"
+    GEM_PORT_ID_START_IDX = "gemport_id_start"
+    GEM_PORT_ID_END_IDX = "gemport_id_end"
+    NUM_OF_PON_PORT = "pon_ports"
+
+    # PON Resource range configuration on the KV store.
+    # Format: 'resource_manager/<technology>/resource_ranges/<olt_vendor_type>'
+    # The KV store backend is initialized with a path prefix and we need to
+    # provide only the suffix.
+    PON_RESOURCE_RANGE_CONFIG_PATH = 'resource_ranges/{}'
+
+    # resource path suffix
+    ALLOC_ID_POOL_PATH = '{}/alloc_id_pool/{}'
+    GEMPORT_ID_POOL_PATH = '{}/gemport_id_pool/{}'
+    ONU_ID_POOL_PATH = '{}/onu_id_pool/{}'
+
+    # Path on the KV store for storing list of alloc IDs for a given ONU
+    # Format: <device_id>/<(pon_intf_id, onu_id)>/alloc_ids
+    ALLOC_ID_RESOURCE_MAP_PATH = '{}/{}/alloc_ids'
+
+    # Path on the KV store for storing list of gemport IDs for a given ONU
+    # Format: <device_id>/<(pon_intf_id, onu_id)>/gemport_ids
+    GEMPORT_ID_RESOURCE_MAP_PATH = '{}/{}/gemport_ids'
+
+    # Constants for internal usage.
+    PON_INTF_ID = 'pon_intf_id'
+    START_IDX = 'start_idx'
+    END_IDX = 'end_idx'
+    POOL = 'pool'
+
+    def __init__(self, technology, extra_args, device_id,
+                 backend, host, port):
+        """
+        Create PONResourceManager object.
+
+        :param technology: PON technology
+        :param: extra_args: This string contains extra arguments passed during
+        pre-provisioning of OLT and specifies the OLT Vendor type
+        :param device_id: OLT device id
+        :param backend: backend store
+        :param host: ip of backend store
+        :param port: port on which backend store listens
+        :raises exception when invalid backend store passed as an argument
+        """
+        # logger
+        self._log = structlog.get_logger()
+
+        try:
+            self.technology = technology
+            self.extra_args = extra_args
+            self.device_id = device_id
+            self.backend = backend
+            self.host = host
+            self.port = port
+            self.olt_vendor = None
+            self._kv_store = ResourceKvStore(technology, device_id, backend,
+                                             host, port)
+            # Below attribute, pon_resource_ranges, should be initialized
+            # by reading from KV store.
+            self.pon_resource_ranges = dict()
+        except Exception as e:
+            self._log.exception("exception-in-init")
+            raise Exception(e)
+
+    def init_resource_ranges_from_kv_store(self):
+        """
+        Initialize PON resource ranges with config fetched from kv store.
+
+        :return boolean: True if PON resource ranges initialized else false
+        """
+        self.olt_vendor = self._get_olt_vendor()
+        # Try to initialize the PON Resource Ranges from KV store based on the
+        # OLT vendor key, if available
+        if self.olt_vendor is None:
+            self._log.info("olt-vendor-unavailable--not-reading-from-kv-store")
+            return False
+
+        path = self.PON_RESOURCE_RANGE_CONFIG_PATH.format(self.olt_vendor)
+        try:
+            # get resource from kv store
+            result = self._kv_store.get_from_kv_store(path)
+
+            if result is None:
+                self._log.debug("resource-range-config-unavailable-on-kvstore")
+                return False
+
+            resource_range_config = result
+
+            if resource_range_config is not None:
+                self.pon_resource_ranges = json.loads(resource_range_config)
+                self._log.debug("Init-resource-ranges-from-kvstore-success",
+                                pon_resource_ranges=self.pon_resource_ranges,
+                                path=path)
+                return True
+
+        except Exception as e:
+            self._log.exception("error-initializing-resource-range-from-kv-store",
+                                e=e)
+        return False
+
+    def init_default_pon_resource_ranges(self, onu_start_idx=1,
+                                         onu_end_idx=127,
+                                         alloc_id_start_idx=1024,
+                                         alloc_id_end_idx=2816,
+                                         gem_port_id_start_idx=1024,
+                                         gem_port_id_end_idx=8960,
+                                         num_of_pon_ports=16):
+        """
+        Initialize default PON resource ranges
+
+        :param onu_start_idx: onu id start index
+        :param onu_end_idx: onu id end index
+        :param alloc_id_start_idx: alloc id start index
+        :param alloc_id_end_idx: alloc id end index
+        :param gem_port_id_start_idx: gemport id start index
+        :param gem_port_id_end_idx: gemport id end index
+        :param num_of_pon_ports: number of PON ports
+        """
+        self._log.info("initialize-default-resource-range-values")
+        self.pon_resource_ranges[
+            PONResourceManager.ONU_START_IDX] = onu_start_idx
+        self.pon_resource_ranges[PONResourceManager.ONU_END_IDX] = onu_end_idx
+        self.pon_resource_ranges[
+            PONResourceManager.ALLOC_ID_START_IDX] = alloc_id_start_idx
+        self.pon_resource_ranges[
+            PONResourceManager.ALLOC_ID_END_IDX] = alloc_id_end_idx
+        self.pon_resource_ranges[
+            PONResourceManager.GEM_PORT_ID_START_IDX] = gem_port_id_start_idx
+        self.pon_resource_ranges[
+            PONResourceManager.GEM_PORT_ID_END_IDX] = gem_port_id_end_idx
+        self.pon_resource_ranges[
+            PONResourceManager.NUM_OF_PON_PORT] = num_of_pon_ports
+
+    def init_device_resource_pool(self):
+        """
+        Initialize resource pool for all PON ports.
+        """
+        i = 0
+        while i < self.pon_resource_ranges[PONResourceManager.NUM_OF_PON_PORT]:
+            self.init_resource_id_pool(
+                pon_intf_id=i,
+                resource_type=PONResourceManager.ONU_ID,
+                start_idx=self.pon_resource_ranges[
+                    PONResourceManager.ONU_START_IDX],
+                end_idx=self.pon_resource_ranges[
+                    PONResourceManager.ONU_END_IDX])
+
+            i += 1
+
+        # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
+        # across OLT. To keep it simple, a single pool (POOL 0) is maintained
+        # for both the resource types. This may need to change later.
+        self.init_resource_id_pool(
+            pon_intf_id=0,
+            resource_type=PONResourceManager.ALLOC_ID,
+            start_idx=self.pon_resource_ranges[
+                PONResourceManager.ALLOC_ID_START_IDX],
+            end_idx=self.pon_resource_ranges[
+                PONResourceManager.ALLOC_ID_END_IDX])
+
+        self.init_resource_id_pool(
+            pon_intf_id=0,
+            resource_type=PONResourceManager.GEMPORT_ID,
+            start_idx=self.pon_resource_ranges[
+                PONResourceManager.GEM_PORT_ID_START_IDX],
+            end_idx=self.pon_resource_ranges[
+                PONResourceManager.GEM_PORT_ID_END_IDX])
+
+    def clear_device_resource_pool(self):
+        """
+        Clear resource pool of all PON ports.
+        """
+        i = 0
+        while i < self.pon_resource_ranges[PONResourceManager.NUM_OF_PON_PORT]:
+            self.clear_resource_id_pool(
+                pon_intf_id=i,
+                resource_type=PONResourceManager.ONU_ID,
+            )
+            i += 1
+
+        self.clear_resource_id_pool(
+            pon_intf_id=0,
+            resource_type=PONResourceManager.ALLOC_ID,
+        )
+
+        self.clear_resource_id_pool(
+            pon_intf_id=0,
+            resource_type=PONResourceManager.GEMPORT_ID,
+        )
+
+    def init_resource_id_pool(self, pon_intf_id, resource_type, start_idx,
+                              end_idx):
+        """
+        Initialize Resource ID pool for a given Resource Type on a given PON Port
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_type: String to identify type of resource
+        :param start_idx: start index for onu id pool
+        :param end_idx: end index for onu id pool
+        :return boolean: True if resource id pool initialized else false
+        """
+        status = False
+        path = self._get_path(pon_intf_id, resource_type)
+        if path is None:
+            return status
+
+        try:
+            # In case of adapter reboot and reconciliation resource in kv store
+            # checked for its presence if not kv store update happens
+            resource = self._get_resource(path)
+
+            if resource is not None:
+                self._log.info("Resource-already-present-in-store", path=path)
+                status = True
+            else:
+                resource = self._format_resource(pon_intf_id, start_idx,
+                                                 end_idx)
+                self._log.info("Resource-initialized", path=path)
+
+                # Add resource as json in kv store.
+                result = self._kv_store.update_to_kv_store(path, resource)
+                if result is True:
+                    status = True
+
+        except Exception as e:
+            self._log.exception("error-initializing-resource-pool", e=e)
+
+        return status
+
+    def get_resource_id(self, pon_intf_id, resource_type, num_of_id=1):
+        """
+        Create alloc/gemport/onu id for given OLT PON interface.
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_type: String to identify type of resource
+        :param num_of_id: required number of ids
+        :return list/int/None: list for alloc_id/gemport_id, int for onu_id,
+                               None for an invalid type or when allocation
+                               fails (a partial list is never returned)
+        """
+        result = None
+
+        # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
+        # across OLT. To keep it simple, a single pool (POOL 0) is maintained
+        # for both the resource types. This may need to change later.
+        # Override the incoming pon_intf_id to PON0
+        if resource_type == PONResourceManager.GEMPORT_ID or \
+                resource_type == PONResourceManager.ALLOC_ID:
+            pon_intf_id = 0
+
+        path = self._get_path(pon_intf_id, resource_type)
+        if path is None:
+            return result
+
+        try:
+            resource = self._get_resource(path)
+            if resource is not None and resource_type == \
+                    PONResourceManager.ONU_ID:
+                result = self._generate_next_id(resource)
+            elif resource is not None and (
+                    resource_type == PONResourceManager.GEMPORT_ID or
+                    resource_type == PONResourceManager.ALLOC_ID):
+                # num_of_id <= 0 still yields an empty list
+                result = [self._generate_next_id(resource)
+                          for _ in range(num_of_id)]
+            else:
+                raise Exception("get-resource-failed")
+
+            self._log.debug("Get-" + resource_type + "-success", result=result,
+                            path=path)
+            # Update resource in kv store
+            self._update_resource(path, resource)
+
+        except Exception as e:
+            self._log.exception("Get-" + resource_type + "-id-failed",
+                                path=path, e=e)
+            result = None  # never hand out ids whose allocation did not finish
+        return result
+
+    def free_resource_id(self, pon_intf_id, resource_type, release_content):
+        """
+        Release alloc/gemport/onu id for given OLT PON interface.
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_type: String to identify type of resource
+        :param release_content: single id (onu_id) or iterable of ids to release
+        :return boolean: True if all IDs in given release_content released
+                         else False
+        """
+        status = False
+
+        # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
+        # across OLT. To keep it simple, a single pool (POOL 0) is maintained
+        # for both the resource types. This may need to change later.
+        # Override the incoming pon_intf_id to PON0
+        if resource_type == PONResourceManager.GEMPORT_ID or \
+                resource_type == PONResourceManager.ALLOC_ID:
+            pon_intf_id = 0
+
+        path = self._get_path(pon_intf_id, resource_type)
+        if path is None:
+            return status
+
+        try:
+            resource = self._get_resource(path)
+            if resource is not None and resource_type == \
+                    PONResourceManager.ONU_ID:
+                self._release_id(resource, release_content)
+            elif resource is not None and (
+                    resource_type == PONResourceManager.ALLOC_ID or
+                    resource_type == PONResourceManager.GEMPORT_ID):
+                for content in release_content:
+                    self._release_id(resource, content)
+            else:
+                raise Exception("get-resource-failed")
+
+            self._log.debug("Free-" + resource_type + "-success", path=path)
+
+            # Update resource in kv store
+            status = self._update_resource(path, resource)
+
+        except Exception as e:
+            self._log.exception("Free-" + resource_type + "-failed",
+                                path=path, e=e)
+        return status
+
+    def clear_resource_id_pool(self, pon_intf_id, resource_type):
+        """
+        Clear Resource Pool for a given Resource Type on a given PON Port.
+
+        :return boolean: True if removed else False
+        """
+        path = self._get_path(pon_intf_id, resource_type)
+        if path is None:
+            return False
+
+        try:
+            result = self._kv_store.remove_from_kv_store(path)
+            if result is True:
+                self._log.debug("Resource-pool-cleared",
+                                device_id=self.device_id,
+                                path=path)
+                return True
+        except Exception as e:
+            self._log.exception("error-clearing-resource-pool", e=e)
+
+        self._log.error("Clear-resource-pool-failed", device_id=self.device_id,
+                        path=path)
+        return False
+
+    def init_resource_map(self, pon_intf_onu_id):
+        """
+        Initialize resource map
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        """
+        # initialize pon_intf_onu_id tuple to alloc_ids map
+        alloc_id_path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id)
+        )
+        alloc_ids = list()
+        self._kv_store.update_to_kv_store(
+            alloc_id_path, json.dumps(alloc_ids)
+        )
+
+        # initialize pon_intf_onu_id tuple to gemport_ids map
+        gemport_id_path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id)
+        )
+        gemport_ids = list()
+        self._kv_store.update_to_kv_store(
+            gemport_id_path, json.dumps(gemport_ids)
+        )
+
+    def remove_resource_map(self, pon_intf_onu_id):
+        """
+        Remove resource map
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        """
+        # remove pon_intf_onu_id tuple to alloc_ids map
+        alloc_id_path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id)
+        )
+        self._kv_store.remove_from_kv_store(alloc_id_path)
+
+        # remove pon_intf_onu_id tuple to gemport_ids map
+        gemport_id_path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id)
+        )
+        self._kv_store.remove_from_kv_store(gemport_id_path)
+
+    def get_current_alloc_ids_for_onu(self, pon_intf_onu_id):
+        """
+        Get currently configured alloc ids for given pon_intf_onu_id
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        """
+        path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+            self.device_id,
+            str(pon_intf_onu_id))
+        value = self._kv_store.get_from_kv_store(path)
+        if value is not None:
+            alloc_id_list = json.loads(value)
+            if len(alloc_id_list) > 0:
+                return alloc_id_list
+
+        return None
+
+    def get_current_gemport_ids_for_onu(self, pon_intf_onu_id):
+        """
+        Get currently configured gemport ids for given pon_intf_onu_id
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        """
+
+        path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+            self.device_id,
+            str(pon_intf_onu_id))
+        value = self._kv_store.get_from_kv_store(path)
+        if value is not None:
+            gemport_id_list = json.loads(value)
+            if len(gemport_id_list) > 0:
+                return gemport_id_list
+
+        return None
+
+    def update_alloc_ids_for_onu(self, pon_intf_onu_id, alloc_ids):
+        """
+        Update currently configured alloc ids for given pon_intf_onu_id
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        """
+        path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id)
+        )
+        self._kv_store.update_to_kv_store(
+            path, json.dumps(alloc_ids)
+        )
+
+    def update_gemport_ids_for_onu(self, pon_intf_onu_id, gemport_ids):
+        """
+        Update currently configured gemport ids for given pon_intf_onu_id
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        """
+        path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id)
+        )
+        self._kv_store.update_to_kv_store(
+            path, json.dumps(gemport_ids)
+        )
+
+    def _get_olt_vendor(self):
+        """
+        Get olt vendor variant
+
+        :return: olt vendor name; None if extra_args is empty or unparsable
+        """
+        olt_vendor = None
+        if self.extra_args:
+            parser = OltVendorArgumentParser(add_help=False)
+            parser.add_argument('--olt_vendor', '-o', action='store',
+                                choices=['default', 'asfvolt16', 'cigolt24'],
+                                default='default')
+            try:
+                args = parser.parse_args(shlex.split(self.extra_args))
+                self._log.debug('parsing-extra-arguments', args=args)
+                olt_vendor = args.olt_vendor
+            except ArgumentError as e:
+                self._log.exception('invalid-arguments', e=e)
+            except Exception as e:
+                self._log.exception('option-parsing-error', e=e)
+
+        return olt_vendor
+
+    def _generate_next_id(self, resource):
+        """
+        Claim the zero bit found by the pool's find() and derive an id.
+        :param resource: resource used to generate ID
+        :return int: position of the claimed bit plus START_IDX
+        """
+        pool = resource[PONResourceManager.POOL]
+        free_pos = pool.find('0b0')
+        pool.set(1, free_pos)
+        return free_pos[0] + resource[PONResourceManager.START_IDX]
+
+    def _release_id(self, resource, unique_id):
+        """
+        Clear the pool bit that backs an id (START_IDX is the offset).
+
+        :param resource: resource used to release ID
+        :param unique_id: id need to be released
+        """
+        offset = int(unique_id) - resource[PONResourceManager.START_IDX]
+        resource[PONResourceManager.POOL].set(0, offset)
+
+    def _get_path(self, pon_intf_id, resource_type):
+        """
+        Resolve the pool path for given resource type.
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_type: String to identify type of resource
+        :return: pool path, or None (after logging) for an unknown type
+        """
+        if resource_type == PONResourceManager.ONU_ID:
+            return self._get_onu_id_resource_path(pon_intf_id)
+        if resource_type == PONResourceManager.ALLOC_ID:
+            return self._get_alloc_id_resource_path(pon_intf_id)
+        if resource_type == PONResourceManager.GEMPORT_ID:
+            return self._get_gemport_id_resource_path(pon_intf_id)
+
+        # none of the known pool identifiers matched
+        self._log.error("invalid-resource-pool-identifier")
+        return None
+
+    def _get_alloc_id_resource_path(self, pon_intf_id):
+        """
+        Fill ALLOC_ID_POOL_PATH with this device id and the interface id.
+
+        :param pon_intf_id: OLT PON interface id
+        :return: alloc id resource path
+        """
+        template = PONResourceManager.ALLOC_ID_POOL_PATH
+        return template.format(self.device_id, pon_intf_id)
+
+    def _get_gemport_id_resource_path(self, pon_intf_id):
+        """
+        Fill GEMPORT_ID_POOL_PATH with this device id and the interface id.
+
+        :param pon_intf_id: OLT PON interface id
+        :return: gemport id resource path
+        """
+        template = PONResourceManager.GEMPORT_ID_POOL_PATH
+        return template.format(self.device_id, pon_intf_id)
+
+    def _get_onu_id_resource_path(self, pon_intf_id):
+        """
+        Fill ONU_ID_POOL_PATH with this device id and the interface id.
+
+        :param pon_intf_id: OLT PON interface id
+        :return: onu id resource path
+        """
+        template = PONResourceManager.ONU_ID_POOL_PATH
+        return template.format(self.device_id, pon_intf_id)
+
+    def _update_resource(self, path, resource):
+        """
+        Update resource in resource kv store.
+
+        :param path: path to update resource
+        :param resource: resource need to be updated; left unmodified
+        :return boolean: True if resource updated in kv store else False
+        """
+        # serialize a copy: the pool is stored as a binary string, but the
+        # caller's dict must keep its own pool object
+        stored = dict(resource)
+        stored[PONResourceManager.POOL] = resource[PONResourceManager.POOL].bin
+        result = self._kv_store.update_to_kv_store(path, json.dumps(stored))
+        return result is True
+
+    def _get_resource(self, path):
+        """
+        Fetch a resource from the kv store and decode it.
+
+        :param path: path to get resource
+        :return: decoded resource with its pool as a BitArray, or None if
+                 the kv store has no value at path
+        """
+        # get resource from kv store
+        raw = self._kv_store.get_from_kv_store(path)
+        if raw is None:
+            # nothing stored at this path
+            return raw
+
+        self._log.info("dumping resource", result=raw)
+
+        # decode resource fetched from backend store to dictionary
+        decoded = json.loads(raw)
+
+        # the pool is kept in the backend store as a binary string; rebuild
+        # the BitArray that is needed to generate/release IDs
+        pool_bits = decoded[PONResourceManager.POOL]
+        decoded[PONResourceManager.POOL] = BitArray('0b' + pool_bits)
+
+        return decoded
+
+    def _format_resource(self, pon_intf_id, start_idx, end_idx):
+        """
+        Build the JSON document describing a new id pool.
+
+        :param pon_intf_id: OLT PON interface id
+        :param start_idx: start index for id pool
+        :param end_idx: end index for id pool
+        :return: resource serialized as a JSON string
+        """
+        # resource pool stored in backend store as binary string; the new
+        # pool is created from BitArray(end_idx)
+        pool_bits = BitArray(end_idx).bin
+        fresh = {
+            PONResourceManager.PON_INTF_ID: pon_intf_id,
+            PONResourceManager.START_IDX: start_idx,
+            PONResourceManager.END_IDX: end_idx,
+            PONResourceManager.POOL: pool_bits,
+        }
+        return json.dumps(fresh)
diff --git a/python/common/structlog_setup.py b/python/common/structlog_setup.py
new file mode 100644
index 0000000..cbbda89
--- /dev/null
+++ b/python/common/structlog_setup.py
@@ -0,0 +1,132 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Setting up proper logging for Voltha"""
+
+import logging
+import logging.config
+from collections import OrderedDict
+
+import structlog
+from structlog.stdlib import BoundLogger, INFO
+
+try:
+    from thread import get_ident as _get_ident
+except ImportError:
+    from dummy_thread import get_ident as _get_ident
+
+
+class StructuredLogRenderer(object):
+    """Final structlog processor that forwards event_dict unrendered."""
+
+    def __call__(self, logger, name, event_dict):
+        # The structured data must reach the logger framework as is, so it
+        # is handed over as the first (and only) positional argument, with
+        # no keyword arguments.
+        return (event_dict,), {}
+
+
+class PlainRenderedOrderedDict(OrderedDict):
+    """OrderedDict whose repr looks like '{k: v, ...}' ("%s" of each key and
+       value, no class name), to make the log stream output cleaner.
+    """
+    def __repr__(self, _repr_running={}):  # shared dict is the recursion guard
+        'od.__repr__() <==> repr(od)'
+        call_key = id(self), _get_ident()  # one guard entry per (dict, thread)
+        if call_key in _repr_running:  # this dict is already being rendered
+            return '...'
+        _repr_running[call_key] = 1
+        try:
+            if not self:
+                return '{}'
+            return '{%s}' % ", ".join("%s: %s" % (k, v)
+                                      for k, v in self.items())
+        finally:
+            del _repr_running[call_key]  # always drop the guard entry again
+
+
+def setup_logging(log_config, instance_id, verbosity_adjust=0):
+    """
+    Set up logging: structlog is the primary entry method (see
+    http://structlog.readthedocs.io/en/stable/index.html), backed by the
+    Python standard lib logger configured from log_config (dictConfig).
+    :param instance_id: value stored in every event as 'instance_id'
+    :param verbosity_adjust: root logger level is lowered by 10 per unit
+    :return: structlog logger, after logging a "first-line" event
+    """
+    def _tag_instance(_, __, event_dict):
+        event_dict['instance_id'] = instance_id
+        return event_dict
+
+    def _flag_exc_info(_, method_name, event_dict):
+        if method_name == 'exception':
+            event_dict['exc_info'] = True
+        return event_dict
+
+    logging.config.dictConfig(log_config)
+    logging.root.level -= 10 * verbosity_adjust
+
+    structlog.configure(
+        logger_factory=structlog.stdlib.LoggerFactory(),
+        context_class=PlainRenderedOrderedDict,
+        wrapper_class=BoundLogger,
+        processors=[
+            _flag_exc_info,
+            structlog.processors.StackInfoRenderer(),
+            structlog.processors.format_exc_info,
+            _tag_instance,
+            StructuredLogRenderer(),
+        ])
+
+    # Mark first line of log
+    first_line_log = structlog.get_logger()
+    first_line_log.info("first-line")
+    return first_line_log
+
+
+def update_logging(instance_id, vcore_id):
+    """
+    Add the vcore id to the structured logger
+    :param instance_id: value stored in every event as 'instance_id'
+    :param vcore_id:  The assigned vcore id
+    :return: structure logger
+    """
+    def _flag_exc_info(_, method_name, event_dict):
+        if method_name == 'exception':
+            event_dict['exc_info'] = True
+        return event_dict
+
+    def _tag_instance(_, __, event_dict):
+        event_dict['instance_id'] = instance_id
+        return event_dict
+
+    def _tag_vcore(_, __, event_dict):
+        event_dict['vcore_id'] = vcore_id
+        return event_dict
+
+    # only the processor chain is passed to configure() here
+    structlog.configure(processors=[
+        _flag_exc_info,
+        structlog.processors.StackInfoRenderer(),
+        structlog.processors.format_exc_info,
+        _tag_instance,
+        _tag_vcore,
+        StructuredLogRenderer(),
+    ])
+
+    updated_log = structlog.get_logger()
+    updated_log.info("updated-logger")
+    return updated_log
diff --git a/python/common/utils/__init__.py b/python/common/utils/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/common/utils/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/common/utils/asleep.py b/python/common/utils/asleep.py
new file mode 100644
index 0000000..10d1ce3
--- /dev/null
+++ b/python/common/utils/asleep.py
@@ -0,0 +1,31 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Async sleep (asleep) method and other twisted goodies """
+
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+
+
+def asleep(dt):
+    """
+    Async (event driven) sleep: schedules a reactor call instead of blocking
+    :param dt: Delay in seconds
+    :return: Deferred to be fired with value None when time expires.
+    """
+    wakeup = Deferred()
+    reactor.callLater(dt, wakeup.callback, None)
+    return wakeup
diff --git a/python/common/utils/consulhelpers.py b/python/common/utils/consulhelpers.py
new file mode 100644
index 0000000..df4dd58
--- /dev/null
+++ b/python/common/utils/consulhelpers.py
@@ -0,0 +1,178 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Some consul related convenience functions
+"""
+
+from structlog import get_logger
+from consul import Consul
+from random import randint
+from common.utils.nethelpers import get_my_primary_local_ipv4
+
+log = get_logger()
+
+
+def connect_to_consult(consul_endpoint):
+    # consul_endpoint is split on ':' into a host and an integer port
+    log.debug('getting-service-endpoint', consul=consul_endpoint)
+
+    parts = consul_endpoint.split(':')
+    host, port = parts[0].strip(), int(parts[1].strip())
+    return Consul(host=host, port=port)
+
+
+def verify_all_services_healthy(consul_endpoint, service_name=None,
+                                number_of_expected_services=None):
+    """
+    Check service health in consul: one service if service_name is given,
+    otherwise every service listed in the catalog.
+    :param consul_endpoint: a <host>:<port> string
+    :param service_name: name of service to check, optional
+    :param number_of_expected_services number of services to check for, optional
+    :return: true if healthy, false otherwise
+    """
+    consul = connect_to_consult(consul_endpoint)
+
+    def _has_passing_instance(name):
+        # the health query is restricted with passing=True
+        _, passing = consul.health.service(name, passing=True)
+        return not passing == []
+
+    if service_name is not None:
+        return _has_passing_instance(service_name)
+
+    catalog = get_all_services(consul_endpoint)
+    names = catalog.keys()
+
+    # a catalog of unexpected size counts as unhealthy
+    expected = number_of_expected_services
+    if expected is not None and len(names) != expected:
+        return False
+
+    for name in names:
+        if not _has_passing_instance(name):
+            return False
+    return True
+
+
+def get_all_services(consul_endpoint):
+    # Fetch the services listed in the consul catalog.
+    log.debug('getting-service-verify-health')
+
+    catalog_api = connect_to_consult(consul_endpoint).catalog
+    _index, catalog = catalog_api.services()
+    return catalog
+
+
+def get_all_instances_of_service(consul_endpoint, service_name):
+    # Fetch the catalog entries of service_name and log each of them.
+    log.debug('getting-all-instances-of-service', service=service_name)
+
+    consul = connect_to_consult(consul_endpoint)
+    _index, instances = consul.catalog.service(service_name)
+
+    for entry in instances:
+        log.debug('service',
+                  name=entry['ServiceName'],
+                  serviceid=entry['ServiceID'],
+                  serviceport=entry['ServicePort'],
+                  createindex=entry['CreateIndex'])
+    return instances
+
+
+def get_endpoint_from_consul(consul_endpoint, service_name):
+    """
+    Get endpoint of service_name from consul.
+
+    An instance whose address equals this host's primary IPv4 address is
+    preferred; otherwise one of the registered instances is picked at
+    random.
+    :param consul_endpoint: a <host>:<port> string
+    :param service_name: name of service for which endpoint
+                         needs to be found.
+    :return: service endpoint as a '<address>:<port>' string
+    :raises Exception: if consul has no instance of service_name
+    """
+    log.debug('getting-service-info', service=service_name)
+
+    consul = connect_to_consult(consul_endpoint)
+    _, services = consul.catalog.service(service_name)
+
+    if not services:
+        # nothing registered under this name: report it to the caller
+        # instead of returning an unusable endpoint
+        raise Exception(
+            'Cannot find service {} in consul'.format(service_name))
+
+    # '<address>:<port>' rendering shared by both selection paths
+    def _endpoint_of(service):
+        return '{}:{}'.format(service['ServiceAddress'],
+                              service['ServicePort'])
+
+    # Get host IPV4 address
+    local_ipv4 = get_my_primary_local_ipv4()
+
+    # If an instance is registered with this host's address, pick it
+    for service in services:
+        if service['ServiceAddress'] == local_ipv4:
+            log.debug("picking address locally")
+            return _endpoint_of(service)
+
+    # If service is not available locally, pick a random
+    # endpoint for the service from the list
+    return _endpoint_of(services[randint(0, len(services) - 1)])
+
+
+def get_healthy_instances(consul_endpoint, service_name=None,
+                          number_of_expected_services=None):
+    """
+    Report whether services registered in consul are healthy. Despite the
+    name, a boolean is returned rather than a list of instances.
+    :param consul_endpoint: a <host>:<port> string
+    :param service_name: name of service to check, optional
+    :param number_of_expected_services number of services to check for, optional
+    :return: true if healthy, false otherwise
+    """
+
+    consul = connect_to_consult(consul_endpoint)
+
+    def _is_passing(name):
+        # healthy means the passing=True query returned something
+        _, healthy = consul.health.service(name, passing=True)
+        return not healthy == []
+
+    # a single named service is checked on its own
+    if service_name is not None:
+        return _is_passing(service_name)
+
+    catalog = get_all_services(consul_endpoint)
+    names = catalog.keys()
+
+    # the catalog must have the expected size when one is given
+    if number_of_expected_services is not None:
+        if len(names) != number_of_expected_services:
+            return False
+
+    # every catalog entry must pass; stops at the first failure
+    return all(_is_passing(name) for name in names)
+
+
+if __name__ == '__main__':
+    # Ad-hoc manual check against a hard-coded consul agent address.
+    # get_endpoint_from_consul() and get_healthy_instances() can be tried
+    # the same way by calling them with that endpoint and a service name.
+    get_all_instances_of_service('10.100.198.220:8500', 'voltha-grpc')
diff --git a/python/common/utils/deferred_utils.py b/python/common/utils/deferred_utils.py
new file mode 100644
index 0000000..3c55c1a
--- /dev/null
+++ b/python/common/utils/deferred_utils.py
@@ -0,0 +1,56 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from twisted.internet.error import AlreadyCalled
+
+
+class TimeOutError(Exception):
+    """Errback value of a DeferredWithTimeout whose timeout expired."""
+
+class DeferredWithTimeout(Deferred):
+    """
+    Deferred with a timeout. If neither the callback nor the errback method
+    is called within the given time, the deferred's errback will be called
+    with a TimeOutError() exception.
+
+    All other uses are the same as of Deferred().
+    """
+    def __init__(self, timeout=1.0):
+        Deferred.__init__(self)
+        self._timeout = timeout
+        self.timer = reactor.callLater(timeout, self.timed_out)
+
+    def timed_out(self):
+        self.errback(
+            TimeOutError('timed out after {} seconds'.format(self._timeout)))
+
+    def callback(self, result):
+        self._cancel_timer()
+        return Deferred.callback(self, result)
+
+    def errback(self, fail):
+        self._cancel_timer()
+        return Deferred.errback(self, fail)
+
+    def cancel(self):
+        self._cancel_timer()
+        return Deferred.cancel(self)
+
+    def _cancel_timer(self):
+        # The timer may already have fired or been cancelled (cancel() ends
+        # up in errback()); cancelling it again would raise, so check first.
+        if self.timer.active():
+            self.timer.cancel()
+
diff --git a/python/common/utils/dockerhelpers.py b/python/common/utils/dockerhelpers.py
new file mode 100644
index 0000000..4620aef
--- /dev/null
+++ b/python/common/utils/dockerhelpers.py
@@ -0,0 +1,75 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Some docker related convenience functions
+"""
+from datetime import datetime
+from concurrent.futures import ThreadPoolExecutor
+
+import os
+import socket
+from structlog import get_logger
+
+from docker import Client, errors
+
+
+docker_socket = os.environ.get('DOCKER_SOCK', 'unix://tmp/docker.sock')
+log = get_logger()
+
+def get_my_containers_name():
+    """
+    Return the docker containers name in which this process is running.
+    To look up the container name, we use the container ID extracted from the
+    $HOSTNAME environment variable (which is set by docker conventions).
+    :return: String with the docker container name (leading '/' stripped)
+    :raises Exception: whatever the docker lookup raised, after logging it
+    """
+    my_container_id = os.environ.get('HOSTNAME', None)
+
+    try:
+        docker_cli = Client(base_url=docker_socket)
+        info = docker_cli.inspect_container(my_container_id)
+
+    except Exception as e:
+        log.exception('failed', my_container_id=my_container_id, e=e)
+        raise
+
+    name = info['Name'].lstrip('/')
+
+    return name
+
+def get_all_running_containers():
+    # Returns what the docker client's containers() call reports; any
+    # failure is logged and re-raised.
+    try:
+        docker_cli = Client(base_url=docker_socket)
+        containers = docker_cli.containers()
+    except Exception as e:
+        log.exception('failed', e=e)
+        raise
+    return containers
+
+def inspect_container(id):
+    # Return docker's inspect data for `id`; failures are logged, re-raised.
+    try:
+        docker_cli = Client(base_url=docker_socket)
+        info = docker_cli.inspect_container(id)
+    except Exception as e:
+        log.exception('failed-inspect-container', id=id, e=e)
+        raise
+    return info
+
diff --git a/python/common/utils/grpc_utils.py b/python/common/utils/grpc_utils.py
new file mode 100644
index 0000000..8df630e
--- /dev/null
+++ b/python/common/utils/grpc_utils.py
@@ -0,0 +1,109 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Utilities to handle gRPC server and client side code in a Twisted environment
+"""
+import structlog
+from concurrent.futures import Future
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from twisted.python.threadable import isInIOThread
+
+
+log = structlog.get_logger()
+
+
+def twisted_async(func):
+    """
+    This decorator can be used to implement a gRPC method on the twisted
+    thread, allowing asynchronous programming in Twisted while serving
+    a gRPC call.
+
+    gRPC methods normally are called on the futures.ThreadPool threads,
+    so these methods cannot directly use Twisted protocol constructs.
+    If the implementation of the methods needs to touch Twisted, it is
+    safer (or mandatory) to wrap the method with this decorator, which will
+    run the inner method on the Twisted thread and ensure that the
+    result is passed back to the calling (foreign) thread.
+
+    Example usage:
+
+    When implementing a gRPC server, typical pattern is:
+
+    class SpamService(SpamServicer):
+
+        def GetBadSpam(self, request, context):
+            '''this is called from a ThreadPoolExecutor thread'''
+            # generally unsafe to make Twisted calls
+
+        @twisted_async
+        def GetSpamSafely(self, request, context):
+            '''this method now is executed on the Twisted main thread'''
+            # safe to call any Twisted protocol functions
+
+        @twisted_async
+        @inlineCallbacks
+        def GetAsyncSpam(self, request, context):
+            '''this generator can use inlineCallbacks Twisted style'''
+            result = yield some_async_twisted_call(request)
+            returnValue(result)
+
+    """
+    def in_thread_wrapper(*args, **kw):
+
+        if isInIOThread():
+            # already on the reactor thread: call straight through
+            return func(*args, **kw)
+
+        # carries the outcome from the reactor thread back to this thread
+        f = Future()
+
+        def twisted_wrapper():
+            try:
+                d = func(*args, **kw)
+                if isinstance(d, Deferred):
+
+                    def _done(result):
+                        f.set_result(result)
+
+                    def _error(e):
+                        # e is whatever the Deferred hands to its errback
+                        f.set_exception(e)
+
+                    d.addCallback(_done)
+                    d.addErrback(_error)
+
+                else:
+                    f.set_result(d)
+
+            except Exception as e:
+                f.set_exception(e)
+
+        # run the call on the reactor thread
+        reactor.callFromThread(twisted_wrapper)
+        try:
+            # blocks this (non-reactor) thread until the future is resolved
+            result = f.result()
+        except Exception as e:
+            log.exception(e=e, func=func, args=args, kw=kw)
+            raise
+
+        return result
+
+    return in_thread_wrapper
+
+
diff --git a/python/common/utils/id_generation.py b/python/common/utils/id_generation.py
new file mode 100644
index 0000000..e0fea1c
--- /dev/null
+++ b/python/common/utils/id_generation.py
@@ -0,0 +1,116 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# """ ID generation utils """
+
+from uuid import uuid4
+
+
+BROADCAST_CORE_ID = format(0xFFFF, 'x')
+
+def get_next_core_id(current_id_in_hex_str):
+    """
+    Compute the core id that follows the given one.
+    :param current_id_in_hex_str: a hex string of the maximum core id
+    assigned without the leading 0x characters
+    :return: next id as a hex string ('0001' if none was assigned yet)
+    """
+    if not current_id_in_hex_str:
+        return '0001'
+    return format(int(current_id_in_hex_str, 16) + 1, '04x')
+
+
+def create_cluster_logical_device_ids(core_id, switch_id):
+    """
+    Build the cluster-wide logical device id and OpenFlow datapath id of a
+    switch.
+    Both are 64 bit values whose lower 48 bits are the switch id.  The upper
+    16 bits of the logical device id are the core id; in the datapath id
+    they are set to '0000', as the core id is not used for voltha core
+    routing there.
+    :param core_id: string (parsed as hexadecimal)
+    :param switch_id:int
+    :return: cluster logical device id and OpenFlow datapath id
+    """
+    switch_hex = format(switch_id, '012x')[-12:]
+    core_hex = format(int(core_id, 16), '04x')[-4:]
+    logical_device_id = core_hex + switch_hex
+    datapath_id = int('0000' + switch_hex, 16)
+    return logical_device_id, datapath_id
+
+def is_broadcast_core_id(id):
+    assert id and len(id) == 16  # the id must be 16 characters long
+    return id.startswith(BROADCAST_CORE_ID)
+
+def create_empty_broadcast_id():
+    """
+    Build the broadcast id with an all-zero object part (ffff000000000000),
+    used to dispatch xPON objects across all the Voltha instances.
+    :return: An empty broadcast id
+    """
+    return BROADCAST_CORE_ID + '0' * 12
+
+def create_cluster_id():
+    """
+    Create an id that is common across all voltha instances: a str of 64
+    bits whose upper 16 bits are the broadcast core_id and whose lower 48
+    bits are specific to the object (taken from a uuid4).
+    :return: A common id across all Voltha instances
+    """
+    return BROADCAST_CORE_ID + uuid4().hex[:12]
+
+def create_cluster_device_id(core_id):
+    """
+    Create a device id (16 hex chars) unique across the Voltha cluster:
+    upper 16 bits are the core id, lower 48 bits come from a uuid4.
+    :param core_id: core id as a hex string (a plain int is used as is)
+    :return: cluster device id
+    """
+    core = core_id if isinstance(core_id, int) else int(core_id, 16)
+    return '{}{}'.format(format(core, '04x'), uuid4().hex[:12])
+
+
+def get_core_id_from_device_id(device_id):
+    # Device id is a string and the first 4 characters represent the core_id
+    assert device_id and len(device_id) == 16
+    # Return the leading 4 hex characters as they are (zeros are kept)
+    return device_id[:4]
+
+
+def get_core_id_from_logical_device_id(logical_device_id):
+    """
+    Logical Device id is a string and the first 4 characters represent the
+    core_id
+    :param logical_device_id: logical device id, a 16 character string
+    :return: core_id string (the first 4 characters, zeros included)
+    """
+    assert logical_device_id and len(logical_device_id) == 16
+    # Return the leading 4 hex characters as they are (zeros are kept)
+    return logical_device_id[:4]
+
+
+def get_core_id_from_datapath_id(datapath_id):
+    """
+    datapath id is a uint64 where:
+        - low 48 bits -> switch_id
+        - high 16 bits -> core id
+    :param datapath_id: non-zero integer with a non-zero core id part
+    :return: core_id as a hex string without leading zeros
+    """
+    assert datapath_id
+    # format() gives bare digits; hex() adds '0x' and, for py2 longs, 'L'
+    id_in_hex_str = format(datapath_id, 'x')
+    assert len(id_in_hex_str) > 12
+    return id_in_hex_str[:-12]
diff --git a/python/common/utils/indexpool.py b/python/common/utils/indexpool.py
new file mode 100644
index 0000000..858cb3a
--- /dev/null
+++ b/python/common/utils/indexpool.py
@@ -0,0 +1,64 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from bitstring import BitArray
+import structlog
+
+log = structlog.get_logger()
+
+class IndexPool(object):
+    """Pool of max_entries integer indices starting at offset, tracked
+    as a bit array (bit set = index in use)."""
+
+    def __init__(self, max_entries, offset):
+        self.max_entries = max_entries
+        self.offset = offset
+        self.indices = BitArray(self.max_entries)
+
+    def get_next(self):
+        """Allocate the index of the first clear bit; None if none is."""
+        _pos = self.indices.find('0b0')
+        if not _pos:
+            # find() returned an empty result: no clear bit is left
+            log.info("exception-fail-to-allocate-id-all-bits-in-use")
+            return None
+        self.indices.set(1, _pos)
+        return self.offset + _pos[0]
+
+    def allocate(self, index):
+        """Allocate a specific index; None if out of range or taken."""
+        _pos = index - self.offset
+        if not (0 <= _pos < self.max_entries):
+            log.info("{}-out-of-range".format(index))
+            return None
+        if self.indices[_pos]:
+            log.info("{}-is-already-allocated".format(index))
+            return None
+        self.indices.set(1, _pos)
+        return index
+
+    def release(self, index):
+        """Free an index; out-of-range indices are logged and ignored."""
+        _pos = index - self.offset
+        # negative positions would address bits from the end of the array
+        if not (0 <= _pos < self.max_entries):
+            log.info("bit-position-{}-out-of-range".format(_pos))
+            return
+        self.indices.set(0, _pos)
+
+    def pre_allocate(self, index):
+        """Mark a tuple of indices as allocated (non-tuples are ignored)."""
+        if isinstance(index, tuple):
+            # positions are relative to offset, as in the other methods
+            _positions = tuple(i - self.offset for i in index)
+            self.indices.set(1, _positions)
diff --git a/python/common/utils/json_format.py b/python/common/utils/json_format.py
new file mode 100644
index 0000000..c18d013
--- /dev/null
+++ b/python/common/utils/json_format.py
@@ -0,0 +1,105 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Monkey patched json_format to allow best effort decoding of Any fields.
+Use the additional flag (strict_any_handling=False) to trigger the
+best-effort behavior. Omit the flag, or just use the original json_format
+module for the strict behavior.
+"""
+
+from google.protobuf import json_format
+
+class _PatchedPrinter(json_format._Printer):
+    """json_format printer whose Any handling can fall back to best effort."""
+    def __init__(self, including_default_value_fields=False,
+                 preserving_proto_field_name=False, strict_any_handling=False):
+        super(_PatchedPrinter, self).__init__(including_default_value_fields,
+                                              preserving_proto_field_name)
+        self.strict_any_handling = strict_any_handling
+
+    def _BestEffortAnyMessageToJsonObject(self, msg):
+        try:
+            return self._AnyMessageToJsonObject(msg)
+        except TypeError:  # presumably: packed Any type is not loaded
+            if self.strict_any_handling:
+                raise  # strict mode: propagate instead of falling back
+            return self._RegularMessageToJsonObject(msg, {})
+
+
+def MessageToDict(message,
+                  including_default_value_fields=False,
+                  preserving_proto_field_name=False,
+                  strict_any_handling=False):
+    """Render a protobuf message as a JSON-compatible dictionary.
+
+    Args:
+      message: Protocol buffers message instance to convert.
+      including_default_value_fields: When True, singular primitive,
+          repeated and map fields are always emitted; when False only
+          non-empty fields are. Singular message fields and oneof fields
+          are not affected.
+      preserving_proto_field_name: When True, keep the field names from
+          the .proto file; when False, convert them to lowerCamelCase.
+      strict_any_handling: Passed on to the printer. True is meant to
+          fail (as the original json_format does) on an Any field whose
+          packed type is not loaded; False leaves such a field un-packed
+          and carries on.
+
+    Returns:
+      The message as a dict that mirrors its JSON form.
+    """
+    options = dict(
+        including_default_value_fields=including_default_value_fields,
+        preserving_proto_field_name=preserving_proto_field_name,
+        strict_any_handling=strict_any_handling)
+    # pylint: disable=protected-access
+    return _PatchedPrinter(**options)._MessageToJsonObject(message)
+
+
+def MessageToJson(message,
+                  including_default_value_fields=False,
+                  preserving_proto_field_name=False,
+                  strict_any_handling=False):
+    """Render a protobuf message as a JSON string.
+
+    Args:
+      message: Protocol buffers message instance to convert.
+      including_default_value_fields: When True, singular primitive,
+          repeated and map fields are always emitted; when False only
+          non-empty fields are. Singular message fields and oneof fields
+          are not affected.
+      preserving_proto_field_name: When True, keep the field names from
+          the .proto file; when False, convert them to lowerCamelCase.
+      strict_any_handling: Passed on to the printer. True is meant to
+          fail (as the original json_format does) on an Any field whose
+          packed type is not loaded; False leaves such a field un-packed
+          and carries on.
+
+    Returns:
+      A string holding the JSON form of the message.
+    """
+    options = dict(
+        including_default_value_fields=including_default_value_fields,
+        preserving_proto_field_name=preserving_proto_field_name,
+        strict_any_handling=strict_any_handling)
+    return _PatchedPrinter(**options).ToJsonString(message)
+
+
+# Register the best-effort hook, by method name, as the Any converter
+# (presumably [to-JSON, from-JSON] method names -- confirm vs protobuf).
+json_format._WKTJSONMETHODS['google.protobuf.Any'] = [
+    '_BestEffortAnyMessageToJsonObject', '_ConvertAnyMessage']
+# Give the stock printer the same hook name, bound to its own Any converter.
+json_format._Printer._BestEffortAnyMessageToJsonObject = \
+    json_format._Printer._AnyMessageToJsonObject
diff --git a/python/common/utils/message_queue.py b/python/common/utils/message_queue.py
new file mode 100644
index 0000000..2b4257a
--- /dev/null
+++ b/python/common/utils/message_queue.py
@@ -0,0 +1,89 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from twisted.internet.defer import Deferred
+from twisted.internet.defer import succeed
+
+
+class MessageQueue(object):
+    """
+    An event driven queue, similar to twisted.internet.defer.DeferredQueue
+    but which allows selective dequeuing based on a predicate function.
+    Unlike DeferredQueue, there is no limit on backlog, and there is no queue
+    limit.
+    """
+
+    def __init__(self):
+        self.waiting = []  # tuples of (d, predicate)
+        self.queue = []  # messages piling up here if no one is waiting
+
+    def reset(self):
+        """
+        Purge all content as well as waiters (by errback-ing their entries).
+        :return: None
+        """
+        # Detach state first so handlers may safely call back into the queue.
+        pending, self.waiting, self.queue = self.waiting, [], []
+        for d, _ in pending:
+            d.errback(Exception('mesage queue reset() was called'))
+
+    def _cancelGet(self, d):
+        """
+        Remove a deferred from our waiting list.
+        :param d: The deferred that has been canceled.
+        :return: None
+        """
+        # Filter in place by identity; popping inside an index loop would
+        # run past the end of the shortened list and raise IndexError.
+        self.waiting[:] = [w for w in self.waiting if w[0] is not d]
+
+    def put(self, obj):
+        """
+        Add an object to this queue, handing it straight to the oldest
+        waiter whose predicate accepts it (if there is one).
+        :param obj: arbitrary object that will be added to the queue
+        :return: None
+        """
+        # if someone is waiting for this, hand it over right away
+        for i, (d, predicate) in enumerate(self.waiting):
+            if predicate is None or predicate(obj):
+                # unregister the waiter before firing its deferred
+                self.waiting.pop(i)
+                d.callback(obj)
+                return
+
+        # nobody wanted it, so keep it until a matching get() comes along
+        self.queue.append(obj)
+
+    def get(self, predicate=None):
+        """
+        Attempt to retrieve and remove an object from the queue that
+        matches the optional predicate.
+        :param predicate: optional callable; when given, only objects for
+        which predicate(obj) is True will be considered.
+        :return: Deferred which fires with the next object available
+        (already fired if a queued object matched).
+        """
+        for i, msg in enumerate(self.queue):
+            if predicate is None or predicate(msg):
+                self.queue.pop(i)
+                return succeed(msg)
+
+        # there were no matching entries if we got here, so we wait
+        d = Deferred(canceller=self._cancelGet)
+        self.waiting.append((d, predicate))
+        return d
+
+
diff --git a/python/common/utils/nethelpers.py b/python/common/utils/nethelpers.py
new file mode 100644
index 0000000..b17aced
--- /dev/null
+++ b/python/common/utils/nethelpers.py
@@ -0,0 +1,86 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Some network related convenience functions
+"""
+
+from netifaces import AF_INET
+
+import netifaces as ni
+import netaddr
+
+
+def _get_all_interfaces():
+    """
+    Return (name, ni.ifaddresses(name)) pairs for every ni.interfaces() entry.
+    """
+    return [(name, ni.ifaddresses(name)) for name in ni.interfaces()]
+
+
+def _get_my_primary_interface():
+    """Return the interface of the first gateway in a default-route family."""
+    gateways = ni.gateways()
+    defaults = gateways.get('default')
+    # An empty 'default' mapping also means there is no default gateway.
+    assert defaults, \
+        ("No default gateway on host/container, "
+         "cannot determine primary interface")
+    # gateways[family] has the format: [('10.15.32.1', 'en0', True)]
+    return gateways[next(iter(defaults))][0][1]
+
+
+def get_my_primary_local_ipv4(inter_core_subnet=None, ifname=None):
+    """Return a local IPv4 address (in inter_core_subnet if set) or None."""
+    if not inter_core_subnet:
+        return _get_my_primary_local_ipv4(ifname)
+    # My IP should belong to the specified subnet; parse the subnet once
+    m_network = netaddr.IPNetwork(inter_core_subnet)
+    for iface in ni.interfaces():
+        # consider every IPv4 address of the interface, not only the first
+        for entry in ni.ifaddresses(iface).get(AF_INET, ()):
+            _ip = netaddr.IPAddress(entry['addr']).value
+            if m_network.first <= _ip <= m_network.last:
+                return entry['addr']
+    return None
+
+
+def get_my_primary_interface(pon_subnet=None):
+    """Return the primary interface, or one with an IPv4 in pon_subnet."""
+    if not pon_subnet:
+        return _get_my_primary_interface()
+    # My interface should have an IP in the subnet; parse the subnet once
+    m_network = netaddr.IPNetwork(pon_subnet)
+    for iface in ni.interfaces():
+        # consider every IPv4 address of the interface, not only the first
+        for entry in ni.ifaddresses(iface).get(AF_INET, ()):
+            m_ip = netaddr.IPAddress(entry['addr']).value
+            if m_network.first <= m_ip <= m_network.last:
+                return iface
+    return None
+
+
+def _get_my_primary_local_ipv4(ifname=None):
+    """Best effort: first IPv4 of ifname (default: primary) or None."""
+    try:
+        if ifname is None:
+            ifname = get_my_primary_interface()
+        return ni.ifaddresses(ifname)[AF_INET][0]['addr']
+    except Exception:  # deliberately broad: any failure yields None
+        return None
+
+if __name__ == '__main__':  # ad-hoc check: print the detected primary IPv4
+    print(get_my_primary_local_ipv4())
diff --git a/python/common/utils/ordered_weakvalue_dict.py b/python/common/utils/ordered_weakvalue_dict.py
new file mode 100644
index 0000000..9ea739a
--- /dev/null
+++ b/python/common/utils/ordered_weakvalue_dict.py
@@ -0,0 +1,48 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from _weakref import ref
+from weakref import KeyedRef
+from collections import OrderedDict
+
+
+class OrderedWeakValueDict(OrderedDict):
+    """
+    Modified OrderedDict to use weak references as values. Entries disappear
+    automatically if the referred value has no more strong reference pointing
+    to it.
+
+    Warning, this is not a complete implementation, only what is needed for
+    now. See test_ordered_weakvalue_dict.py to see what is tested behavior.
+    """
+    def __init__(self, *args, **kw):
+        def remove(wr, selfref=ref(self)):
+            self = selfref()
+            if self is not None:
+                super(OrderedWeakValueDict, self).__delitem__(wr.key)
+        self._remove = remove
+        super(OrderedWeakValueDict, self).__init__(*args, **kw)
+
+    def __setitem__(self, key, value):
+        super(OrderedWeakValueDict, self).__setitem__(
+            key, KeyedRef(value, self._remove, key))
+
+    def __getitem__(self, key):
+        o = super(OrderedWeakValueDict, self).__getitem__(key)()
+        if o is None:
+            # referent is gone: report the key as missing
+            raise KeyError(key)
+        return o
+