VOL-1451 Initial checkin of openonu build

Produced docker container capable of building and running
openonu/brcm_openomci_onu.  Copied over the current ONU code
and resolved all imports by copying them into the local source tree.

Change-Id: Ib9785d37afc65b7d32ecf74aee2456352626e2b6
diff --git a/python/core/__init__.py b/python/core/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/core/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/core/config/__init__.py b/python/core/config/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/core/config/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/core/config/config_backend.py b/python/core/config/config_backend.py
new file mode 100644
index 0000000..d906348
--- /dev/null
+++ b/python/core/config/config_backend.py
@@ -0,0 +1,289 @@
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from consul import Consul, ConsulException
+from common.utils.asleep import asleep
+from requests import ConnectionError
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+import etcd3
+import structlog
+
+
+class ConsulStore(object):
+    """ Config kv store for consul with a cache for quicker subsequent reads
+
+        TODO: This will block the reactor. Should either change the whole
+        call stack to yield or put the put/delete transactions into a queue
+        to be written later with twisted. Will need a transaction log to
+        ensure we don't lose anything. Making the whole call stack yield is
+        troublesome because other tasks can come in on the side and start
+        modifying things, which could be bad.
+    """
+
+    CONNECT_RETRY_INTERVAL_SEC = 1
+    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+    def __init__(self, host, port, path_prefix):
+
+        self.log = structlog.get_logger()
+        self._consul = Consul(host=host, port=port)
+        self.host = host
+        self.port = port
+        self._path_prefix = path_prefix
+        self._cache = {}
+        self.retries = 0
+
+    def make_path(self, key):
+        return '{}/{}'.format(self._path_prefix, key)
+
+    def __getitem__(self, key):
+        if key in self._cache:
+            return self._cache[key]
+        value = self._kv_get(self.make_path(key))
+        if value is not None:
+            # consul turns empty strings to None, so we do the reverse here
+            self._cache[key] = value['Value'] or ''
+            return value['Value'] or ''
+        else:
+            raise KeyError(key)
+
+    def __contains__(self, key):
+        if key in self._cache:
+            return True
+        value = self._kv_get(self.make_path(key))
+        if value is not None:
+            self._cache[key] = value['Value']
+            return True
+        else:
+            return False
+
+    def __setitem__(self, key, value):
+        try:
+            assert isinstance(value, basestring)
+            self._cache[key] = value
+            self._kv_put(self.make_path(key), value)
+        except Exception, e:
+            self.log.exception('cannot-set-item', e=e)
+
+    def __delitem__(self, key):
+        self._cache.pop(key, None)
+        self._kv_delete(self.make_path(key))
+
+    @inlineCallbacks
+    def _backoff(self, msg):
+        wait_time = self.RETRY_BACKOFF[min(self.retries,
+                                           len(self.RETRY_BACKOFF) - 1)]
+        self.retries += 1
+        self.log.error(msg, retry_in=wait_time)
+        yield asleep(wait_time)
+
+    def _redo_consul_connection(self):
+        self._consul = Consul(host=self.host, port=self.port)
+        self._cache.clear()
+
+    def _clear_backoff(self):
+        if self.retries:
+            self.log.info('reconnected-to-consul', after_retries=self.retries)
+            self.retries = 0
+
+    def _get_consul(self):
+        return self._consul
+
+    # Proxy methods for consul with retry support
+    def _kv_get(self, *args, **kw):
+        return self._retry('GET', *args, **kw)
+
+    def _kv_put(self, *args, **kw):
+        return self._retry('PUT', *args, **kw)
+
+    def _kv_delete(self, *args, **kw):
+        return self._retry('DELETE', *args, **kw)
+
+    def _retry(self, operation, *args, **kw):
+        while 1:
+            try:
+                consul = self._get_consul()
+                self.log.debug('consul', consul=consul, operation=operation,
+                               args=args)
+                if operation == 'GET':
+                    index, result = consul.kv.get(*args, **kw)
+                elif operation == 'PUT':
+                    result = consul.kv.put(*args, **kw)
+                elif operation == 'DELETE':
+                    result = consul.kv.delete(*args, **kw)
+                else:
+                    # Default case - consider operation as a function call
+                    result = operation(*args, **kw)
+                self._clear_backoff()
+                break
+            except ConsulException, e:
+                self.log.exception('consul-not-up', e=e)
+                self._backoff('consul-not-up')
+            except ConnectionError, e:
+                self.log.exception('cannot-connect-to-consul', e=e)
+                self._backoff('cannot-connect-to-consul')
+            except Exception, e:
+                self.log.exception(e)
+                self._backoff('unknown-error')
+            self._redo_consul_connection()
+
+        return result
+
+
+class EtcdStore(object):
+    """ Config kv store for etcd with a cache for quicker subsequent reads
+
+        TODO: This will block the reactor. Should either change the whole
+        call stack to yield or put the put/delete transactions into a queue
+        to be written later with twisted. Will need a transaction log to
+        ensure we don't lose anything. Making the whole call stack yield is
+        troublesome because other tasks can come in on the side and start
+        modifying things, which could be bad.
+    """
+
+    CONNECT_RETRY_INTERVAL_SEC = 1
+    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+    def __init__(self, host, port, path_prefix):
+
+        self.log = structlog.get_logger()
+        self._etcd = etcd3.client(host=host, port=port)
+        self.host = host
+        self.port = port
+        self._path_prefix = path_prefix
+        self._cache = {}
+        self.retries = 0
+
+    def make_path(self, key):
+        return '{}/{}'.format(self._path_prefix, key)
+
+    def __getitem__(self, key):
+        if key in self._cache:
+            return self._cache[key]
+        (value, meta) = self._kv_get(self.make_path(key))
+        if value is not None:
+            self._cache[key] = value
+            return value
+        else:
+            raise KeyError(key)
+
+    def __contains__(self, key):
+        if key in self._cache:
+            return True
+        (value, meta) = self._kv_get(self.make_path(key))
+        if value is not None:
+            self._cache[key] = value
+            return True
+        else:
+            return False
+
+    def __setitem__(self, key, value):
+        try:
+            assert isinstance(value, basestring)
+            self._cache[key] = value
+            self._kv_put(self.make_path(key), value)
+        except Exception, e:
+            self.log.exception('cannot-set-item', e=e)
+
+    def __delitem__(self, key):
+        self._cache.pop(key, None)
+        self._kv_delete(self.make_path(key))
+
+    @inlineCallbacks
+    def _backoff(self, msg):
+        wait_time = self.RETRY_BACKOFF[min(self.retries,
+                                           len(self.RETRY_BACKOFF) - 1)]
+        self.retries += 1
+        self.log.error(msg, retry_in=wait_time)
+        yield asleep(wait_time)
+
+    def _redo_etcd_connection(self):
+        self._etcd = etcd3.client(host=self.host, port=self.port)
+        self._cache.clear()
+
+    def _clear_backoff(self):
+        if self.retries:
+            self.log.info('reconnected-to-etcd', after_retries=self.retries)
+            self.retries = 0
+
+    def _get_etcd(self):
+        return self._etcd
+
+    # Proxy methods for etcd with retry support
+    def _kv_get(self, *args, **kw):
+        return self._retry('GET', *args, **kw)
+
+    def _kv_put(self, *args, **kw):
+        return self._retry('PUT', *args, **kw)
+
+    def _kv_delete(self, *args, **kw):
+        return self._retry('DELETE', *args, **kw)
+
+    def _retry(self, operation, *args, **kw):
+
+        # etcd data sometimes contains non-utf8 sequences, replace
+        self.log.debug('backend-op',
+                       operation=operation,
+                       args=map(lambda x: unicode(x, 'utf8', 'replace'), args),
+                       kw=kw)
+
+        while 1:
+            try:
+                etcd = self._get_etcd()
+                self.log.debug(
+                    'etcd', etcd=etcd, operation=operation,
+                    args=map(lambda x: unicode(x, 'utf8', 'replace'), args))
+                if operation == 'GET':
+                    (value, meta) = etcd.get(*args, **kw)
+                    result = (value, meta)
+                elif operation == 'PUT':
+                    result = etcd.put(*args, **kw)
+                elif operation == 'DELETE':
+                    result = etcd.delete(*args, **kw)
+                else:
+                    # Default case - consider operation as a function call
+                    result = operation(*args, **kw)
+                self._clear_backoff()
+                break
+            except Exception, e:
+                self.log.exception(e)
+                self._backoff('unknown-error-with-etcd')
+            self._redo_etcd_connection()
+
+        return result
+
+
+def load_backend(store_id, store_prefix, args):
+    """ Return the kv store backend based on the command line arguments
+    """
+
+    def load_consul_store():
+        instance_core_store_prefix = '{}/{}'.format(store_prefix, store_id)
+
+        host, port = args.consul.split(':', 1)
+        return ConsulStore(host, int(port), instance_core_store_prefix)
+
+    def load_etcd_store():
+        instance_core_store_prefix = '{}/{}'.format(store_prefix, store_id)
+
+        host, port = args.etcd.split(':', 1)
+        return EtcdStore(host, int(port), instance_core_store_prefix)
+
+    loaders = {
+        'none': lambda: None,
+        'consul': load_consul_store,
+        'etcd': load_etcd_store
+    }
+
+    return loaders[args.backend]()
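+
+
+# Illustrative usage (a sketch, not exercised by this module): load_backend
+# expects an argparse-style namespace carrying `backend` plus a `consul` or
+# `etcd` "host:port" string; the returned store then behaves like a dict.
+# The store id, prefix and address below are assumptions for illustration.
+#
+#   args = Namespace(backend='etcd', etcd='localhost:2379')
+#   store = load_backend('0001', 'service/voltha/data/core', args)
+#   store['my-key'] = 'my-value'   # written through to the kv store
+#   value = store['my-key']        # subsequent reads come from the cache
+#   del store['my-key']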
diff --git a/python/core/config/config_branch.py b/python/core/config/config_branch.py
new file mode 100644
index 0000000..207818b
--- /dev/null
+++ b/python/core/config/config_branch.py
@@ -0,0 +1,53 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Class to hold revisions, the latest revision, etc., for a config node; used
+either for the active committed revisions or for revisions that are part of
+a transaction.
+"""
+
+from collections import OrderedDict
+from weakref import WeakValueDictionary
+
+
+class ConfigBranch(object):
+
+    __slots__ = (
+        '_node',  # ref to node
+        '_txid',  # txid for this branch (None for the committed branch)
+        '_origin',  # _latest at time of branching on default branch
+        '_revs',  # dict of rev-hash to ref of ConfigRevision
+        '_latest',  # ref to latest committed ConfigRevision
+        '__weakref__'
+    )
+
+    def __init__(self, node, txid=None, origin=None, auto_prune=True):
+        self._node = node
+        self._txid = txid
+        self._origin = origin
+        self._revs = WeakValueDictionary() if auto_prune else OrderedDict()
+        self._latest = origin
+
+    def __getitem__(self, hash):
+        return self._revs[hash]
+
+    @property
+    def latest(self):
+        return self._latest
+
+    @property
+    def origin(self):
+        return self._origin
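+
+
+# Note on how the fields relate (an explanatory sketch, not exercised here):
+# on the committed (default) branch both _txid and _origin are None; on a
+# transaction branch _origin points at the committed revision the branch
+# was forked from, and _latest advances as changes are staged under that
+# txid until the branch is merged back or discarded.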
diff --git a/python/core/config/config_event_bus.py b/python/core/config/config_event_bus.py
new file mode 100644
index 0000000..e56d77a
--- /dev/null
+++ b/python/core/config/config_event_bus.py
@@ -0,0 +1,66 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import structlog
+from enum import Enum
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
+
+from common.event_bus import EventBusClient
+from voltha.core.config.config_proxy import CallbackType
+from voltha.protos import third_party
+from voltha.protos.events_pb2 import ConfigEvent, ConfigEventType
+
+IGNORED_CALLBACKS = [CallbackType.PRE_ADD, CallbackType.GET,
+                     CallbackType.POST_LISTCHANGE, CallbackType.PRE_REMOVE,
+                     CallbackType.PRE_UPDATE]
+
+log = structlog.get_logger()
+
+class ConfigEventBus(object):
+
+    __slots__ = (
+        '_event_bus_client',  # The event bus client used to publish events.
+        '_topic'  # the topic to publish to
+    )
+
+    def __init__(self):
+        self._event_bus_client = EventBusClient()
+        self._topic = 'model-change-events'
+
+    def advertise(self, type, data, hash=None):
+        if type in IGNORED_CALLBACKS:
+            log.info('Ignoring event {} with data {}'.format(type, data))
+            return
+
+        if type is CallbackType.POST_ADD:
+            kind = ConfigEventType.add
+        elif type is CallbackType.POST_REMOVE:
+            kind = ConfigEventType.remove
+        else:
+            kind = ConfigEventType.update
+
+        if isinstance(data, Message):
+            msg = dumps(MessageToDict(data, True, True))
+        else:
+            msg = data
+
+        event = ConfigEvent(
+            type=kind,
+            hash=hash,
+            data=msg
+        )
+
+        self._event_bus_client.publish(self._topic, event)
+
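+# Illustrative usage (a sketch; `device` stands for any protobuf Message
+# instance and the hash value is made up for illustration):
+#
+#   bus = ConfigEventBus()
+#   bus.advertise(CallbackType.POST_ADD, device, hash='abc123def456')
+#
+# This publishes a ConfigEvent(type=add, hash=..., data=<json>) on the
+# 'model-change-events' topic; the PRE_*, GET and POST_LISTCHANGE callback
+# types listed in IGNORED_CALLBACKS are only logged, not published.
+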
diff --git a/python/core/config/config_node.py b/python/core/config/config_node.py
new file mode 100644
index 0000000..ab73484
--- /dev/null
+++ b/python/core/config/config_node.py
@@ -0,0 +1,617 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from copy import copy
+
+from jsonpatch import JsonPatch
+from jsonpatch import make_patch
+
+from common.utils.json_format import MessageToDict
+from voltha.core.config.config_branch import ConfigBranch
+from voltha.core.config.config_event_bus import ConfigEventBus
+from voltha.core.config.config_proxy import CallbackType, ConfigProxy
+from voltha.core.config.config_rev import is_proto_message, children_fields, \
+    ConfigRevision, access_rights
+from voltha.core.config.config_rev_persisted import PersistedConfigRevision
+from voltha.core.config.merge_3way import merge_3way
+from voltha.protos import third_party
+from voltha.protos import meta_pb2
+
+import structlog
+
+log = structlog.get_logger()
+
+def message_to_dict(m):
+    return MessageToDict(m, True, True, False)
+
+
+def check_access_violation(new_msg, old_msg):
+    """Raise ValueError if attempt is made to change a read-only field"""
+    access_map = access_rights(new_msg.__class__)
+    violated_fields = []
+    for field_name, access in access_map.iteritems():
+        if access == meta_pb2.READ_ONLY:
+            if getattr(new_msg, field_name) != getattr(old_msg, field_name):
+                violated_fields.append(field_name)
+    if violated_fields:
+        raise ValueError('Cannot change read-only field(s) %s' %
+                         ', '.join('"%s"' % f for f in violated_fields))
+
+
+def find_rev_by_key(revs, keyname, value):
+    for i, rev in enumerate(revs):
+        if getattr(rev._config._data, keyname) == value:
+            return i, rev
+    raise KeyError('key {}={} not found'.format(keyname, value))
+
+
+class ConfigNode(object):
+    """
+    Represents a configuration node which can hold a number of revisions
+    of the configuration for this node.
+    When the configuration changes, the new version is appended to the
+    node.
+    Initial data must be a protobuf message and it will determine the type of
+    this node.
+    """
+    __slots__ = (
+        '_root',  # ref to root node
+        '_type',  # node type, as __class__ of protobuf message
+        '_branches',  # dict of transaction branches and a default (committed)
+                      # branch
+        '_tags',  # dict of tag-name to ref of ConfigRevision
+        '_proxy',  # ref to proxy observer or None if no proxy assigned
+        '_event_bus',  # ref to event_bus or None if no event bus is assigned
+        '_auto_prune'
+    )
+
+    def __init__(self, root, initial_data, auto_prune=True, txid=None):
+        self._root = root
+        self._branches = {}
+        self._tags = {}
+        self._proxy = None
+        self._event_bus = None
+        self._auto_prune = auto_prune
+
+        if isinstance(initial_data, type):
+            self._type = initial_data
+        elif is_proto_message(initial_data):
+            self._type = initial_data.__class__
+            copied_data = initial_data.__class__()
+            copied_data.CopyFrom(initial_data)
+            self._initialize(copied_data, txid)
+        else:
+            raise NotImplementedError()
+
+    def _mknode(self, *args, **kw):
+        return ConfigNode(self._root, *args, **kw)
+
+    def _mkrev(self, *args, **kw):
+        return self._root.mkrev(*args, **kw)
+
+    def _initialize(self, data, txid):
+        # separate external children data away from locally stored data
+        # based on child_node annotations in protobuf
+        children = {}
+        for field_name, field in children_fields(self._type).iteritems():
+            field_value = getattr(data, field_name)
+            if field.is_container:
+                if field.key:
+                    keys_seen = set()
+                    children[field_name] = lst = []
+                    for v in field_value:
+                        rev = self._mknode(v, txid=txid).latest
+                        key = getattr(v, field.key)
+                        if key in keys_seen:
+                            raise ValueError('Duplicate key "{}"'.format(key))
+                        lst.append(rev)
+                        keys_seen.add(key)
+                else:
+                    children[field_name] = [
+                        self._mknode(v, txid=txid).latest for v in field_value]
+            else:
+                children[field_name] = [
+                    self._mknode(field_value, txid=txid).latest]
+            data.ClearField(field_name)
+
+        branch = ConfigBranch(self, auto_prune=self._auto_prune)
+        rev = self._mkrev(branch, data, children)
+        self._make_latest(branch, rev)
+        self._branches[txid] = branch
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ accessors ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+    # these convenience short-cuts only work for the committed branch
+
+    @property
+    def revisions(self):
+        return [r._hash for r in self._branches[None]._revs.itervalues()]
+
+    @property
+    def latest(self):
+        return self._branches[None]._latest
+
+    def __getitem__(self, hash):
+        return self._branches[None]._revs[hash]
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ get operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def get(self, path=None, hash=None, depth=0, deep=False, txid=None):
+
+        # depth preparation
+        if deep:
+            depth = -1
+
+        # path preparation
+        path = '' if path is None else path
+        while path.startswith('/'):
+            path = path[1:]
+
+        # determine branch; if lookup fails, it is ok to use default branch
+        branch = self._branches.get(txid, None) or self._branches[None]
+
+        # determine rev
+        if hash is not None:
+            rev = branch._revs[hash]
+        else:
+            rev = branch.latest
+
+        return self._get(rev, path, depth)
+
+    def _get(self, rev, path, depth):
+
+        if not path:
+            return self._do_get(rev, depth)
+
+        # ... otherwise
+        name, _, path = path.partition('/')
+        field = children_fields(self._type)[name]
+        if field.is_container:
+            if field.key:
+                children = rev._children[name]
+                if path:
+                    # need to escalate further
+                    key, _, path = path.partition('/')
+                    key = field.key_from_str(key)
+                    _, child_rev = find_rev_by_key(children, field.key, key)
+                    child_node = child_rev.node
+                    return child_node._get(child_rev, path, depth)
+                else:
+                    # we are the node of interest
+                    response = []
+                    for child_rev in children:
+                        child_node = child_rev.node
+                        value = child_node._do_get(child_rev, depth)
+                        response.append(value)
+                    return response
+            else:
+                if path:
+                    raise LookupError(
+                        'Cannot index into container with no key defined')
+                response = []
+                for child_rev in rev._children[name]:
+                    child_node = child_rev.node
+                    value = child_node._do_get(child_rev, depth)
+                    response.append(value)
+                return response
+        else:
+            child_rev = rev._children[name][0]
+            child_node = child_rev.node
+            return child_node._get(child_rev, path, depth)
+
+    def _do_get(self, rev, depth):
+        msg = rev.get(depth)
+        if self._proxy is not None:
+            msg = self._proxy.invoke_callbacks(CallbackType.GET, msg)
+        return msg
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ update operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def update(self, path, data, strict=False, txid=None, mk_branch=None):
+
+        while path.startswith('/'):
+            path = path[1:]
+
+        try:
+            branch = self._branches[txid]
+        except KeyError:
+            branch = mk_branch(self)
+
+        if not path:
+            return self._do_update(branch, data, strict)
+
+        rev = branch._latest  # change is always made to the latest
+        name, _, path = path.partition('/')
+        field = children_fields(self._type)[name]
+        if field.is_container:
+            if not path:
+                raise ValueError('Cannot update a list')
+            if field.key:
+                key, _, path = path.partition('/')
+                key = field.key_from_str(key)
+                children = copy(rev._children[name])
+                idx, child_rev = find_rev_by_key(children, field.key, key)
+                child_node = child_rev.node
+                # check if a deep copy would work better
+                new_child_rev = child_node.update(
+                    path, data, strict, txid, mk_branch)
+                if new_child_rev.hash == child_rev.hash:
+                    # When the new_child_rev goes out of scope, its
+                    # destructor gets invoked because no other data
+                    # structure refers to it.  To prevent that from erasing
+                    # the hash it holds from the db, its hash is set to
+                    # None.  If the new_child_rev object points at the same
+                    # address as child_rev, do not clear the hash.
+                    if new_child_rev != child_rev:
+                        log.debug('clear-hash', hash=new_child_rev.hash,
+                                  object_ref=new_child_rev)
+                        new_child_rev.clear_hash()
+                    return branch._latest
+                if getattr(new_child_rev.data, field.key) != key:
+                    raise ValueError('Cannot change key field')
+                children[idx] = new_child_rev
+                rev = rev.update_children(name, children, branch)
+                self._make_latest(branch, rev)
+                return rev
+            else:
+                raise ValueError('Cannot index into container with no keys')
+
+        else:
+            child_rev = rev._children[name][0]
+            child_node = child_rev.node
+            new_child_rev = child_node.update(
+                path, data, strict, txid, mk_branch)
+            rev = rev.update_children(name, [new_child_rev], branch)
+            self._make_latest(branch, rev)
+            return rev
+
+    def _do_update(self, branch, data, strict):
+        if not isinstance(data, self._type):
+            raise ValueError(
+                '"{}" is not a valid data type for this node'.format(
+                    data.__class__.__name__))
+        self._test_no_children(data)
+        if self._proxy is not None:
+            self._proxy.invoke_callbacks(CallbackType.PRE_UPDATE, data)
+
+        if branch._latest.data != data:
+            if strict:
+                # check if attempt is made to change read-only field
+                check_access_violation(data, branch._latest.data)
+            rev = branch._latest.update_data(data, branch)
+            self._make_latest(branch, rev,
+                              ((CallbackType.POST_UPDATE, rev.data),))
+            return rev
+        else:
+            return branch._latest
+
+    def _make_latest(self, branch, rev, change_announcements=()):
+        # Update the branch's latest rev only when the hash of the new rev
+        # differs from the previous one; otherwise the data already saved
+        # in the db under that hash would be erased.
+        if rev.hash not in branch._revs:
+            branch._revs[rev.hash] = rev
+
+        if not branch._latest or rev.hash != branch._latest.hash:
+            branch._latest = rev
+
+        # announce only if this is main branch
+        if change_announcements and branch._txid is None:
+
+            if self._proxy is not None:
+                for change_type, data in change_announcements:
+                    # since the callback may operate on the config tree,
+                    # we have to defer the execution of the callbacks till
+                    # the change is propagated to the root, then root will
+                    # call the callbacks
+                    self._root.enqueue_callback(
+                        self._proxy.invoke_callbacks,
+                        change_type,
+                        data,
+                        proceed_on_errors=1,
+                    )
+
+            for change_type, data in change_announcements:
+                self._root.enqueue_notification_callback(
+                    self._mk_event_bus().advertise,
+                    change_type,
+                    data,
+                    hash=rev.hash
+                )
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ add operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def add(self, path, data, txid=None, mk_branch=None):
+        while path.startswith('/'):
+            path = path[1:]
+        if not path:
+            raise ValueError('Cannot add to non-container node')
+
+        try:
+            branch = self._branches[txid]
+        except KeyError:
+            branch = mk_branch(self)
+
+        rev = branch._latest  # change is always made to latest
+        name, _, path = path.partition('/')
+        field = children_fields(self._type)[name]
+        if field.is_container:
+            if not path:
+                # we do need to add a new child to the field
+                if field.key:
+                    if self._proxy is not None:
+                        self._proxy.invoke_callbacks(
+                            CallbackType.PRE_ADD, data)
+                    children = copy(rev._children[name])
+                    key = getattr(data, field.key)
+                    try:
+                        find_rev_by_key(children, field.key, key)
+                    except KeyError:
+                        pass
+                    else:
+                        raise ValueError('Duplicate key "{}"'.format(key))
+                    child_rev = self._mknode(data).latest
+                    children.append(child_rev)
+                    rev = rev.update_children(name, children, branch)
+                    self._make_latest(branch, rev,
+                                      ((CallbackType.POST_ADD, data),))
+                    return rev
+                else:
+                    # adding to non-keyed containers not implemented yet
+                    raise ValueError('Cannot add to non-keyed container')
+            else:
+                if field.key:
+                    # need to escalate
+                    key, _, path = path.partition('/')
+                    key = field.key_from_str(key)
+                    children = copy(rev._children[name])
+                    idx, child_rev = find_rev_by_key(children, field.key, key)
+                    child_node = child_rev.node
+                    new_child_rev = child_node.add(path, data, txid, mk_branch)
+                    children[idx] = new_child_rev
+                    rev = rev.update_children(name, children, branch)
+                    self._make_latest(branch, rev)
+                    return rev
+                else:
+                    raise ValueError(
+                        'Cannot index into container with no keys')
+        else:
+            raise ValueError('Cannot add to non-container field')
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ remove operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def remove(self, path, txid=None, mk_branch=None):
+        while path.startswith('/'):
+            path = path[1:]
+        if not path:
+            raise ValueError('Cannot remove from non-container node')
+
+        try:
+            branch = self._branches[txid]
+        except KeyError:
+            branch = mk_branch(self)
+
+        rev = branch._latest  # change is always made to latest
+        name, _, path = path.partition('/')
+        field = children_fields(self._type)[name]
+        if field.is_container:
+            if not path:
+                raise ValueError("Cannot remove without a key")
+            if field.key:
+                key, _, path = path.partition('/')
+                key = field.key_from_str(key)
+                if path:
+                    # need to escalate
+                    children = copy(rev._children[name])
+                    idx, child_rev = find_rev_by_key(children, field.key, key)
+                    child_node = child_rev.node
+                    new_child_rev = child_node.remove(path, txid, mk_branch)
+                    children[idx] = new_child_rev
+                    rev = rev.update_children(name, children, branch)
+                    self._make_latest(branch, rev)
+                    return rev
+                else:
+                    # need to remove from this very node
+                    children = copy(rev._children[name])
+                    idx, child_rev = find_rev_by_key(children, field.key, key)
+                    if self._proxy is not None:
+                        data = child_rev.data
+                        self._proxy.invoke_callbacks(
+                            CallbackType.PRE_REMOVE, data)
+                        post_anno = ((CallbackType.POST_REMOVE, data),)
+                    else:
+                        post_anno = ((CallbackType.POST_REMOVE, child_rev.data),)
+                    del children[idx]
+                    rev = rev.update_children(name, children, branch)
+                    self._make_latest(branch, rev, post_anno)
+                    return rev
+            else:
+                raise ValueError('Cannot remove from non-keyed container')
+        else:
+            raise ValueError('Cannot remove non-container field')
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Branching ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def _mk_txbranch(self, txid):
+        branch_point = self._branches[None].latest
+        branch = ConfigBranch(self, txid, branch_point)
+        self._branches[txid] = branch
+        return branch
+
+    def _del_txbranch(self, txid):
+        del self._branches[txid]
+
+    def _merge_txbranch(self, txid, dry_run=False):
+        """
+        Make the latest rev in this branch the latest in the common branch,
+        but only if no conflict is detected. A conflict arises when the
+        txbranch's branch point no longer matches the latest in the default
+        branch. This has to be verified recursively.
+        """
+
+        def merge_child(child_rev):
+            child_branch = child_rev._branch
+            if child_branch._txid == txid:
+                child_rev = child_branch._node._merge_txbranch(txid, dry_run)
+            return child_rev
+
+        src_branch = self._branches[txid]
+        dst_branch = self._branches[None]
+
+        fork_rev = src_branch.origin  # rev from which src branch was made
+        src_rev = src_branch.latest  # head rev of source branch
+        dst_rev = dst_branch.latest  # head rev of target branch
+
+        rev, changes = merge_3way(
+            fork_rev, src_rev, dst_rev, merge_child, dry_run)
+
+        if not dry_run:
+            self._make_latest(dst_branch, rev, change_announcements=changes)
+            del self._branches[txid]
+
+        return rev
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Diff utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def diff(self, hash1, hash2=None, txid=None):
+        branch = self._branches[txid]
+        rev1 = branch[hash1]
+        rev2 = branch[hash2] if hash2 else branch._latest
+        if rev1.hash == rev2.hash:
+            return JsonPatch([])
+        else:
+            dict1 = message_to_dict(rev1.data)
+            dict2 = message_to_dict(rev2.data)
+            return make_patch(dict1, dict2)
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Tagging utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def tag(self, tag, hash=None):
+        branch = self._branches[None]  # tag only what has been committed
+        rev = branch._latest if hash is None else branch._revs[hash]
+        self._tags[tag] = rev
+        self.persist_tags()
+        return self
+
+    @property
+    def tags(self):
+        return sorted(self._tags.iterkeys())
+
+    def by_tag(self, tag):
+        """
+        Return revision based on tag
+        :param tag: previously registered tag value
+        :return: revision object
+        """
+        return self._tags[tag]
+
+    def diff_by_tag(self, tag1, tag2):
+        return self.diff(self._tags[tag1].hash, self._tags[tag2].hash)
+
+    def delete_tag(self, tag):
+        del self._tags[tag]
+        self.persist_tags()
+
+    def delete_tags(self, *tags):
+        for tag in tags:
+            del self._tags[tag]
+        self.persist_tags()
+
+    def prune_untagged(self):
+        branch = self._branches[None]
+        keep = set(rev.hash for rev in self._tags.itervalues())
+        keep.add(branch._latest.hash)
+        for hash in branch._revs.keys():
+            if hash not in keep:
+                del branch._revs[hash]
+        return self
+
+    def persist_tags(self):
+        """
+        Persist tag information to the backend
+        """
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Internals ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def _test_no_children(self, data):
+        for field_name, field in children_fields(self._type).items():
+            field_value = getattr(data, field_name)
+            if field.is_container:
+                if len(field_value):
+                    raise NotImplementedError(
+                        'Cannot update external children')
+            else:
+                if data.HasField(field_name):
+                    raise NotImplementedError(
+                        'Cannot update external children')
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Node proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def get_proxy(self, path, exclusive=False):
+        return self._get_proxy(path, self, path, exclusive)
+
+    def _get_proxy(self, path, root, full_path, exclusive):
+        while path.startswith('/'):
+            path = path[1:]
+        if not path:
+            return self._mk_proxy(root, full_path, exclusive)
+
+        # need to escalate
+        rev = self._branches[None]._latest
+        name, _, path = path.partition('/')
+        field = children_fields(self._type)[name]
+        if field.is_container:
+            if not path:
+                raise ValueError('Cannot proxy a container field')
+            if field.key:
+                key, _, path = path.partition('/')
+                key = field.key_from_str(key)
+                children = rev._children[name]
+                _, child_rev = find_rev_by_key(children, field.key, key)
+                child_node = child_rev.node
+                return child_node._get_proxy(path, root, full_path, exclusive)
+
+            raise ValueError('Cannot index into container with no keys')
+
+        else:
+            child_rev = rev._children[name][0]
+            child_node = child_rev.node
+            return child_node._get_proxy(path, root, full_path, exclusive)
+
+    def _mk_proxy(self, root, full_path, exclusive):
+        if self._proxy is None:
+            self._proxy = ConfigProxy(root, self, full_path, exclusive)
+        else:
+            if self._proxy.exclusive:
+                raise ValueError('Node is already owned exclusively')
+        return self._proxy
+
+    def _mk_event_bus(self):
+        if self._event_bus is None:
+            self._event_bus = ConfigEventBus()
+        return self._event_bus
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~ Persistence loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def load_latest(self, latest_hash):
+
+        root = self._root
+        kv_store = root._kv_store
+
+        branch = ConfigBranch(node=self, auto_prune=self._auto_prune)
+        rev = PersistedConfigRevision.load(
+            branch, kv_store, self._type, latest_hash)
+        self._make_latest(branch, rev)
+        self._branches[None] = branch
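+
+
+# Illustrative path semantics (a sketch; the 'devices' field name, the key
+# value '1234' and the `device` message are assumptions). Children in keyed
+# containers are addressed as <field>/<key>:
+#
+#   node.get('/')                          # local data of this node only
+#   node.get('/devices', deep=True)        # all devices, fully expanded
+#   node.get('/devices/1234', depth=1)     # one device plus one child level
+#   node.update('/devices/1234', device)   # returns the resulting revision
+#   node.remove('/devices/1234')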
diff --git a/python/core/config/config_proxy.py b/python/core/config/config_proxy.py
new file mode 100644
index 0000000..57d8150
--- /dev/null
+++ b/python/core/config/config_proxy.py
@@ -0,0 +1,155 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from enum import Enum
+
+from voltha.core.config.config_txn import ConfigTransaction
+
+log = structlog.get_logger()
+
+
+class OperationContext(object):
+    def __init__(self, path=None, data=None, field_name=None, child_key=None):
+        self.path = path
+        self._data = data
+        self.field_name = field_name
+        self.child_key = child_key
+
+    @property
+    def data(self):
+        return self._data
+
+    def update(self, data):
+        self._data = data
+        return self
+
+    def __repr__(self):
+        return 'OperationContext({})'.format(self.__dict__)
+
+
+class CallbackType(Enum):
+
+    # GET hooks are called after the data is retrieved and can be used to
+    # augment the data (they should only augment fields marked as REAL_TIME)
+    GET = 1
+
+    # PRE_UPDATE hooks are called before the change is made and are supposed
+    # to be used to reject the data by raising an exception. If they don't,
+    # the change will be applied.
+    PRE_UPDATE = 2
+
+    # POST_UPDATE hooks are called after the update has occurred and can
+    # be used to deal with the change. For instance, an adapter can use the
+    # callback to trigger the south-bound configuration
+    POST_UPDATE = 3
+
+    # These behave similarly to the update callbacks as described above.
+    PRE_ADD = 4
+    POST_ADD = 5
+    PRE_REMOVE = 6
+    POST_REMOVE = 7
+
+    # Bulk list change due to transaction commit that changed items in
+    # non-keyed container fields
+    POST_LISTCHANGE = 8
+
+
+class ConfigProxy(object):
+    """
+    Allows an entity to look at a sub-tree and see it as if it were the whole tree
+    """
+    __slots__ = (
+        '_root',
+        '_node',
+        '_path',
+        '_exclusive',
+        '_callbacks'
+    )
+
+    def __init__(self, root, node, path, exclusive):
+        self._root = root
+        self._node = node
+        self._exclusive = exclusive
+        self._path = path  # full path to proxied node
+        self._callbacks = {}  # call back type -> list of callbacks
+
+    @property
+    def exclusive(self):
+        return self._exclusive
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~ CRUD handlers ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def get(self, path='/', depth=None, deep=None, txid=None):
+        return self._node.get(path, depth=depth, deep=deep, txid=txid)
+
+    def update(self, path, data, strict=False, txid=None):
+        assert path.startswith('/')
+        full_path = self._path if path == '/' else self._path + path
+        return self._root.update(full_path, data, strict, txid=txid)
+
+    def add(self, path, data, txid=None):
+        assert path.startswith('/')
+        full_path = self._path if path == '/' else self._path + path
+        return self._root.add(full_path, data, txid=txid)
+
+    def remove(self, path, txid=None):
+        assert path.startswith('/')
+        full_path = self._path if path == '/' else self._path + path
+        return self._root.remove(full_path, txid=txid)
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~ Transaction support ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def open_transaction(self):
+        """Open a new transaction"""
+        txid = self._root.mk_txbranch()
+        return ConfigTransaction(self, txid)
+
+    def commit_transaction(self, txid):
+        """
+        If there is an open transaction, commit it now. Raises an exception
+        if a conflict is detected. Either way, the transaction is deleted.
+        """
+        self._root.fold_txbranch(txid)
+
+    def cancel_transaction(self, txid):
+        """
+        Cancel the current transaction, if any. Always succeeds.
+        """
+        self._root.del_txbranch(txid)
+
+    # ~~~~~~~~~~~~~~~~~~~~~~ Callbacks registrations ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def register_callback(self, callback_type, callback, *args, **kw):
+        lst = self._callbacks.setdefault(callback_type, [])
+        lst.append((callback, args, kw))
+
+    def unregister_callback(self, callback_type, callback, *args, **kw):
+        lst = self._callbacks.setdefault(callback_type, [])
+        if (callback, args, kw) in lst:
+            lst.remove((callback, args, kw))
+
+    # ~~~~~~~~~~~~~~~~~~~~~ Callback dispatch ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def invoke_callbacks(self, callback_type, context, proceed_on_errors=False):
+        lst = self._callbacks.get(callback_type, [])
+        for callback, args, kw in lst:
+            try:
+                context = callback(context, *args, **kw)
+            except Exception, e:
+                if proceed_on_errors:
+                    log.exception(
+                        'call-back-error', callback_type=callback_type,
+                        context=context, e=e)
+                else:
+                    raise
+        return context
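+
+
+# Illustrative callback registration (a sketch; `root`, `validate_device`,
+# the proxy path and the `device` message are assumptions for illustration):
+#
+#   def validate_device(device):
+#       if not device.id:
+#           raise ValueError('device id is mandatory')   # rejects the change
+#       return device
+#
+#   proxy = root.get_proxy('/devices/1234')
+#   proxy.register_callback(CallbackType.PRE_UPDATE, validate_device)
+#   proxy.update('/', device)   # PRE_UPDATE runs before the change is applied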
diff --git a/python/core/config/config_rev.py b/python/core/config/config_rev.py
new file mode 100644
index 0000000..8bfac18
--- /dev/null
+++ b/python/core/config/config_rev.py
@@ -0,0 +1,342 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Immutable classes to store config revision information arranged in a tree.
+
+Immutability cannot be enforced in Python, so anyone working with these
+classes directly must obey the rules.
+"""
+
+import weakref
+from copy import copy
+from hashlib import md5
+
+from google.protobuf.descriptor import Descriptor
+from simplejson import dumps
+
+from common.utils.json_format import MessageToJson
+from voltha.protos import third_party
+from voltha.protos import meta_pb2
+
+import structlog
+
+log = structlog.get_logger()
+
+def is_proto_message(o):
+    """
+    Return True if object o appears to be a protobuf message; False otherwise.
+    """
+    # use a somewhat empirical approach to decide if something looks like
+    # a protobuf message
+    return isinstance(getattr(o, 'DESCRIPTOR', None), Descriptor)
+
+
+def message_to_json_concise(m):
+    """
+    Return the most concise string representation of a protobuf. Good for
+    things where size matters (e.g., generating hash).
+    """
+    return MessageToJson(m, False, True, False)
+
+
+_rev_cache = weakref.WeakValueDictionary()  # cache of config revs
+
+
+_children_fields_cache = {}  # to memoize externally stored field name info
+
+
+class _ChildType(object):
+    """Used to store key metadata about child_node fields in protobuf messages.
+    """
+    __slots__ = (
+        '_module',
+        '_type',
+        '_is_container',
+        '_key',
+        '_key_from_str'
+    )
+
+    def __init__(self, module, type, is_container,
+                 key=None, key_from_str=None):
+        self._module = module
+        self._type = type
+        self._is_container = is_container
+        self._key = key
+        self._key_from_str = key_from_str
+
+    @property
+    def is_container(self):
+        return self._is_container
+
+    @property
+    def key(self):
+        return self._key
+
+    @property
+    def key_from_str(self):
+        return self._key_from_str
+
+    @property
+    def module(self):
+        return self._module
+
+    @property
+    def type(self):
+        return self._type
+
+
+def children_fields(cls):
+    """
+    Return a map of externally stored fields for this protobuf message type.
+    What is stored as branch node is determined by the "child_node"
+    annotation in the protobuf definitions.
+    With each external field, we store if the field is a container, if a
+    container is keyed (indexed), and the function that converts a path
+    substring back to the key.
+    """
+    names = _children_fields_cache.get(cls)
+
+    if names is None:
+        names = {}
+
+        for field in cls.DESCRIPTOR.fields:
+
+            if field.has_options:
+                options = field.GetOptions()
+
+                if options.HasExtension(meta_pb2.child_node):
+                    is_container = field.label == 3
+                    meta = options.Extensions[meta_pb2.child_node]
+                    key_from_str = None
+
+                    if meta.key:
+                        key_field = field.message_type.fields_by_name[meta.key]
+                        key_type = key_field.type
+
+                        if key_type == key_field.TYPE_STRING:
+                            key_from_str = lambda s: s
+
+                        elif key_type in (
+                                key_field.TYPE_FIXED32,
+                                key_field.TYPE_FIXED64,
+                                key_field.TYPE_INT32,
+                                key_field.TYPE_INT64,
+                                key_field.TYPE_SFIXED32,
+                                key_field.TYPE_SFIXED64,
+                                key_field.TYPE_SINT32,
+                                key_field.TYPE_SINT64,
+                                key_field.TYPE_UINT32,
+                                key_field.TYPE_UINT64):
+                            key_from_str = lambda s: int(s)
+
+                        else:
+                            raise NotImplementedError()
+
+                    field_class = field.message_type._concrete_class
+                    names[field.name] = _ChildType(
+                        module=field_class.__module__,
+                        type=field_class.__name__,
+                        is_container=is_container,
+                        key=meta.key,
+                        key_from_str=key_from_str
+                    )
+
+        _children_fields_cache[cls] = names
+
+    return names
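+
+# Illustrative result (a sketch; the `devices` field, the Device message and
+# its module name are assumptions): for a message whose repeated `devices`
+# field carries a child_node annotation with key "id", children_fields()
+# returns roughly
+#
+#   {'devices': _ChildType(module='voltha_pb2', type='Device',
+#                          is_container=True, key='id',
+#                          key_from_str=lambda s: s)}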
+
+
+_access_right_cache = {}  # to memoize field access right restrictions
+
+
+def access_rights(cls):
+    """
+    Determine the access rights for each field and cache these maps for
+    fast retrieval.
+    """
+    access_map = _access_right_cache.get(cls)
+    if access_map is None:
+        access_map = {}
+        for field in cls.DESCRIPTOR.fields:
+            if field.has_options:
+                options = field.GetOptions()
+                if options.HasExtension(meta_pb2.access):
+                    access = options.Extensions[meta_pb2.access]
+                    access_map[field.name] = access
+        _access_right_cache[cls] = access_map
+    return access_map
+
+
+class ConfigDataRevision(object):
+    """
+    Holds a specific snapshot of the local configuration for a config node.
+    It shall be treated as an immutable object, although in Python this is
+    very difficult to enforce!
+    As such, we can compute a unique hash based on the config data which
+    can be used to establish equivalence. It also has a time-stamp to track
+    changes.
+
+    This object must be treated as immutable, including its nested config data.
+    This is very important. The entire config module depends on hashes
+    we create over the data, so altering the data can lead to unpredictable
+    consequences.
+    """
+
+    __slots__ = (
+        '_data',
+        '_hash',
+        '__weakref__'
+    )
+
+    def __init__(self, data):
+        self._data = data
+        self._hash = self._hash_data(data)
+
+    @property
+    def data(self):
+        return self._data
+
+    @property
+    def hash(self):
+        return self._hash
+
+    @staticmethod
+    def _hash_data(data):
+        """Hash function to be used to track version changes of config nodes"""
+        if isinstance(data, (dict, list)):
+            to_hash = dumps(data, sort_keys=True)
+        elif is_proto_message(data):
+            to_hash = ':'.join((
+                data.__class__.__module__,
+                data.__class__.__name__,
+                data.SerializeToString()))
+        else:
+            to_hash = str(hash(data))
+        return md5(to_hash).hexdigest()[:12]
+
+
+class ConfigRevision(object):
+    """
+    Holds not only the local config data, but also the external children
+    reference lists, per field name.
+    Recall that externally stored fields are those marked "child_node" in
+    the protobuf definition.
+    This object must be treated as immutable, including its config data.
+    """
+
+    __slots__ = (
+        '_config',
+        '_children',
+        '_hash',
+        '_branch',
+        '__weakref__'
+    )
+
+    def __init__(self, branch, data, children=None):
+        self._branch = branch
+        self._config = ConfigDataRevision(data)
+        self._children = children
+        self._finalize()
+
+    def _finalize(self):
+        self._hash = self._hash_content()
+        if self._hash not in _rev_cache:
+            _rev_cache[self._hash] = self
+        if self._config._hash not in _rev_cache:
+            _rev_cache[self._config._hash] = self._config
+        else:
+            self._config = _rev_cache[self._config._hash]  # re-use!
+
+    def _hash_content(self):
+        # hash is derived from config hash and hashes of all children
+        m = md5('' if self._config is None else self._config._hash)
+        if self._children is not None:
+            for child_field in sorted(self._children.keys()):
+                children = self._children[child_field]
+                assert isinstance(children, list)
+                m.update(''.join(c._hash for c in children))
+        return m.hexdigest()[:12]
+
+    @property
+    def hash(self):
+        return self._hash
+
+    @property
+    def data(self):
+        return None if self._config is None else self._config.data
+
+    @property
+    def node(self):
+        return self._branch._node
+
+    @property
+    def type(self):
+        return self._config.data.__class__
+
+    def clear_hash(self):
+        self._hash = None
+
+    def get(self, depth):
+        """
+        Get config data of node. If depth > 0, recursively assemble the
+        branch nodes. If depth is < 0, this results in a fully exhaustive
+        "complete config".
+        """
+        orig_data = self._config.data
+        data = orig_data.__class__()
+        data.CopyFrom(orig_data)
+        if depth:
+            # collect children
+            cfields = children_fields(self.type).iteritems()
+            for field_name, field in cfields:
+                if field.is_container:
+                    for rev in self._children[field_name]:
+                        child_data = rev.get(depth=depth - 1)
+                        child_data_holder = getattr(data, field_name).add()
+                        child_data_holder.MergeFrom(child_data)
+                else:
+                    rev = self._children[field_name][0]
+                    child_data = rev.get(depth=depth - 1)
+                    child_data_holder = getattr(data, field_name)
+                    child_data_holder.MergeFrom(child_data)
+        return data
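+
+    # Illustrative usage on a revision instance (the 'rev' name is an
+    # assumption):
+    #   rev.get(depth=0)   -> only this node's own config data
+    #   rev.get(depth=1)   -> own config plus immediate children
+    #   rev.get(depth=-1)  -> fully assembled "complete config"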
+
+    def update_data(self, data, branch):
+        """Return a NEW revision which is updated for the modified data"""
+        new_rev = copy(self)
+        new_rev._branch = branch
+        new_rev._config = self._config.__class__(data)
+        new_rev._finalize()
+        return new_rev
+
+    def update_children(self, name, children, branch):
+        """Return a NEW revision which is updated for the modified children"""
+        new_children = self._children.copy()
+        new_children[name] = children
+        new_rev = copy(self)
+        new_rev._branch = branch
+        new_rev._children = new_children
+        new_rev._finalize()
+        return new_rev
+
+    def update_all_children(self, children, branch):
+        """Return a NEW revision which is updated for all children entries"""
+        new_rev = copy(self)
+        new_rev._branch = branch
+        new_rev._children = children
+        new_rev._finalize()
+        return new_rev
diff --git a/python/core/config/config_rev_persisted.py b/python/core/config/config_rev_persisted.py
new file mode 100644
index 0000000..8b25b82
--- /dev/null
+++ b/python/core/config/config_rev_persisted.py
@@ -0,0 +1,143 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A config rev object that persists itself
+"""
+from bz2 import compress, decompress
+
+import structlog
+from simplejson import dumps, loads
+
+from voltha.core.config.config_rev import ConfigRevision, children_fields
+
+log = structlog.get_logger()
+
+
+class PersistedConfigRevision(ConfigRevision):
+
+    compress = False
+
+    __slots__ = ('_kv_store',)
+
+    def __init__(self, branch, data, children=None):
+        self._kv_store = branch._node._root._kv_store
+        super(PersistedConfigRevision, self).__init__(branch, data, children)
+
+    def _finalize(self):
+        super(PersistedConfigRevision, self)._finalize()
+        self.store()
+
+    def __del__(self):
+        try:
+            if self._hash:
+                if self._config.__weakref__ is None:
+                    if self._config._hash in self._kv_store:
+                        del self._kv_store[self._config._hash]
+                assert self.__weakref__ is None
+                if self._hash in self._kv_store:
+                    del self._kv_store[self._hash]
+        except Exception, e:
+            # this should never happen
+            log.exception('del-error', hash=self.hash, e=e)
+
+    def store(self):
+
+        try:
+            # crude serialization of children hash and config data hash
+            if self._hash in self._kv_store:
+                return
+
+            self.store_config()
+
+            children_lists = {}
+            for field_name, children in self._children.iteritems():
+                hashes = [rev.hash for rev in children]
+                children_lists[field_name] = hashes
+
+            data = dict(
+                children=children_lists,
+                config=self._config._hash
+            )
+            blob = dumps(data)
+            if self.compress:
+                blob = compress(blob)
+
+            self._kv_store[self._hash] = blob
+
+        except Exception, e:
+            log.exception('store-error', e=e)
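+
+    # The value stored under self._hash is a small JSON blob; an
+    # illustrative example (the 'ports' field name and hashes are made up):
+    #   {"config": "a1b2c3d4e5f6",
+    #    "children": {"ports": ["0f0e0d0c0b0a", "112233445566"]}}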
+
+    @classmethod
+    def load(cls, branch, kv_store, msg_cls, hash):
+        #  Update the branch's config store
+        blob = kv_store[hash]
+        if cls.compress:
+            blob = decompress(blob)
+        data = loads(blob)
+
+        config_hash = data['config']
+        config_data = cls.load_config(kv_store, msg_cls, config_hash)
+
+        children_list = data['children']
+        assembled_children = {}
+        node = branch._node
+        for field_name, meta in children_fields(msg_cls).iteritems():
+            child_msg_cls = tmp_cls_loader(meta.module, meta.type)
+            children = []
+            for child_hash in children_list[field_name]:
+                child_node = node._mknode(child_msg_cls)
+                child_node.load_latest(child_hash)
+                child_rev = child_node.latest
+                children.append(child_rev)
+            assembled_children[field_name] = children
+        rev = cls(branch, config_data, assembled_children)
+        return rev
+
+    def store_config(self):
+        if self._config._hash in self._kv_store:
+            return
+
+        # crude serialization of config data
+        blob = self._config._data.SerializeToString()
+        if self.compress:
+            blob = compress(blob)
+
+        self._kv_store[self._config._hash] = blob
+
+    @classmethod
+    def load_config(cls, kv_store, msg_cls, config_hash):
+        blob = kv_store[config_hash]
+        if cls.compress:
+            blob = decompress(blob)
+
+        # TODO use a loader later on
+        data = msg_cls()
+        data.ParseFromString(blob)
+        return data
+
+
+def tmp_cls_loader(module_name, cls_name):
+    # TODO this shall be generalized
+    from voltha.protos import voltha_pb2, health_pb2, adapter_pb2, \
+        logical_device_pb2, device_pb2, openflow_13_pb2, bbf_fiber_base_pb2, \
+        bbf_fiber_traffic_descriptor_profile_body_pb2, \
+        bbf_fiber_tcont_body_pb2, bbf_fiber_gemport_body_pb2, \
+        bbf_fiber_multicast_gemport_body_pb2, \
+        bbf_fiber_multicast_distribution_set_body_pb2, \
+        omci_mib_db_pb2, \
+        omci_alarm_db_pb2
+    return getattr(locals()[module_name], cls_name)
diff --git a/python/core/config/config_root.py b/python/core/config/config_root.py
new file mode 100644
index 0000000..4b1006d
--- /dev/null
+++ b/python/core/config/config_root.py
@@ -0,0 +1,229 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from uuid import uuid4
+
+import structlog
+from simplejson import dumps, loads
+
+from voltha.core.config.config_node import ConfigNode
+from voltha.core.config.config_rev import ConfigRevision
+from voltha.core.config.config_rev_persisted import PersistedConfigRevision
+from voltha.core.config.merge_3way import MergeConflictException
+
+log = structlog.get_logger()
+
+
+class ConfigRoot(ConfigNode):
+
+    __slots__ = (
+        '_dirty_nodes',  # holds set of modified nodes per transaction branch
+        '_kv_store',
+        '_loading',
+        '_rev_cls',
+        '_deferred_callback_queue',
+        '_notification_deferred_callback_queue'
+    )
+
+    def __init__(self, initial_data, kv_store=None, rev_cls=ConfigRevision):
+        self._kv_store = kv_store
+        self._dirty_nodes = {}
+        self._loading = False
+        if kv_store is not None and \
+                not issubclass(rev_cls, PersistedConfigRevision):
+            rev_cls = PersistedConfigRevision
+        self._rev_cls = rev_cls
+        self._deferred_callback_queue = []
+        self._notification_deferred_callback_queue = []
+        super(ConfigRoot, self).__init__(self, initial_data, False)
+
+    @property
+    def kv_store(self):
+        if self._loading:
+            # provide fake store for storing things
+            # TODO this shall be a fake_dict providing noop for all relevant
+            # operations
+            return dict()
+        else:
+            return self._kv_store
+
+    def mkrev(self, *args, **kw):
+        return self._rev_cls(*args, **kw)
+
+    def mk_txbranch(self):
+        txid = uuid4().hex[:12]
+        self._dirty_nodes[txid] = {self}
+        self._mk_txbranch(txid)
+        return txid
+
+    def del_txbranch(self, txid):
+        for dirty_node in self._dirty_nodes[txid]:
+            dirty_node._del_txbranch(txid)
+        del self._dirty_nodes[txid]
+
+    def fold_txbranch(self, txid):
+        try:
+            self._merge_txbranch(txid, dry_run=1)
+        except MergeConflictException:
+            self.del_txbranch(txid)
+            raise
+
+        try:
+            self._merge_txbranch(txid)
+        finally:
+            self.execute_deferred_callbacks()
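+
+    # Illustrative transaction lifecycle against a ConfigRoot instance
+    # ('root', the path and new_data are assumptions):
+    #   txid = root.mk_txbranch()
+    #   root.update('/some/path', new_data, txid=txid)
+    #   root.fold_txbranch(txid)  # merges, or raises MergeConflictException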
+
+    # ~~~~~~ Overridden, root-level CRUD methods to handle transactions ~~~~~~~
+
+    def update(self, path, data, strict=None, txid=None, mk_branch=None):
+        assert mk_branch is None
+        self.check_callback_queue()
+        try:
+            if txid is not None:
+                dirtied = self._dirty_nodes[txid]
+
+                def track_dirty(node):
+                    dirtied.add(node)
+                    return node._mk_txbranch(txid)
+
+                res = super(ConfigRoot, self).update(path, data, strict,
+                                                          txid, track_dirty)
+            else:
+                res = super(ConfigRoot, self).update(path, data, strict)
+        finally:
+            self.execute_deferred_callbacks()
+        return res
+
+    def add(self, path, data, txid=None, mk_branch=None):
+        assert mk_branch is None
+        self.check_callback_queue()
+        try:
+            if txid is not None:
+                dirtied = self._dirty_nodes[txid]
+
+                def track_dirty(node):
+                    dirtied.add(node)
+                    return node._mk_txbranch(txid)
+
+                res = super(ConfigRoot, self).add(path, data, txid, track_dirty)
+            else:
+                res = super(ConfigRoot, self).add(path, data)
+        finally:
+            self.execute_deferred_callbacks()
+        return res
+
+    def remove(self, path, txid=None, mk_branch=None):
+        assert mk_branch is None
+        self.check_callback_queue()
+        try:
+            if txid is not None:
+                dirtied = self._dirty_nodes[txid]
+
+                def track_dirty(node):
+                    dirtied.add(node)
+                    return node._mk_txbranch(txid)
+
+                res = super(ConfigRoot, self).remove(path, txid, track_dirty)
+            else:
+                res = super(ConfigRoot, self).remove(path)
+        finally:
+            self.execute_deferred_callbacks()
+        return res
+
+    def check_callback_queue(self):
+        assert len(self._deferred_callback_queue) == 0
+
+    def enqueue_callback(self, func, *args, **kw):
+        self._deferred_callback_queue.append((func, args, kw))
+
+    def enqueue_notification_callback(self, func, *args, **kw):
+        """
+        A separate queue is required for notification.  Previously, when the
+        notifications were added to the self._deferred_callback_queue there
+        was a deadlock condition where two callbacks were added (one
+        related to the model change and one for the notification related to
+        that model change).  Since the model change requires the
+        self._deferred_callback_queue to be empty then there was a deadlock
+        in that scenario.   The simple approach to avoid this problem is to
+        have separate queues for model and notification.
+        TODO: Investigate whether there is a need for the
+        self._deferred_callback_queue to handle multiple model events at the same time
+        :param func: callback function
+        :param args: args
+        :param kw: key-value args
+        :return: None
+        """
+        self._notification_deferred_callback_queue.append((func, args, kw))
+
+    def execute_deferred_callbacks(self):
+        # First process the model-triggered related callbacks
+        while self._deferred_callback_queue:
+            func, args, kw = self._deferred_callback_queue.pop(0)
+            func(*args, **kw)
+
+        # Execute the notification callbacks
+        while self._notification_deferred_callback_queue:
+            func, args, kw = self._notification_deferred_callback_queue.pop(0)
+            func(*args, **kw)
+
+
+    # ~~~~~~~~~~~~~~~~ Persistence related ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    @classmethod
+    def load(cls, root_msg_cls, kv_store):
+        # use a fake kv store during the initial load so we do not
+        # overwrite our real kv store
+        fake_kv_store = dict()  # shall use more efficient mock dict
+        root = cls(root_msg_cls(), kv_store=fake_kv_store,
+                   rev_cls=PersistedConfigRevision)
+        # we can install the real store now
+        root._kv_store = kv_store
+        root.load_from_persistence(root_msg_cls)
+        return root
+
+    def _make_latest(self, branch, *args, **kw):
+        super(ConfigRoot, self)._make_latest(branch, *args, **kw)
+        # only persist the committed branch
+        if self._kv_store is not None and branch._txid is None:
+            root_data = dict(
+                latest=branch._latest._hash,
+                tags=dict((k, v._hash) for k, v in self._tags.iteritems())
+            )
+            blob = dumps(root_data)
+            self._kv_store['root'] = blob
+
+    def persist_tags(self):
+        if self._kv_store is not None:
+            root_data = loads(self.kv_store['root'])
+            root_data = dict(
+                latest=root_data['latest'],
+                tags=dict((k, v._hash) for k, v in self._tags.iteritems())
+            )
+            blob = dumps(root_data)
+            self._kv_store['root'] = blob
+
+    def load_from_persistence(self, root_msg_cls):
+        self._loading = True
+        blob = self._kv_store['root']
+        root_data = loads(blob)
+
+        for tag, hash in root_data['tags'].iteritems():
+            self.load_latest(hash)
+            self._tags[tag] = self.latest
+
+        self.load_latest(root_data['latest'])
+
+        self._loading = False
+
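+# The 'root' entry written by _make_latest() and persist_tags() is a small
+# JSON blob; an illustrative example (hashes and tag name are made up):
+#   {"latest": "d41d8cd98f00", "tags": {"sometag": "9e107d9d372b"}}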
diff --git a/python/core/config/config_txn.py b/python/core/config/config_txn.py
new file mode 100644
index 0000000..87dfc59
--- /dev/null
+++ b/python/core/config/config_txn.py
@@ -0,0 +1,73 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class ClosedTransactionError(Exception):
+    pass
+
+
+class ConfigTransaction(object):
+
+    __slots__ = (
+        '_proxy',
+        '_txid'
+    )
+
+    def __init__(self, proxy, txid):
+        self._proxy = proxy
+        self._txid = txid
+
+    def __del__(self):
+        if self._txid:
+            try:
+                self.cancel()
+            except:
+                raise
+
+    # ~~~~~~~~~~~~~~~~~~~~ CRUD ops within the transaction ~~~~~~~~~~~~~~~~~~~~
+
+    def get(self, path='/', depth=None, deep=None):
+        if self._txid is None:
+            raise ClosedTransactionError()
+        return self._proxy.get(path, depth=depth, deep=deep, txid=self._txid)
+
+    def update(self, path, data, strict=False):
+        if self._txid is None:
+            raise ClosedTransactionError()
+        return self._proxy.update(path, data, strict, self._txid)
+
+    def add(self, path, data):
+        if self._txid is None:
+            raise ClosedTransactionError()
+        return self._proxy.add(path, data, self._txid)
+
+    def remove(self, path):
+        if self._txid is None:
+            raise ClosedTransactionError()
+        return self._proxy.remove(path, self._txid)
+
+    # ~~~~~~~~~~~~~~~~~~~~ transaction finalization ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def cancel(self):
+        """Explicitly cancel the transaction"""
+        self._proxy.cancel_transaction(self._txid)
+        self._txid = None
+
+    def commit(self):
+        """Commit all transaction changes"""
+        try:
+            self._proxy.commit_transaction(self._txid)
+        finally:
+            self._txid = None
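+
+
+# Illustrative usage; a transaction is normally obtained from a config
+# proxy rather than constructed directly ('proxy' and its method name are
+# assumptions):
+#   txn = proxy.open_transaction()
+#   txn.update('/devices/1234', device, strict=True)
+#   txn.commit()  # or txn.cancel() to discard the changes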
diff --git a/python/core/config/merge_3way.py b/python/core/config/merge_3way.py
new file mode 100644
index 0000000..5444a6c
--- /dev/null
+++ b/python/core/config/merge_3way.py
@@ -0,0 +1,267 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+3-way merge function for config rev objects.
+"""
+from collections import OrderedDict
+from copy import copy
+
+from voltha.core.config.config_proxy import CallbackType, OperationContext
+from voltha.core.config.config_rev import children_fields
+
+
+class MergeConflictException(Exception):
+    pass
+
+
+def merge_3way(fork_rev, src_rev, dst_rev, merge_child_func, dry_run=False):
+    """
+    Attempt to merge src_rev into dst_rev, taking into account what has
+    changed in both revs since the last known common point, the fork_rev.
+    In case of conflict, raise a MergeConflictException(). If dry_run is True,
+    don't actually perform the merge, but still detect potential conflicts.
+
+    This function recurses into all child nodes stored under the rev and
+    performs the merge if the child is also part of a transaction branch.
+
+    :param fork_rev: Point of forking (last known common state between
+    branches)
+    :param src_rev: Latest rev from which we merge to dst_rev
+    :param dst_rev: Target (destination) rev
+    :param merge_child_func: Function used to run a potential merge in all
+    children that may need a merge (determined from the local changes)
+    :param dry_run: If True, do not perform the merge, but detect merge
+    conflicts.
+    :return: The new dst_rev (a new rev instance) and the list of changes that
+    occurred in this node or any of its children as part of this merge.
+    """
+
+    # to collect change tuples of (<callback-type>, <op-context>)
+    changes = []
+
+    class AnalyzeChanges(object):
+        def __init__(self, lst1, lst2, keyname):
+            self.keymap1 = OrderedDict((getattr(rev._config._data, keyname), i)
+                                       for i, rev in enumerate(lst1))
+            self.keymap2 = OrderedDict((getattr(rev._config._data, keyname), i)
+                                       for i, rev in enumerate(lst2))
+            self.added_keys = [
+                k for k in self.keymap2.iterkeys() if k not in self.keymap1]
+            self.removed_keys = [
+                k for k in self.keymap1.iterkeys() if k not in self.keymap2]
+            self.changed_keys = [
+                k for k in self.keymap1.iterkeys()
+                if k in self.keymap2 and
+                    lst1[self.keymap1[k]]._hash != lst2[self.keymap2[k]]._hash
+            ]
+
+    # Note: there are a couple of special cases that can be optimized
+    # for later on. But since premature optimization is a bad idea, we
+    # defer them.
+
+    # deal with config data first
+    if dst_rev._config is fork_rev._config:
+        # no change in master, accept src if different
+        config_changed = dst_rev._config != src_rev._config
+    else:
+        if dst_rev._config.hash != src_rev._config.hash:
+            raise MergeConflictException('Config collision')
+        config_changed = True
+
+    # now to the external children fields
+    new_children = dst_rev._children.copy()
+    _children_fields = children_fields(fork_rev.data.__class__)
+
+    for field_name, field in _children_fields.iteritems():
+
+        fork_list = fork_rev._children[field_name]
+        src_list = src_rev._children[field_name]
+        dst_list = dst_rev._children[field_name]
+
+        if dst_list == src_list:
+            # we do not need to change the dst, however we still need
+            # to complete the branch purging in child nodes so not
+            # to leave dangling branches around
+            [merge_child_func(rev) for rev in src_list]
+            continue
+
+        if not field.key:
+            # If the list is not keyed, we really should not merge. We merely
+            # check for collision, i.e., if both changed (and not same)
+            if dst_list == fork_list:
+                # dst branch did not change since fork
+
+                assert src_list != fork_list, 'We should not be here otherwise'
+
+                # the incoming (src) rev changed, and we have to apply it
+                new_children[field_name] = [
+                    merge_child_func(rev) for rev in src_list]
+
+                if field.is_container:
+                    changes.append((CallbackType.POST_LISTCHANGE,
+                                    OperationContext(field_name=field_name)))
+
+            else:
+                if src_list != fork_list:
+                    raise MergeConflictException(
+                        'Cannot merge because single child node or un-keyed '
+                        'children list has changed')
+
+        else:
+
+            if dst_list == fork_list:
+                # Destination did not change
+
+                # We need to analyze only the changes on the incoming rev
+                # since fork
+                src = AnalyzeChanges(fork_list, src_list, field.key)
+
+                new_list = copy(src_list)  # we start from the source list
+
+                for key in src.added_keys:
+                    idx = src.keymap2[key]
+                    new_rev = merge_child_func(new_list[idx])
+                    new_list[idx] = new_rev
+                    changes.append(
+                        (CallbackType.POST_ADD,
+                         new_rev.data))
+                         # OperationContext(
+                         #     field_name=field_name,
+                         #     child_key=key,
+                         #     data=new_rev.data)))
+
+                for key in src.removed_keys:
+                    old_rev = fork_list[src.keymap1[key]]
+                    changes.append((
+                        CallbackType.POST_REMOVE,
+                        old_rev.data))
+                        # OperationContext(
+                        #     field_name=field_name,
+                        #     child_key=key,
+                        #     data=old_rev.data)))
+
+                for key in src.changed_keys:
+                    idx = src.keymap2[key]
+                    new_rev = merge_child_func(new_list[idx])
+                    new_list[idx] = new_rev
+                    # updated child gets its own change event
+
+                new_children[field_name] = new_list
+
+            else:
+
+                # For keyed fields we can really investigate what has been
+                # added, removed, or changed in both branches and do a
+                # fine-grained collision detection and merge
+
+                src = AnalyzeChanges(fork_list, src_list, field.key)
+                dst = AnalyzeChanges(fork_list, dst_list, field.key)
+
+                new_list = copy(dst_list)  # this time we start with the dst
+
+                for key in src.added_keys:
+                    # we cannot add if it has been added and is different
+                    if key in dst.added_keys:
+                        # it has been added to both, we need to check if
+                        # they are the same
+                        child_dst_rev = dst_list[dst.keymap2[key]]
+                        child_src_rev = src_list[src.keymap2[key]]
+                        if child_dst_rev.hash == child_src_rev.hash:
+                            # they match, so we do not need to change the
+                            # dst list, but we still need to purge the src
+                            # branch
+                            merge_child_func(child_dst_rev)
+                        else:
+                            raise MergeConflictException(
+                                'Cannot add because it has been added and '
+                                'different'
+                            )
+                    else:
+                        # this is a brand new key, need to add it
+                        new_rev = merge_child_func(src_list[src.keymap2[key]])
+                        new_list.append(new_rev)
+                        changes.append((
+                            CallbackType.POST_ADD,
+                            new_rev.data))
+                            # OperationContext(
+                            #     field_name=field_name,
+                            #     child_key=key,
+                            #     data=new_rev.data)))
+
+                for key in src.changed_keys:
+                    # we cannot change if it was removed in dst
+                    if key in dst.removed_keys:
+                        raise MergeConflictException(
+                            'Cannot change because it has been removed')
+
+                    # if it changed in dst as well, we need to check if they
+                    # match (same change)
+                    elif key in dst.changed_keys:
+                        child_dst_rev = dst_list[dst.keymap2[key]]
+                        child_src_rev = src_list[src.keymap2[key]]
+                        if child_dst_rev.hash == child_src_rev.hash:
+                            # they match, so we do not need to change the
+                            # dst list, but we still need to purge the src
+                            # branch
+                            merge_child_func(child_src_rev)
+                        elif child_dst_rev._config.hash != child_src_rev._config.hash:
+                            raise MergeConflictException(
+                                'Cannot update because it has been changed and '
+                                'different'
+                            )
+                        else:
+                            new_rev = merge_child_func(
+                                src_list[src.keymap2[key]])
+                            new_list[dst.keymap2[key]] = new_rev
+                            # no announcement for child update
+
+                    else:
+                        # it only changed in src branch
+                        new_rev = merge_child_func(src_list[src.keymap2[key]])
+                        new_list[dst.keymap2[key]] = new_rev
+                        # no announcement for child update
+
+                for key in reversed(src.removed_keys):  # we go from highest
+                                                        # index to lowest
+
+                    # we cannot remove if it has changed in dst
+                    if key in dst.changed_keys:
+                        raise MergeConflictException(
+                            'Cannot remove because it has changed')
+
+                    # if it has not been removed yet from dst, then remove it
+                    if key not in dst.removed_keys:
+                        dst_idx = dst.keymap2[key]
+                        old_rev = new_list.pop(dst_idx)
+                        changes.append((
+                            CallbackType.POST_REMOVE,
+                            old_rev.data))
+                            # OperationContext(
+                            #     field_name=field_name,
+                            #     child_key=key,
+                            #     data=old_rev.data)))
+
+                new_children[field_name] = new_list
+
+    if not dry_run:
+        rev = src_rev if config_changed else dst_rev
+        rev = rev.update_all_children(new_children, dst_rev._branch)
+        if config_changed:
+            changes.append((CallbackType.POST_UPDATE, rev.data))
+        return rev, changes
+
+    else:
+        return None, None
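+
+
+# Illustrative call shape; the revs and the child-merge hook are normally
+# supplied by the branch/transaction machinery (the no-op lambda below is
+# only a placeholder):
+#   new_rev, changes = merge_3way(
+#       fork_rev, src_rev, dst_rev,
+#       merge_child_func=lambda rev: rev, dry_run=False)
+#   for callback_type, context in changes:
+#       pass  # e.g. announce POST_ADD / POST_REMOVE / POST_UPDATE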
diff --git a/python/core/device_graph.py b/python/core/device_graph.py
new file mode 100644
index 0000000..a4e6d85
--- /dev/null
+++ b/python/core/device_graph.py
@@ -0,0 +1,136 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import networkx as nx
+
+from voltha.core.flow_decomposer import RouteHop
+
+
+class DeviceGraph(object):
+
+    """
+    Mixin class to compute routes in the device graph within
+    a logical device.
+    """
+
+    def compute_routes(self, root_proxy, logical_ports):
+        boundary_ports, graph = self._build_graph(root_proxy, logical_ports)
+        routes = self._build_routes(boundary_ports, graph, logical_ports)
+        return graph, routes
+
+    def _build_graph(self, root_proxy, logical_ports):
+
+        graph = nx.Graph()
+
+        # walk logical device's device and port links to discover full graph
+        devices_added = set()  # set of device.id's
+        ports_added = set()  # set of (device.id, port_no) tuples
+        peer_links = set()
+
+        boundary_ports = dict(
+            ((lp.device_id, lp.device_port_no), lp.ofp_port.port_no)
+            for lp in logical_ports
+        )
+
+        def add_device(device):
+            if device.id in devices_added:
+                return
+
+            graph.add_node(device.id, device=device)
+            devices_added.add(device.id)
+
+            ports = root_proxy.get('/devices/{}/ports'.format(device.id))
+            for port in ports:
+                port_id = (device.id, port.port_no)
+                if port_id not in ports_added:
+                    boundary = port_id in boundary_ports
+                    graph.add_node(port_id, port=port, boundary=boundary)
+                    graph.add_edge(device.id, port_id)
+                for peer in port.peers:
+                    if peer.device_id not in devices_added:
+                        peer_device = root_proxy.get(
+                            '/devices/{}'.format(peer.device_id))
+                        add_device(peer_device)
+                    else:
+                        peer_port_id = (peer.device_id, peer.port_no)
+                        if port_id < peer_port_id:
+                            peer_link = (port_id, peer_port_id)
+                        else:
+                            peer_link = (peer_port_id, port_id)
+                        if peer_link not in peer_links:
+                            graph.add_edge(*peer_link)
+                            peer_links.add(peer_link)
+
+        for logical_port in logical_ports:
+            device_id = logical_port.device_id
+            device = root_proxy.get('/devices/{}'.format(device_id))
+            add_device(device)
+
+        return boundary_ports, graph
+
+    def _build_routes(self, boundary_ports, graph, logical_ports):
+
+        root_ports = dict((lp.ofp_port.port_no, lp.root_port)
+                          for lp in logical_ports if lp.root_port == True)
+
+        routes = {}
+
+        for source, source_port_no in boundary_ports.iteritems():
+            for target, target_port_no in boundary_ports.iteritems():
+
+                if source is target:
+                    continue
+
+                # Ignore NNI - NNI routes
+                if source_port_no in root_ports \
+                        and target_port_no in root_ports:
+                    continue
+
+                # Ignore UNI - UNI routes
+                if source_port_no not in root_ports \
+                        and target_port_no not in root_ports:
+                    continue
+
+                path = nx.shortest_path(graph, source, target)
+
+                # number of nodes in valid paths is always multiple of 3
+                if len(path) % 3:
+                    continue
+
+                # in fact, we currently deal with single fan-out networks,
+                # so a valid path always has exactly six nodes (two hops)
+                assert len(path) == 6
+
+                ingress_input_port, ingress_device, ingress_output_port, \
+                egress_input_port, egress_device, egress_output_port = path
+
+                ingress_hop = RouteHop(
+                    device=graph.node[ingress_device]['device'],
+                    ingress_port=graph.node[ingress_input_port]['port'],
+                    egress_port=graph.node[ingress_output_port]['port']
+                )
+                egress_hop = RouteHop(
+                    device=graph.node[egress_device]['device'],
+                    ingress_port=graph.node[egress_input_port]['port'],
+                    egress_port=graph.node[egress_output_port]['port']
+                )
+
+                routes[(source_port_no, target_port_no)] = [
+                    ingress_hop, egress_hop
+                ]
+
+        return routes
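+
+    # An illustrative entry in the returned map for a UNI <-> NNI pair,
+    # assuming a typical OLT/ONU fan-out (names are assumptions):
+    #   routes[(uni_port_no, nni_port_no)] == [
+    #       RouteHop(onu_device, ingress_port=uni_port, egress_port=pon_port),
+    #       RouteHop(olt_device, ingress_port=pon_port, egress_port=nni_port),
+    #   ]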
+
diff --git a/python/core/flow_decomposer.py b/python/core/flow_decomposer.py
new file mode 100644
index 0000000..faf3141
--- /dev/null
+++ b/python/core/flow_decomposer.py
@@ -0,0 +1,1010 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A mix-in class implementing flow decomposition
+"""
+from collections import OrderedDict
+from copy import copy, deepcopy
+from hashlib import md5
+
+import structlog
+
+from voltha.protos import third_party
+from voltha.protos import openflow_13_pb2 as ofp
+from common.tech_profile import tech_profile
+_ = third_party
+log = structlog.get_logger()
+
+
+# aliases
+ofb_field = ofp.ofp_oxm_ofb_field
+action = ofp.ofp_action
+
+# OFPAT_* shortcuts
+OUTPUT = ofp.OFPAT_OUTPUT
+COPY_TTL_OUT = ofp.OFPAT_COPY_TTL_OUT
+COPY_TTL_IN = ofp.OFPAT_COPY_TTL_IN
+SET_MPLS_TTL = ofp.OFPAT_SET_MPLS_TTL
+DEC_MPLS_TTL = ofp.OFPAT_DEC_MPLS_TTL
+PUSH_VLAN = ofp.OFPAT_PUSH_VLAN
+POP_VLAN = ofp.OFPAT_POP_VLAN
+PUSH_MPLS = ofp.OFPAT_PUSH_MPLS
+POP_MPLS = ofp.OFPAT_POP_MPLS
+SET_QUEUE = ofp.OFPAT_SET_QUEUE
+GROUP = ofp.OFPAT_GROUP
+SET_NW_TTL = ofp.OFPAT_SET_NW_TTL
+NW_TTL = ofp.OFPAT_DEC_NW_TTL
+SET_FIELD = ofp.OFPAT_SET_FIELD
+PUSH_PBB = ofp.OFPAT_PUSH_PBB
+POP_PBB = ofp.OFPAT_POP_PBB
+EXPERIMENTER = ofp.OFPAT_EXPERIMENTER
+
+# OFPXMT_OFB_* shortcuts (incomplete)
+IN_PORT = ofp.OFPXMT_OFB_IN_PORT
+IN_PHY_PORT = ofp.OFPXMT_OFB_IN_PHY_PORT
+METADATA = ofp.OFPXMT_OFB_METADATA
+ETH_DST = ofp.OFPXMT_OFB_ETH_DST
+ETH_SRC = ofp.OFPXMT_OFB_ETH_SRC
+ETH_TYPE = ofp.OFPXMT_OFB_ETH_TYPE
+VLAN_VID = ofp.OFPXMT_OFB_VLAN_VID
+VLAN_PCP = ofp.OFPXMT_OFB_VLAN_PCP
+IP_DSCP = ofp.OFPXMT_OFB_IP_DSCP
+IP_ECN = ofp.OFPXMT_OFB_IP_ECN
+IP_PROTO = ofp.OFPXMT_OFB_IP_PROTO
+IPV4_SRC = ofp.OFPXMT_OFB_IPV4_SRC
+IPV4_DST = ofp.OFPXMT_OFB_IPV4_DST
+TCP_SRC = ofp.OFPXMT_OFB_TCP_SRC
+TCP_DST = ofp.OFPXMT_OFB_TCP_DST
+UDP_SRC = ofp.OFPXMT_OFB_UDP_SRC
+UDP_DST = ofp.OFPXMT_OFB_UDP_DST
+SCTP_SRC = ofp.OFPXMT_OFB_SCTP_SRC
+SCTP_DST = ofp.OFPXMT_OFB_SCTP_DST
+ICMPV4_TYPE = ofp.OFPXMT_OFB_ICMPV4_TYPE
+ICMPV4_CODE = ofp.OFPXMT_OFB_ICMPV4_CODE
+ARP_OP = ofp.OFPXMT_OFB_ARP_OP
+ARP_SPA = ofp.OFPXMT_OFB_ARP_SPA
+ARP_TPA = ofp.OFPXMT_OFB_ARP_TPA
+ARP_SHA = ofp.OFPXMT_OFB_ARP_SHA
+ARP_THA = ofp.OFPXMT_OFB_ARP_THA
+IPV6_SRC = ofp.OFPXMT_OFB_IPV6_SRC
+IPV6_DST = ofp.OFPXMT_OFB_IPV6_DST
+IPV6_FLABEL = ofp.OFPXMT_OFB_IPV6_FLABEL
+ICMPV6_TYPE = ofp.OFPXMT_OFB_ICMPV6_TYPE
+ICMPV6_CODE = ofp.OFPXMT_OFB_ICMPV6_CODE
+IPV6_ND_TARGET = ofp.OFPXMT_OFB_IPV6_ND_TARGET
+OFB_IPV6_ND_SLL = ofp.OFPXMT_OFB_IPV6_ND_SLL
+IPV6_ND_TLL = ofp.OFPXMT_OFB_IPV6_ND_TLL
+MPLS_LABEL = ofp.OFPXMT_OFB_MPLS_LABEL
+MPLS_TC = ofp.OFPXMT_OFB_MPLS_TC
+MPLS_BOS = ofp.OFPXMT_OFB_MPLS_BOS
+PBB_ISID = ofp.OFPXMT_OFB_PBB_ISID
+TUNNEL_ID = ofp.OFPXMT_OFB_TUNNEL_ID
+IPV6_EXTHDR = ofp.OFPXMT_OFB_IPV6_EXTHDR
+
+# ofp_action_* shortcuts
+
+def output(port, max_len=ofp.OFPCML_MAX):
+    return action(
+        type=OUTPUT,
+        output=ofp.ofp_action_output(port=port, max_len=max_len)
+    )
+
+def mpls_ttl(ttl):
+    return action(
+        type=SET_MPLS_TTL,
+        mpls_ttl=ofp.ofp_action_mpls_ttl(mpls_ttl=ttl)
+    )
+
+def push_vlan(eth_type):
+    return action(
+        type=PUSH_VLAN,
+        push=ofp.ofp_action_push(ethertype=eth_type)
+    )
+
+def pop_vlan():
+    return action(
+        type=POP_VLAN
+    )
+
+def pop_mpls(eth_type):
+    return action(
+        type=POP_MPLS,
+        pop_mpls=ofp.ofp_action_pop_mpls(ethertype=eth_type)
+    )
+
+def group(group_id):
+    return action(
+        type=GROUP,
+        group=ofp.ofp_action_group(group_id=group_id)
+    )
+
+def nw_ttl(nw_ttl):
+    return action(
+        type=NW_TTL,
+        nw_ttl=ofp.ofp_action_nw_ttl(nw_ttl=nw_ttl)
+    )
+
+def set_field(field):
+    return action(
+        type=SET_FIELD,
+        set_field=ofp.ofp_action_set_field(
+            field=ofp.ofp_oxm_field(
+                oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+                ofb_field=field))
+    )
+
+def experimenter(experimenter, data):
+    return action(
+        type=EXPERIMENTER,
+        experimenter=ofp.ofp_action_experimenter(
+            experimenter=experimenter, data=data)
+    )
+
+
+# ofb_field generators (incomplete set)
+
+def in_port(_in_port):
+    return ofb_field(type=IN_PORT, port=_in_port)
+
+def in_phy_port(_in_phy_port):
+    return ofb_field(type=IN_PHY_PORT, port=_in_phy_port)
+
+def metadata(_table_metadata):
+    return ofb_field(type=METADATA, table_metadata=_table_metadata)
+
+def eth_dst(_eth_dst):
+    return ofb_field(type=ETH_DST, eth_dst=_eth_dst)
+
+def eth_src(_eth_src):
+    return ofb_field(type=ETH_SRC, eth_src=_eth_src)
+
+def eth_type(_eth_type):
+    return ofb_field(type=ETH_TYPE, eth_type=_eth_type)
+
+def vlan_vid(_vlan_vid):
+    return ofb_field(type=VLAN_VID, vlan_vid=_vlan_vid)
+
+def vlan_pcp(_vlan_pcp):
+    return ofb_field(type=VLAN_PCP, vlan_pcp=_vlan_pcp)
+
+def ip_dscp(_ip_dscp):
+    return ofb_field(type=IP_DSCP, ip_dscp=_ip_dscp)
+
+def ip_ecn(_ip_ecn):
+    return ofb_field(type=IP_ECN, ip_ecn=_ip_ecn)
+
+def ip_proto(_ip_proto):
+    return ofb_field(type=IP_PROTO, ip_proto=_ip_proto)
+
+def ipv4_src(_ipv4_src):
+    return ofb_field(type=IPV4_SRC, ipv4_src=_ipv4_src)
+
+def ipv4_dst(_ipv4_dst):
+    return ofb_field(type=IPV4_DST, ipv4_dst=_ipv4_dst)
+
+def tcp_src(_tcp_src):
+    return ofb_field(type=TCP_SRC, tcp_src=_tcp_src)
+
+def tcp_dst(_tcp_dst):
+    return ofb_field(type=TCP_DST, tcp_dst=_tcp_dst)
+
+def udp_src(_udp_src):
+    return ofb_field(type=UDP_SRC, udp_src=_udp_src)
+
+def udp_dst(_udp_dst):
+    return ofb_field(type=UDP_DST, udp_dst=_udp_dst)
+
+def sctp_src(_sctp_src):
+    return ofb_field(type=SCTP_SRC, sctp_src=_sctp_src)
+
+def sctp_dst(_sctp_dst):
+    return ofb_field(type=SCTP_DST, sctp_dst=_sctp_dst)
+
+def icmpv4_type(_icmpv4_type):
+    return ofb_field(type=ICMPV4_TYPE, icmpv4_type=_icmpv4_type)
+
+def icmpv4_code(_icmpv4_code):
+    return ofb_field(type=ICMPV4_CODE, icmpv4_code=_icmpv4_code)
+
+def arp_op(_arp_op):
+    return ofb_field(type=ARP_OP, arp_op=_arp_op)
+
+def arp_spa(_arp_spa):
+    return ofb_field(type=ARP_SPA, arp_spa=_arp_spa)
+
+def arp_tpa(_arp_tpa):
+    return ofb_field(type=ARP_TPA, arp_tpa=_arp_tpa)
+
+def arp_sha(_arp_sha):
+    return ofb_field(type=ARP_SHA, arp_sha=_arp_sha)
+
+def arp_tha(_arp_tha):
+    return ofb_field(type=ARP_THA, arp_tha=_arp_tha)
+
+def ipv6_src(_ipv6_src):
+    return ofb_field(type=IPV6_SRC, arp_tha=_ipv6_src)
+
+def ipv6_dst(_ipv6_dst):
+    return ofb_field(type=IPV6_DST, arp_tha=_ipv6_dst)
+
+def ipv6_flabel(_ipv6_flabel):
+    return ofb_field(type=IPV6_FLABEL, arp_tha=_ipv6_flabel)
+
+def ipmpv6_type(_icmpv6_type):
+    return ofb_field(type=ICMPV6_TYPE, arp_tha=_icmpv6_type)
+
+def icmpv6_code(_icmpv6_code):
+    return ofb_field(type=ICMPV6_CODE, arp_tha=_icmpv6_code)
+
+def ipv6_nd_target(_ipv6_nd_target):
+    return ofb_field(type=IPV6_ND_TARGET, arp_tha=_ipv6_nd_target)
+
+def ofb_ipv6_nd_sll(_ofb_ipv6_nd_sll):
+    return ofb_field(type=OFB_IPV6_ND_SLL, arp_tha=_ofb_ipv6_nd_sll)
+
+def ipv6_nd_tll(_ipv6_nd_tll):
+    return ofb_field(type=IPV6_ND_TLL, arp_tha=_ipv6_nd_tll)
+
+def mpls_label(_mpls_label):
+    return ofb_field(type=MPLS_LABEL, arp_tha=_mpls_label)
+
+def mpls_tc(_mpls_tc):
+    return ofb_field(type=MPLS_TC, arp_tha=_mpls_tc)
+
+def mpls_bos(_mpls_bos):
+    return ofb_field(type=MPLS_BOS, arp_tha=_mpls_bos)
+
+def pbb_isid(_pbb_isid):
+    return ofb_field(type=PBB_ISID, arp_tha=_pbb_isid)
+
+def tunnel_id(_tunnel_id):
+    return ofb_field(type=TUNNEL_ID, arp_tha=_tunnel_id)
+
+def ipv6_exthdr(_ipv6_exthdr):
+    return ofb_field(type=IPV6_EXTHDR, arp_tha=_ipv6_exthdr)
+
+
+# frequently used extractors:
+
+def get_actions(flow):
+    """Extract list of ofp_action objects from flow spec object"""
+    assert isinstance(flow, ofp.ofp_flow_stats)
+    # we have the following hard assumptions for now
+    actions = []
+    for instruction in flow.instructions:
+        if instruction.type == ofp.OFPIT_APPLY_ACTIONS or instruction.type == ofp.OFPIT_WRITE_ACTIONS:
+            actions.extend(instruction.actions.actions)
+    return actions
+
+
+def get_ofb_fields(flow):
+    assert isinstance(flow, ofp.ofp_flow_stats)
+    assert flow.match.type == ofp.OFPMT_OXM
+    ofb_fields = []
+    for field in flow.match.oxm_fields:
+        assert field.oxm_class == ofp.OFPXMC_OPENFLOW_BASIC
+        ofb_fields.append(field.ofb_field)
+    return ofb_fields
+
+def get_out_port(flow):
+    for action in get_actions(flow):
+        if action.type == OUTPUT:
+            return action.output.port
+    return None
+
+def get_in_port(flow):
+    for field in get_ofb_fields(flow):
+        if field.type == IN_PORT:
+            return field.port
+    return None
+
+def get_goto_table_id(flow):
+    for instruction in flow.instructions:
+        if instruction.type == ofp.OFPIT_GOTO_TABLE:
+            return instruction.goto_table.table_id
+    return None
+
+def get_metadata(flow):
+    ''' Legacy get method (only returns the lower 32 bits) '''
+    for field in get_ofb_fields(flow):
+        if field.type == METADATA:
+            return field.table_metadata & 0xffffffff
+    return None
+
+def get_metadata_64_bit(flow):
+    for field in get_ofb_fields(flow):
+        if field.type == METADATA:
+            return field.table_metadata
+    return None
+
+
+def get_port_number_from_metadata(flow):
+    """
+    The port number (UNI on ONU) is in the lower 32-bits of metadata and
+    the inner_tag is in the upper 32-bits
+
+    This is set in the ONOS OltPipeline as a metadata field
+    """
+    md = get_metadata_64_bit(flow)
+
+    if md is None:
+        return None
+
+    if md <= 0xffffffff:
+        log.warn('onos-upgrade-suggested',
+                 metadata=md,
+                 message='Legacy MetaData detected from OltPipeline')
+        return md
+
+    return md & 0xffffffff
+
+
+def get_inner_tag_from_metadata(flow):
+    """
+    The port number (UNI on ONU) is in the lower 32-bits of metadata and
+    the inner_tag is in the upper 32-bits
+
+    This is set in the ONOS OltPipeline as a metadata field
+    """
+    md = get_metadata_64_bit(flow)
+
+    if md is None:
+        return None
+
+    if md <= 0xffffffff:
+        log.warn('onos-upgrade-suggested',
+                 metadata=md,
+                 message='Legacy MetaData detected from OltPipeline')
+        return md
+
+    return (md >> 32) & 0xffffffff
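+
+
+# Illustrative 64-bit metadata layout assumed by the two helpers above:
+#   md = (inner_tag << 32) | uni_port_no
+#   get_port_number_from_metadata(flow) -> uni_port_no  (lower 32 bits)
+#   get_inner_tag_from_metadata(flow)   -> inner_tag    (upper 32 bits)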
+
+
+# test and extract next table and group information
+def has_next_table(flow):
+    return get_goto_table_id(flow) is not None
+
+def get_group(flow):
+    for action in get_actions(flow):
+        if action.type == GROUP:
+            return action.group.group_id
+    return None
+
+def get_meter_ids_from_flow(flow):
+    meter_ids = list()
+    for instruction in flow.instructions:
+        if instruction.type == ofp.OFPIT_METER:
+            meter_ids.append(instruction.meter.meter_id)
+    return meter_ids
+
+def has_group(flow):
+    return get_group(flow) is not None
+
+def mk_oxm_fields(match_fields):
+    oxm_fields = [
+        ofp.ofp_oxm_field(
+            oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+            ofb_field=field
+        ) for field in match_fields
+        ]
+
+    return oxm_fields
+
+def mk_instructions_from_actions(actions):
+    instructions_action = ofp.ofp_instruction_actions()
+    instructions_action.actions.extend(actions)
+    instruction = ofp.ofp_instruction(type=ofp.OFPIT_APPLY_ACTIONS,
+                                      actions=instructions_action)
+    return [instruction]
+
+def mk_simple_flow_mod(match_fields, actions, command=ofp.OFPFC_ADD,
+                       next_table_id=None, meters=None, **kw):
+    """
+    Convenience function to generate an ofp_flow_mod message with an OXM BASIC
+    match composed from the match_fields, and a single APPLY_ACTIONS
+    instruction with a list of ofp_action objects.
+    :param match_fields: list(ofp_oxm_ofb_field)
+    :param actions: list(ofp_action)
+    :param command: one of OFPFC_*
+    :param next_table_id: if not None, add a GOTO_TABLE instruction to this
+    table
+    :param meters: if not None, add a METER instruction for each meter id
+    :param kw: additional keyword-based params to ofp_flow_mod
+    :return: initialized ofp_flow_mod object
+    """
+    instructions = [
+        ofp.ofp_instruction(
+            type=ofp.OFPIT_APPLY_ACTIONS,
+            actions=ofp.ofp_instruction_actions(actions=actions)
+        )
+    ]
+
+    if meters is not None:
+        for meter_id in meters:
+            instructions.append(ofp.ofp_instruction(
+                type=ofp.OFPIT_METER,
+                meter=ofp.ofp_instruction_meter(meter_id=meter_id)
+            ))
+
+    if next_table_id is not None:
+        instructions.append(ofp.ofp_instruction(
+            type=ofp.OFPIT_GOTO_TABLE,
+            goto_table=ofp.ofp_instruction_goto_table(table_id=next_table_id)
+        ))
+
+    return ofp.ofp_flow_mod(
+        command=command,
+        match=ofp.ofp_match(
+            type=ofp.OFPMT_OXM,
+            oxm_fields=[
+                ofp.ofp_oxm_field(
+                    oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+                    ofb_field=field
+                ) for field in match_fields
+            ]
+        ),
+        instructions=instructions,
+        **kw
+    )
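+
+
+# Illustrative use of mk_simple_flow_mod (port number, priority and the
+# EAPOL eth_type value are assumptions):
+#   fm = mk_simple_flow_mod(
+#       match_fields=[in_port(1), eth_type(0x888e)],
+#       actions=[output(ofp.OFPP_CONTROLLER)],
+#       priority=1000)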
+
+
+def mk_multicast_group_mod(group_id, buckets, command=ofp.OFPGC_ADD):
+    group = ofp.ofp_group_mod(
+        command=command,
+        type=ofp.OFPGT_ALL,
+        group_id=group_id,
+        buckets=buckets
+    )
+    return group
+
+
+def hash_flow_stats(flow):
+    """
+    Return unique 64-bit integer hash for flow covering the following
+    attributes: 'table_id', 'priority', 'flags', 'cookie', 'match', '_instruction_string'
+    """
+    _instruction_string = ""
+    for _instruction in flow.instructions:
+        _instruction_string += _instruction.SerializeToString()
+
+    hex = md5('{},{},{},{},{},{}'.format(
+        flow.table_id,
+        flow.priority,
+        flow.flags,
+        flow.cookie,
+        flow.match.SerializeToString(),
+        _instruction_string
+    )).hexdigest()
+    return int(hex[:16], 16)
+
+
+def flow_stats_entry_from_flow_mod_message(mod):
+    flow = ofp.ofp_flow_stats(
+        table_id=mod.table_id,
+        priority=mod.priority,
+        idle_timeout=mod.idle_timeout,
+        hard_timeout=mod.hard_timeout,
+        flags=mod.flags,
+        cookie=mod.cookie,
+        match=mod.match,
+        instructions=mod.instructions
+    )
+    flow.id = hash_flow_stats(flow)
+    return flow
+
+
+def group_entry_from_group_mod(mod):
+    group = ofp.ofp_group_entry(
+        desc=ofp.ofp_group_desc(
+            type=mod.type,
+            group_id=mod.group_id,
+            buckets=mod.buckets
+        ),
+        stats=ofp.ofp_group_stats(
+            group_id=mod.group_id
+            # TODO do we need to instantiate bucket bins?
+        )
+    )
+    return group
+
+
+def mk_flow_stat(**kw):
+    return flow_stats_entry_from_flow_mod_message(mk_simple_flow_mod(**kw))
+
+
+def mk_group_stat(**kw):
+    return group_entry_from_group_mod(mk_multicast_group_mod(**kw))
+
+class RouteHop(object):
+    __slots__ = ('_device', '_ingress_port', '_egress_port')
+    def __init__(self, device, ingress_port, egress_port):
+        self._device = device
+        self._ingress_port = ingress_port
+        self._egress_port = egress_port
+    @property
+    def device(self): return self._device
+    @property
+    def ingress_port(self): return self._ingress_port
+    @property
+    def egress_port(self): return self._egress_port
+    def __eq__(self, other):
+        return (
+            self._device == other._device and
+            self._ingress_port == other._ingress_port and
+            self._egress_port == other._egress_port)
+    def __ne__(self, other):
+        return not self.__eq__(other)
+    def __str__(self):
+        return 'RouteHop device_id {}, ingress_port {}, egress_port {}'.format(
+            self._device.id, self._ingress_port, self._egress_port)
+
+class FlowDecomposer(object):
+
+    def __init__(self, *args, **kw):
+        self.logical_device_id = 'this shall be overwritten in derived class'
+        super(FlowDecomposer, self).__init__(*args, **kw)
+
+    # ~~~~~~~~~~~~~~~~~~~~ methods exposed *to* derived class ~~~~~~~~~~~~~~~~~
+
+    def decompose_rules(self, flows, groups):
+        """
+        Generate per-device flows and flow-groups from the flows and groups
+        defined on a logical device
+        :param flows: logical device flows
+        :param groups: logical device flow groups
+        :return: dict(device_id ->
+            (OrderedDict-of-device-flows, OrderedDict-of-device-flow-groups))
+        """
+
+        device_rules = deepcopy(self.get_all_default_rules())
+        group_map = dict((g.desc.group_id, g) for g in groups)
+
+        for flow in flows:
+            for device_id, (_flows, _groups) \
+                    in self.decompose_flow(flow, group_map).iteritems():
+                fl_lst, gr_lst = device_rules.setdefault(
+                    device_id, (OrderedDict(), OrderedDict()))
+                for _flow in _flows:
+                    if _flow.id not in fl_lst:
+                        fl_lst[_flow.id] = _flow
+                for _group in _groups:
+                    if _group.group_id not in gr_lst:
+                        gr_lst[_group.group_id] = _group
+        return device_rules
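+
+    # Illustrative shape of the returned structure (device ids and flow/group
+    # ids are made up):
+    #   {'onu-1': (OrderedDict({111: flow_a}), OrderedDict()),
+    #    'olt-1': (OrderedDict({222: flow_b}), OrderedDict({1: group_1}))}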
+
+    def decompose_flow(self, flow, group_map):
+        assert isinstance(flow, ofp.ofp_flow_stats)
+
+        ####################################################################
+        #
+        # limited, heuristics based implementation
+        # needs to be replaced, see https://jira.opencord.org/browse/CORD-841
+        #
+        ####################################################################
+
+        in_port_no = get_in_port(flow)
+        out_port_no = get_out_port(flow)  # may be None
+
+        device_rules = {}  # accumulator
+
+        route = self.get_route(in_port_no, out_port_no)
+        if route is None:
+            log.error('no-route', in_port_no=in_port_no,
+                      out_port_no=out_port_no, comment='deleting flow')
+            self.flow_delete(flow)
+            return device_rules
+
+        assert len(route) == 2
+        ingress_hop, egress_hop = route
+
+        def is_downstream():
+            return ingress_hop.device.root
+
+        def is_upstream():
+            return not is_downstream()
+
+        def update_devices_rules(flow, curr_device_rules, meter_ids=None, table_id=None):
+            actions = [action.type for action in get_actions(flow)]
+            if len(actions) == 1 and OUTPUT in actions:
+                # Transparent ONU and OLT case (No-L2-Modification flow)
+                child_device_flow_lst, _ = curr_device_rules.setdefault(
+                    ingress_hop.device.id, ([], []))
+                parent_device_flow_lst, _ = curr_device_rules.setdefault(
+                    egress_hop.device.id, ([], []))
+
+                child_device_flow_lst.append(mk_flow_stat(
+                    priority=flow.priority,
+                    cookie=flow.cookie,
+                    match_fields=[
+                                     in_port(ingress_hop.ingress_port.port_no)
+                                 ] + [
+                                     field for field in get_ofb_fields(flow)
+                                     if field.type not in (IN_PORT,)
+                                 ],
+                    actions=[
+                        output(ingress_hop.egress_port.port_no)
+                    ]
+                ))
+
+                parent_device_flow_lst.append(mk_flow_stat(
+                    priority=flow.priority,
+                    cookie=flow.cookie,
+                    match_fields=[
+                                     in_port(egress_hop.ingress_port.port_no),
+                                 ] + [
+                                     field for field in get_ofb_fields(flow)
+                                     if field.type not in (IN_PORT,)
+                                 ],
+                    actions=[
+                        output(egress_hop.egress_port.port_no)
+                    ],
+                    table_id=table_id,
+                    meters=meter_ids
+                ))
+
+            else:
+                fl_lst, _ = curr_device_rules.setdefault(
+                    egress_hop.device.id, ([], []))
+                fl_lst.append(mk_flow_stat(
+                    priority=flow.priority,
+                    cookie=flow.cookie,
+                    match_fields=[
+                                     in_port(egress_hop.ingress_port.port_no)
+                                 ] + [
+                                     field for field in get_ofb_fields(flow)
+                                     if field.type not in (IN_PORT,)
+                                 ],
+                    actions=[
+                                action for action in get_actions(flow)
+                                if action.type != OUTPUT
+                            ] + [
+                                output(egress_hop.egress_port.port_no)
+                            ],
+                    table_id=table_id,
+                    meters=meter_ids
+                ))
+
+        if out_port_no is not None and \
+                (out_port_no & 0x7fffffff) == ofp.OFPP_CONTROLLER:
+
+            # UPSTREAM CONTROLLER-BOUND FLOW
+
+            # we assume that the ingress device is already pushing a
+            # customer-specific vlan (c-vid) based on its default flow
+            # rules, so there is nothing else to do on the ONU
+
+            # on the OLT, we need to push a new tag and set it to 4000,
+            # which for now represents the in-bound channel to the
+            # controller (via Voltha)
+            # TODO make the 4000 configurable
+            fl_lst, _ = device_rules.setdefault(
+                egress_hop.device.id, ([], []))
+
+            log.info('trap-flow', in_port_no=in_port_no,
+                     nni=self._nni_logical_port_no)
+
+            if in_port_no == self._nni_logical_port_no:
+                log.debug('trap-nni')
+                # Trap flow for NNI port
+                fl_lst.append(mk_flow_stat(
+                    priority=flow.priority,
+                    cookie=flow.cookie,
+                    match_fields=[
+                        in_port(egress_hop.egress_port.port_no)
+                    ] + [
+                        field for field in get_ofb_fields(flow)
+                        if field.type not in (IN_PORT,)
+                    ],
+                    actions=[
+                        action for action in get_actions(flow)
+                    ]
+                ))
+
+            else:
+                log.debug('trap-uni')
+                # Trap flow for UNI port
+
+                # in_port_no is None for the wildcard input case; do not
+                # include the upstream port when expanding the 4000 trap flow
+                if in_port_no is None:
+                    in_ports = self.get_wildcard_input_ports(
+                        exclude_port=egress_hop.egress_port.port_no)
+                else:
+                    in_ports = [in_port_no]
+
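+                # For each candidate UNI port install a pair of flows on the
+                # OLT: an upstream flow that pushes outer VID 4000 and sends
+                # the traffic out towards the controller, and a matching
+                # downstream flow that pops the 4000 tag and forwards back
+                # towards the PON side.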
+                for input_port in in_ports:
+                    fl_lst.append(mk_flow_stat(        # Upstream flow
+                        priority=flow.priority,
+                        cookie=flow.cookie,
+                        match_fields=[
+                            in_port(egress_hop.ingress_port.port_no),
+                            vlan_vid(ofp.OFPVID_PRESENT | input_port)
+                        ] + [
+                            field for field in get_ofb_fields(flow)
+                            if field.type not in (IN_PORT, VLAN_VID)
+                        ],
+                        actions=[
+                            push_vlan(0x8100),
+                            set_field(vlan_vid(ofp.OFPVID_PRESENT | 4000)),
+                            output(egress_hop.egress_port.port_no)]
+                    ))
+                    fl_lst.append(mk_flow_stat(            # Downstream flow
+                        priority=flow.priority,
+                        match_fields=[
+                            in_port(egress_hop.egress_port.port_no),
+                            vlan_vid(ofp.OFPVID_PRESENT | 4000),
+                            vlan_pcp(0),
+                            metadata(input_port)
+                        ],
+                        actions=[
+                            pop_vlan(),
+                            output(egress_hop.ingress_port.port_no)]
+                    ))
+        else:
+            # NOT A CONTROLLER-BOUND FLOW
+            if is_upstream():
+
+                # We assume that anything that is upstream needs to get Q-in-Q
+                # treatment and that this is expressed via two flow rules,
+                # the first using a goto-table instruction. We also assume
+                # that the inner tag is applied at the ONU, while the outer
+                # tag is applied at the OLT
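+                # Flows whose goto-table target is below the tech-profile
+                # table id form the first stage and are installed on the
+                # ingress (ONU) device; flows whose target is at or above
+                # the tech-profile table id form the second stage and are
+                # handled by update_devices_rules() with the flow's meter
+                # ids and table id.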
+                next_table_id = get_goto_table_id(flow)
+                if next_table_id is not None and next_table_id < tech_profile.DEFAULT_TECH_PROFILE_TABLE_ID:
+                    assert out_port_no is None
+                    fl_lst, _ = device_rules.setdefault(
+                        ingress_hop.device.id, ([], []))
+                    fl_lst.append(mk_flow_stat(
+                        priority=flow.priority,
+                        cookie=flow.cookie,
+                        match_fields=[
+                            in_port(ingress_hop.ingress_port.port_no)
+                        ] + [
+                            field for field in get_ofb_fields(flow)
+                            if field.type not in (IN_PORT,)
+                        ],
+                        actions=[
+                            action for action in get_actions(flow)
+                        ] + [
+                            output(ingress_hop.egress_port.port_no)
+                        ]
+                    ))
+
+                elif next_table_id is not None and next_table_id >= tech_profile.DEFAULT_TECH_PROFILE_TABLE_ID:
+                    assert out_port_no is not None
+                    meter_ids = get_meter_ids_from_flow(flow)
+                    update_devices_rules(flow, device_rules, meter_ids, next_table_id)
+                else:
+                    update_devices_rules(flow, device_rules)
+
+            else:  # downstream
+                next_table_id = get_goto_table_id(flow)
+                if next_table_id is not None and next_table_id < tech_profile.DEFAULT_TECH_PROFILE_TABLE_ID:
+                    assert out_port_no is None
+
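+                    # Downstream flows carrying write-metadata encode the
+                    # inner VLAN tag and the target UNI port number in the
+                    # 64-bit metadata field; the route is recomputed towards
+                    # that UNI and the inner tag becomes an explicit match
+                    # field on the resulting device flow.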
+                    if get_metadata(flow) is not None:
+                        log.debug('creating-metadata-flow', flow=flow)
+                        # For downstream flows with dual-tags, recalculate route.
+                        port_number = get_port_number_from_metadata(flow)
+
+                        if port_number is not None:
+                            route = self.get_route(in_port_no, port_number)
+                            if route is None:
+                                log.error('no-route-double-tag', in_port_no=in_port_no,
+                                          out_port_no=port_number, comment='deleting flow',
+                                          metadata=get_metadata_64_bit(flow))
+                                self.flow_delete(flow)
+                                return device_rules
+                            assert len(route) == 2
+                            ingress_hop, egress_hop = route
+
+                        inner_tag = get_inner_tag_from_metadata(flow)
+
+                        if inner_tag is None:
+                            log.error('no-inner-tag-double-tag', in_port_no=in_port_no,
+                                      out_port_no=port_number, comment='deleting flow',
+                                      metadata=get_metadata_64_bit(flow))
+                            self.flow_delete(flow)
+                            return device_rules
+
+                        fl_lst, _ = device_rules.setdefault(
+                            ingress_hop.device.id, ([], []))
+                        fl_lst.append(mk_flow_stat(
+                            priority=flow.priority,
+                            cookie=flow.cookie,
+                            match_fields=[
+                                in_port(ingress_hop.ingress_port.port_no),
+                                metadata(inner_tag)
+                            ] + [
+                                field for field in get_ofb_fields(flow)
+                                if field.type not in (IN_PORT, METADATA)
+                            ],
+                            actions=[
+                                action for action in get_actions(flow)
+                            ] + [
+                                output(ingress_hop.egress_port.port_no)
+                            ]
+                        ))
+                    else:
+                        log.debug('creating-standard-flow', flow=flow)
+                        fl_lst, _ = device_rules.setdefault(
+                            ingress_hop.device.id, ([], []))
+                        fl_lst.append(mk_flow_stat(
+                            priority=flow.priority,
+                            cookie=flow.cookie,
+                            match_fields=[
+                                in_port(ingress_hop.ingress_port.port_no)
+                            ] + [
+                                field for field in get_ofb_fields(flow)
+                                if field.type not in (IN_PORT,)
+                            ],
+                            actions=[
+                                action for action in get_actions(flow)
+                            ] + [
+                                output(ingress_hop.egress_port.port_no)
+                            ]
+                        ))
+
+                elif out_port_no is not None:  # unicast case
+
+                    actions = [action.type for action in get_actions(flow)]
+                    # Transparent ONU and OLT case (No-L2-Modification flow)
+                    if len(actions) == 1 and OUTPUT in actions:
+                        parent_device_flow_lst, _ = device_rules.setdefault(
+                                                ingress_hop.device.id, ([], []))
+                        child_device_flow_lst, _ = device_rules.setdefault(
+                                                egress_hop.device.id, ([], []))
+
+                        parent_device_flow_lst.append(mk_flow_stat(
+                                            priority=flow.priority,
+                                            cookie=flow.cookie,
+                                            match_fields=[
+                                                in_port(ingress_hop.ingress_port.port_no)
+                                            ] + [
+                                                field for field in get_ofb_fields(flow)
+                                                if field.type not in (IN_PORT,)
+                                            ],
+                                            actions=[
+                                                 output(ingress_hop.egress_port.port_no)
+                                            ]
+                                            ))
+
+                        child_device_flow_lst.append(mk_flow_stat(
+                                            priority=flow.priority,
+                                            cookie=flow.cookie,
+                                            match_fields=[
+                                                in_port(egress_hop.ingress_port.port_no),
+                                            ] + [
+                                                field for field in get_ofb_fields(flow)
+                                                if field.type not in (IN_PORT,)
+                                            ],
+                                            actions=[
+                                                output(egress_hop.egress_port.port_no)
+                                            ]
+                                            ))
+                    else:
+                        fl_lst, _ = device_rules.setdefault(
+                            egress_hop.device.id, ([], []))
+                        fl_lst.append(mk_flow_stat(
+                            priority=flow.priority,
+                            cookie=flow.cookie,
+                            match_fields=[
+                                in_port(egress_hop.ingress_port.port_no)
+                            ] + [
+                                field for field in get_ofb_fields(flow)
+                                if field.type not in (IN_PORT,)
+                            ],
+                            actions=[
+                                action for action in get_actions(flow)
+                                if action.type not in (OUTPUT,)
+                            ] + [
+                                output(egress_hop.egress_port.port_no)
+                            ],
+                            #table_id=flow.table_id,
+                            #meters=None if len(get_meter_ids_from_flow(flow)) == 0 else get_meter_ids_from_flow(flow)
+                        ))
+                else:
+                    grp_id = get_group(flow)
+
+                    if grp_id is not None: # multicast case
+                        fl_lst_olt, _ = device_rules.setdefault(
+                            ingress_hop.device.id, ([], []))
+                        # having no group yet is the same as having a group with
+                        # no buckets
+                        group = group_map.get(grp_id, ofp.ofp_group_entry())
+
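+                        # Each group bucket is expanded into two device
+                        # flows: one on the OLT that pops the outer VLAN tag
+                        # and forwards towards the PON side, and one on the
+                        # ONU that applies the bucket's remaining actions and
+                        # forwards out the UNI port.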
+                        for bucket in group.desc.buckets:
+                            found_pop_vlan = False
+                            other_actions = []
+                            for action in bucket.actions:
+                                if action.type == POP_VLAN:
+                                    found_pop_vlan = True
+                                elif action.type == OUTPUT:
+                                    out_port_no = action.output.port
+                                else:
+                                    other_actions.append(action)
+                            # re-run route request to determine egress device and
+                            # ports
+                            route2 = self.get_route(in_port_no, out_port_no)
+                            if not route2 or len(route2) != 2:
+                                log.error('mc-no-route', in_port_no=in_port_no,
+                                    out_port_no=out_port_no, route2=route2,
+                                    comment='deleting flow')
+                                self.flow_delete(flow)
+                                continue
+
+                            ingress_hop2, egress_hop = route2
+
+                            if ingress_hop.ingress_port != ingress_hop2.ingress_port:
+                                log.error('mc-ingress-hop-hop2-mismatch',
+                                    ingress_hop=ingress_hop,
+                                    ingress_hop2=ingress_hop2,
+                                    in_port_no=in_port_no,
+                                    out_port_no=out_port_no,
+                                    comment='ignoring flow')
+                                continue
+
+                            fl_lst_olt.append(mk_flow_stat(
+                                priority=flow.priority,
+                                cookie=flow.cookie,
+                                match_fields=[
+                                    in_port(ingress_hop.ingress_port.port_no)
+                                ] + [
+                                    field for field in get_ofb_fields(flow)
+                                    if field.type not in (IN_PORT,)
+                                ],
+                                actions=[
+                                    action for action in get_actions(flow)
+                                    if action.type not in (GROUP,)
+                                ] + [
+                                    pop_vlan(),
+                                    output(egress_hop.ingress_port.port_no)
+                                ]
+                            ))
+
+                            fl_lst_onu, _ = device_rules.setdefault(
+                                egress_hop.device.id, ([], []))
+                            fl_lst_onu.append(mk_flow_stat(
+                                priority=flow.priority,
+                                cookie=flow.cookie,
+                                match_fields=[
+                                    in_port(egress_hop.ingress_port.port_no)
+                                ] + [
+                                    field for field in get_ofb_fields(flow)
+                                    if field.type not in (IN_PORT, VLAN_VID, VLAN_PCP)
+                                ],
+                                actions=other_actions + [
+                                    output(egress_hop.egress_port.port_no)
+                                ]
+                            ))
+                    else:
+                        raise NotImplementedError('undefined downstream case for flows')
+        return device_rules
+
+    # ~~~~~~~~~~~~ methods expected to be provided by derived class ~~~~~~~~~~~
+
+    def get_all_default_rules(self):
+        raise NotImplementedError('derived class must provide')
+
+    def get_default_rules(self, device_id):
+        raise NotImplementedError('derived class must provide')
+
+    def get_route(self, ingress_port_no, egress_port_no):
+        raise NotImplementedError('derived class must provide')
+
+    def get_wildcard_input_ports(self, exclude_port=None):
+        raise NotImplementedError('derived class must provide')
+
+    def flow_delete(self, mod):
+        raise NotImplementedError('derived class must provide')
diff --git a/python/core/logical_device_agent.py b/python/core/logical_device_agent.py
new file mode 100644
index 0000000..10ec66c
--- /dev/null
+++ b/python/core/logical_device_agent.py
@@ -0,0 +1,973 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Model that captures the current state of a logical device
+"""
+from collections import OrderedDict
+
+import structlog
+
+from common.event_bus import EventBusClient
+from common.frameio.frameio import hexify
+from voltha.registry import registry
+from voltha.core.config.config_proxy import CallbackType
+from voltha.core.device_graph import DeviceGraph
+from voltha.core.flow_decomposer import FlowDecomposer, \
+    flow_stats_entry_from_flow_mod_message, group_entry_from_group_mod, \
+    mk_flow_stat, in_port, vlan_vid, vlan_pcp, pop_vlan, output, set_field, \
+    push_vlan, mk_simple_flow_mod
+from voltha.protos import third_party
+from voltha.protos import openflow_13_pb2 as ofp
+from voltha.protos.device_pb2 import Port
+from voltha.protos.logical_device_pb2 import LogicalPort
+from voltha.protos.openflow_13_pb2 import Flows, Meters, FlowGroups, ofp_meter_config
+
+_ = third_party
+
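+# Convert a colon-separated MAC address string into a tuple of ints,
+# e.g. '00:0c:e2:31:40:00' -> (0, 12, 226, 49, 64, 0)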
+def mac_str_to_tuple(mac):
+    return tuple(int(d, 16) for d in mac.split(':'))
+
+
+class LogicalDeviceAgent(FlowDecomposer, DeviceGraph):
+
+    def __init__(self, core, logical_device):
+        try:
+            self.core = core
+            self.local_handler = core.get_local_handler()
+            self.logical_device_id = logical_device.id
+
+            self.root_proxy = core.get_proxy('/')
+            self.flows_proxy = core.get_proxy(
+                '/logical_devices/{}/flows'.format(logical_device.id))
+            self.meters_proxy = core.get_proxy(
+                '/logical_devices/{}/meters'.format(logical_device.id))
+            self.groups_proxy = core.get_proxy(
+                '/logical_devices/{}/flow_groups'.format(logical_device.id))
+            self.self_proxy = core.get_proxy(
+                '/logical_devices/{}'.format(logical_device.id))
+
+            self.flows_proxy.register_callback(
+                CallbackType.PRE_UPDATE, self._pre_process_flows)
+            self.flows_proxy.register_callback(
+                CallbackType.POST_UPDATE, self._flow_table_updated)
+            self.groups_proxy.register_callback(
+                CallbackType.POST_UPDATE, self._group_table_updated)
+            self.self_proxy.register_callback(
+                CallbackType.POST_ADD, self._port_added)
+            self.self_proxy.register_callback(
+                CallbackType.POST_REMOVE, self._port_removed)
+
+            self.port_proxy = {}
+            self.port_status_has_changed = {}
+
+            self.event_bus = EventBusClient()
+            self.packet_in_subscription = self.event_bus.subscribe(
+                topic='packet-in:{}'.format(logical_device.id),
+                callback=self.handle_packet_in_event)
+
+            self.log = structlog.get_logger(logical_device_id=logical_device.id)
+
+            self._routes = None
+            self._no_flow_changes_required = False
+            self._flows_ids_to_add = []
+            self._flows_ids_to_remove = []
+            self._flows_to_remove = []
+
+            self.accepts_direct_logical_flows = False
+            self.device_id = self.self_proxy.get('/').root_device_id
+            device_adapter_type = self.root_proxy.get('/devices/{}'.format(
+                self.device_id)).adapter
+            device_type = self.root_proxy.get('/device_types/{}'.format(
+                device_adapter_type))
+
+            if device_type is not None:
+                self.accepts_direct_logical_flows = \
+                    device_type.accepts_direct_logical_flows_update
+
+            if self.accepts_direct_logical_flows:
+
+                self.device_adapter_agent = registry(
+                    'adapter_loader').get_agent(device_adapter_type).adapter
+
+                self.log.debug('this device accepts direct logical flows',
+                               device_adapter_type=device_adapter_type)
+
+
+
+        except Exception, e:
+            self.log.exception('init-error', e=e)
+
+    def start(self, reconcile=False):
+        self.log.debug('starting')
+        if reconcile:
+            # Register the callbacks for the ports
+            ports = self.self_proxy.get('/ports')
+            for port in ports:
+                self._reconcile_port(port)
+            self.log.debug('ports-reconciled', ports=ports)
+        self.log.debug('started')
+        return self
+
+    def stop(self):
+        self.log.debug('stopping')
+        try:
+            self.flows_proxy.unregister_callback(
+                CallbackType.POST_UPDATE, self._flow_table_updated)
+            self.groups_proxy.unregister_callback(
+                CallbackType.POST_UPDATE, self._group_table_updated)
+            self.self_proxy.unregister_callback(
+                CallbackType.POST_ADD, self._port_added)
+            self.self_proxy.unregister_callback(
+                CallbackType.POST_REMOVE, self._port_removed)
+
+            # Remove subscription to the event bus
+            self.event_bus.unsubscribe(self.packet_in_subscription)
+        except Exception, e:
+            self.log.info('stop-exception', e=e)
+
+        self.log.debug('stopped')
+
+    def announce_flows_deleted(self, flows):
+        for f in flows:
+            self.announce_flow_deleted(f)
+
+    def announce_flow_deleted(self, flow):
+        if flow.flags & ofp.OFPFF_SEND_FLOW_REM:
+            raise NotImplementedError("announce_flow_deleted")
+
+    def signal_flow_mod_error(self, code, flow_mod):
+        pass  # TODO
+
+    def signal_flow_removal(self, code, flow):
+        pass  # TODO
+
+    def signal_group_mod_error(self, code, group_mod):
+        pass  # TODO
+
+    def update_flow_table(self, flow_mod):
+
+        command = flow_mod.command
+
+        if command == ofp.OFPFC_ADD:
+            self.flow_add(flow_mod)
+
+        elif command == ofp.OFPFC_DELETE:
+            self.flow_delete(flow_mod)
+
+        elif command == ofp.OFPFC_DELETE_STRICT:
+            self.flow_delete_strict(flow_mod)
+
+        elif command == ofp.OFPFC_MODIFY:
+            self.flow_modify(flow_mod)
+
+        elif command == ofp.OFPFC_MODIFY_STRICT:
+            self.flow_modify_strict(flow_mod)
+
+        else:
+            self.log.warn('unhandled-flow-mod', command=command, flow_mod=flow_mod)
+
+    def update_meter_table(self, meter_mod):
+        command = meter_mod.command
+
+        if command == ofp.OFPMC_ADD:
+            self.meter_add(meter_mod)
+
+        elif command == ofp.OFPMC_MODIFY:
+            self.meter_modify(meter_mod)
+
+        elif command == ofp.OFPMC_DELETE:
+            self.meter_delete(meter_mod)
+        else:
+            self.log.warn('unhandled-meter-mod', command=command, meter_mod=meter_mod)
+
+    def update_group_table(self, group_mod):
+
+        command = group_mod.command
+
+        if command == ofp.OFPGC_DELETE:
+            self.group_delete(group_mod)
+
+        elif command == ofp.OFPGC_ADD:
+            self.group_add(group_mod)
+
+        elif command == ofp.OFPGC_MODIFY:
+            self.group_modify(group_mod)
+
+        else:
+            self.log.warn('unhandled-group-mod',
+                          command=command, group_mod=group_mod)
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL METER HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~
+
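+    # Meters are stored in the data model as ofp_meter_config entries; a
+    # given meter_id may appear at most once, so the add/modify/delete
+    # handlers below first look up any existing entry with the same id.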
+    def meter_add(self, meter_mod):
+        assert isinstance(meter_mod, ofp.ofp_meter_mod)
+        # read from model
+        meters = list(self.meters_proxy.get('/').items)
+        if not self.check_meter_id_overlapping(meters, meter_mod):
+            meters.append(ofp_meter_config(flags=meter_mod.flags,
+                                           meter_id=meter_mod.meter_id,
+                                           bands=meter_mod.bands))
+
+            self.meters_proxy.update('/', Meters(items=meters))
+        else:
+            self.signal_meter_mod_error(ofp.OFPMMFC_METER_EXISTS, meter_mod)
+
+    def meter_modify(self, meter_mod):
+        assert isinstance(meter_mod, ofp.ofp_meter_mod)
+        meters = list(self.meters_proxy.get('/').items)
+        existing_meter = self.check_meter_id_overlapping(meters, meter_mod)
+        if existing_meter:
+            existing_meter.flags = meter_mod.flags
+            existing_meter.bands = meter_mod.bands
+            self.meters_proxy.update('/', Meters(items=meters))
+        else:
+            self.signal_meter_mod_error(ofp.OFPMMFC_UNKNOWN_METER, meter_mod)
+
+    def meter_delete(self, meter_mod):
+        assert isinstance(meter_mod, ofp.ofp_meter_mod)
+        meters = list(self.meters_proxy.get('/').items)
+        to_keep = list()
+        to_delete = 0
+
+        for meter in meters:
+            if meter.meter_id != meter_mod.meter_id:
+                to_keep.append(meter)
+            else:
+                to_delete += 1
+
+        if to_delete == 1:
+            self.meters_proxy.update('/', Meters(items=to_keep))
+        if to_delete == 0:
+            self.signal_meter_mod_error(ofp.OFPMMFC_UNKNOWN_METER, meter_mod)
+        elif to_delete > 1:
+            raise Exception('More than one meter_config sharing the same meter_id cannot exist')
+
+    @staticmethod
+    def check_meter_id_overlapping(meters, meter_mod):
+        for meter in meters:
+            if meter.meter_id == meter_mod.meter_id:
+                return meter
+        return False
+
+    def signal_meter_mod_error(self, error_code, meter_mod):
+        pass  # TODO
+
+
+
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL FLOW HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~
+
+    def flow_add(self, mod):
+        assert isinstance(mod, ofp.ofp_flow_mod)
+        assert mod.cookie_mask == 0
+
+        # read from model
+        flows = list(self.flows_proxy.get('/').items)
+
+        changed = False
+        check_overlap = mod.flags & ofp.OFPFF_CHECK_OVERLAP
+        if check_overlap:
+            if self.find_overlapping_flows(flows, mod, True):
+                self.signal_flow_mod_error(
+                    ofp.OFPFMFC_OVERLAP, mod)
+            else:
+                # free to add as new flow
+                flow = flow_stats_entry_from_flow_mod_message(mod)
+                flows.append(flow)
+                changed = True
+                self.log.debug('flow-added', flow=mod)
+
+        else:
+            flow = flow_stats_entry_from_flow_mod_message(mod)
+            idx = self.find_flow(flows, flow)
+            if idx >= 0:
+                old_flow = flows[idx]
+                if not (mod.flags & ofp.OFPFF_RESET_COUNTS):
+                    flow.byte_count = old_flow.byte_count
+                    flow.packet_count = old_flow.packet_count
+                flows[idx] = flow
+                changed = True
+                self.log.debug('flow-updated', flow=flow)
+
+            else:
+                flows.append(flow)
+                changed = True
+                self.log.debug('flow-added', flow=mod)
+
+        # write back to model
+        if changed:
+            self.flows_proxy.update('/', Flows(items=flows))
+
+    def flow_delete(self, mod):
+        assert isinstance(mod, (ofp.ofp_flow_mod, ofp.ofp_flow_stats))
+
+        # read from model
+        flows = list(self.flows_proxy.get('/').items)
+
+        # build a list of what to keep vs what to delete
+        to_keep = []
+        to_delete = []
+        for f in flows:
+            if self.flow_matches_spec(f, mod):
+                to_delete.append(f)
+            else:
+                to_keep.append(f)
+
+        # replace flow table with keepers
+        flows = to_keep
+
+        # write back
+        if to_delete:
+            self.flows_proxy.update('/', Flows(items=flows))
+
+        # from mod send announcement
+        if isinstance(mod, ofp.ofp_flow_mod):
+            # send notifications for discarded flow as required by OpenFlow
+            self.announce_flows_deleted(to_delete)
+
+    def flow_delete_strict(self, mod):
+        assert isinstance(mod, ofp.ofp_flow_mod)
+
+        # read from model
+        flows = list(self.flows_proxy.get('/').items)
+        changed = False
+
+        flow = flow_stats_entry_from_flow_mod_message(mod)
+        idx = self.find_flow(flows, flow)
+        if (idx >= 0):
+            del flows[idx]
+            changed = True
+        else:
+            # TODO need to check what to do with this case
+            self.log.warn('flow-cannot-delete', flow=flow)
+
+        if changed:
+            self.flows_proxy.update('/', Flows(items=flows))
+
+    def flow_modify(self, mod):
+        raise NotImplementedError()
+
+    def flow_modify_strict(self, mod):
+        raise NotImplementedError()
+
+    def find_overlapping_flows(self, flows, mod, return_on_first=False):
+        """
+        Return list of overlapping flow(s)
+        Two flows overlap if a packet may match both and if they have the
+        same priority.
+        :param flows: Existing flows to check against
+        :param mod: Flow request
+        :param return_on_first: if True, return on the first match found
+        :return: list of overlapping flows (currently always empty)
+        """
+        return []  # TODO finish implementation
+
+    @classmethod
+    def find_flow(cls, flows, flow):
+        for i, f in enumerate(flows):
+            if cls.flow_match(f, flow):
+                return i
+        return -1
+
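+    # Two flow entries are considered the same flow when their identity
+    # fields (table_id, priority, flags, cookie and match) are equal;
+    # byte/packet counters and durations are intentionally ignored.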
+    @staticmethod
+    def flow_match(f1, f2):
+        keys_matter = ('table_id', 'priority', 'flags', 'cookie', 'match')
+        for key in keys_matter:
+            if getattr(f1, key) != getattr(f2, key):
+                return False
+        return True
+
+    @classmethod
+    def flow_matches_spec(cls, flow, flow_mod):
+        """
+        Return True if the given flow (ofp_flow_stats) is "covered" by the
+        wildcard flow_mod (ofp_flow_mod), taking into consideration both
+        exact matches as well as mask-based match fields, if any.
+        Otherwise return False.
+        :param flow: ofp_flow_stats
+        :param flow_mod: ofp_flow_mod or ofp_flow_stats
+        :return: Bool
+        """
+
+        assert isinstance(flow, ofp.ofp_flow_stats)
+        assert isinstance(flow_mod, (ofp.ofp_flow_mod, ofp.ofp_flow_stats))
+
+        if isinstance(flow_mod, ofp.ofp_flow_stats):
+            return cls.flow_match(flow, flow_mod)
+
+        # Check if flow.cookie is covered by mod.cookie and mod.cookie_mask
+        if (flow.cookie & flow_mod.cookie_mask) != \
+                (flow_mod.cookie & flow_mod.cookie_mask):
+            return False
+
+        # Check if flow.table_id is covered by flow_mod.table_id
+        if flow_mod.table_id != ofp.OFPTT_ALL and \
+                        flow.table_id != flow_mod.table_id:
+            return False
+
+        # Check out_port
+        if (flow_mod.out_port & 0x7fffffff) != ofp.OFPP_ANY and \
+                not cls.flow_has_out_port(flow, flow_mod.out_port):
+            return False
+
+        # Check out_group
+        if (flow_mod.out_group & 0x7fffffff) != ofp.OFPG_ANY and \
+                not cls.flow_has_out_group(flow, flow_mod.out_group):
+            return False
+        # Priority is ignored
+
+        # Check match condition
+        # If the flow_mod match field is empty, that is a special case and
+        # indicates the flow entry matches
+        match = flow_mod.match
+        assert isinstance(match, ofp.ofp_match)
+        if not match.oxm_fields:
+            # If we got this far and the match is empty in the flow spec,
+            # then the flow matches
+            return True
+        else:
+            raise NotImplementedError(
+                "flow_matches_spec(): No flow match analysis yet")
+
+    @staticmethod
+    def flow_has_out_port(flow, out_port):
+        """
+        Return True if flow has an output action with the given out_port
+        """
+        assert isinstance(flow, ofp.ofp_flow_stats)
+        for instruction in flow.instructions:
+            assert isinstance(instruction, ofp.ofp_instruction)
+            if instruction.type == ofp.OFPIT_APPLY_ACTIONS:
+                for action in instruction.actions.actions:
+                    assert isinstance(action, ofp.ofp_action)
+                    if action.type == ofp.OFPAT_OUTPUT and \
+                        action.output.port == out_port:
+                        return True
+
+        # otherwise...
+        return False
+
+    @staticmethod
+    def flow_has_out_group(flow, group_id):
+        """
+        Return True if flow has a group action with the given group_id
+        """
+        assert isinstance(flow, ofp.ofp_flow_stats)
+        for instruction in flow.instructions:
+            assert isinstance(instruction, ofp.ofp_instruction)
+            if instruction.type == ofp.OFPIT_APPLY_ACTIONS:
+                for action in instruction.actions.actions:
+                    assert isinstance(action, ofp.ofp_action)
+                    if action.type == ofp.OFPAT_GROUP and \
+                        action.group.group_id == group_id:
+                        return True
+
+        # otherwise...
+        return False
+
+    def flows_delete_by_group_id(self, flows, group_id):
+        """
+        Delete any flow(s) referring to given group_id
+        :param flows: list of flows to filter
+        :param group_id: group id to match
+        :return: (changed, flows) tuple, where changed indicates whether any
+                 flow was removed and flows is the remaining list
+        """
+        to_keep = []
+        to_delete = []
+        for f in flows:
+            if self.flow_has_out_group(f, group_id):
+                to_delete.append(f)
+            else:
+                to_keep.append(f)
+
+        # replace flow table with keepers
+        flows = to_keep
+
+        # send notification to deleted ones
+        self.announce_flows_deleted(to_delete)
+
+        return bool(to_delete), flows
+
+    # ~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL GROUP HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def group_add(self, group_mod):
+        assert isinstance(group_mod, ofp.ofp_group_mod)
+
+        groups = OrderedDict((g.desc.group_id, g)
+                             for g in self.groups_proxy.get('/').items)
+        changed = False
+
+        if group_mod.group_id in groups:
+            self.signal_group_mod_error(ofp.OFPGMFC_GROUP_EXISTS, group_mod)
+        else:
+            group_entry = group_entry_from_group_mod(group_mod)
+            groups[group_mod.group_id] = group_entry
+            changed = True
+
+        if changed:
+            self.groups_proxy.update('/', FlowGroups(items=groups.values()))
+
+    def group_delete(self, group_mod):
+        assert isinstance(group_mod, ofp.ofp_group_mod)
+
+        groups = OrderedDict((g.desc.group_id, g)
+                             for g in self.groups_proxy.get('/').items)
+        groups_changed = False
+        flows_changed = False
+
+        group_id = group_mod.group_id
+        if group_id == ofp.OFPG_ALL:
+            # TODO we must delete all flows that point to this group and
+            # signal controller as requested by flow's flag
+            groups = OrderedDict()
+            groups_changed = True
+            self.log.debug('all-groups-deleted')
+
+        else:
+            if group_id not in groups:
+                # per openflow spec, this is not an error
+                pass
+
+            else:
+                flows = list(self.flows_proxy.get('/').items)
+                flows_changed, flows = self.flows_delete_by_group_id(
+                    flows, group_id)
+                del groups[group_id]
+                groups_changed = True
+                self.log.debug('group-deleted', group_id=group_id)
+
+        if groups_changed:
+            self.groups_proxy.update('/', FlowGroups(items=groups.values()))
+        if flows_changed:
+            self.flows_proxy.update('/', Flows(items=flows))
+
+    def group_modify(self, group_mod):
+        assert isinstance(group_mod, ofp.ofp_group_mod)
+
+        groups = OrderedDict((g.desc.group_id, g)
+                             for g in self.groups_proxy.get('/').items)
+        changed = False
+
+        if group_mod.group_id not in groups:
+            self.signal_group_mod_error(
+                ofp.OFPGMFC_INVALID_GROUP, group_mod)
+        else:
+            # replace existing group entry with new group definition
+            group_entry = group_entry_from_group_mod(group_mod)
+            groups[group_mod.group_id] = group_entry
+            changed = True
+
+        if changed:
+            self.groups_proxy.update('/', FlowGroups(items=groups.values()))
+
+    def port_enable(self, port_id):
+        self.log.info("port-enable", port_id=port_id)
+
+        proxy = self.port_proxy[port_id]
+        port = proxy.get('/')
+        port.ofp_port.config = port.ofp_port.config & ~ofp.OFPPC_PORT_DOWN
+        proxy.update('/', port)
+
+    def port_disable(self, port_id):
+        self.log.info("port-disable", port_id=port_id)
+
+        proxy = self.port_proxy[port_id]
+        port = proxy.get('/')
+        port.ofp_port.config = port.ofp_port.config | ofp.OFPPC_PORT_DOWN
+        proxy.update('/', port)
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_OUT ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def packet_out(self, ofp_packet_out):
+        self.log.debug('packet-out', packet=ofp_packet_out)
+        topic = 'packet-out:{}'.format(self.logical_device_id)
+        self.event_bus.publish(topic, ofp_packet_out)
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_IN ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def handle_packet_in_event(self, _, msg):
+        self.log.debug('handle-packet-in', msg=msg)
+        logical_port_no, packet = msg
+        packet_in = ofp.ofp_packet_in(
+            # buffer_id=0,
+            reason=ofp.OFPR_ACTION,
+            # table_id=0,
+            # cookie=0,
+            match=ofp.ofp_match(
+                type=ofp.OFPMT_OXM,
+                oxm_fields=[
+                    ofp.ofp_oxm_field(
+                        oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+                        ofb_field=in_port(logical_port_no)
+                    )
+                ]
+            ),
+            data=packet
+        )
+        self.packet_in(packet_in)
+
+    def packet_in(self, ofp_packet_in):
+        self.log.info('packet-in', logical_device_id=self.logical_device_id,
+                      pkt=ofp_packet_in, data=hexify(ofp_packet_in.data))
+        self.local_handler.send_packet_in(
+            self.logical_device_id, ofp_packet_in)
+
+    # ~~~~~~~~~~~~~~~~~~~~~ FLOW TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def _pre_process_flows(self, flows):
+        """
+        This method is invoked before a device flow table data model is
+        updated. The resulting data is stored locally and the flow table is
+        updated during the post-processing phase, i.e. via the POST_UPDATE
+        callback
+        :param flows: Desired flows
+        :return: None
+        """
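+        # Compute the id-level delta between the current and desired flow
+        # tables; the add/remove sets computed here are consumed by
+        # _flow_table_updated() in the POST_UPDATE callback.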
+        current_flows = self.flows_proxy.get('/')
+        self.log.debug('pre-processing-flows',
+                       logical_device_id=self.logical_device_id,
+                       desired_flows=flows,
+                       existing_flows=current_flows)
+
+        current_flow_ids = set(f.id for f in current_flows.items)
+        desired_flow_ids = set(f.id for f in flows.items)
+
+        self._flows_ids_to_add = desired_flow_ids.difference(current_flow_ids)
+        self._flows_ids_to_remove = current_flow_ids.difference(desired_flow_ids)
+        self._flows_to_remove = []
+        for f in current_flows.items:
+            if f.id in self._flows_ids_to_remove:
+                self._flows_to_remove.append(f)
+
+        if len(self._flows_ids_to_add) + len(self._flows_ids_to_remove) == 0:
+            # No changes of flows, just stats are changing
+            self._no_flow_changes_required = True
+        else:
+            self._no_flow_changes_required = False
+
+        self.log.debug('flows-preprocess-output', current_flows=len(
+            current_flow_ids), new_flows=len(desired_flow_ids),
+                      adding_flows=len(self._flows_ids_to_add),
+                      removing_flows=len(self._flows_ids_to_remove))
+
+
+    def _flow_table_updated(self, flows):
+        self.log.debug('flow-table-updated',
+                  logical_device_id=self.logical_device_id, flows=flows)
+
+        if self._no_flow_changes_required:
+            # Stats changes, no need to process further
+            self.log.debug('flow-stats-update')
+        else:
+
+            groups = self.groups_proxy.get('/').items
+            device_rules_map = self.decompose_rules(flows.items, groups)
+
+            # TODO we have to evolve this into a policy-based, event-based pattern.
+            # This is a raw implementation of the specific use-case with certain
+            # built-in assumptions, and not yet device vendor specific. The
+            # policy-based refinement will be introduced later.
+
+
+            # Temporary bypass for openolt
+
+            if self.accepts_direct_logical_flows:
+                # give the logical flows directly to the adapter
+                self.log.debug('direct logical flow bypass')
+                if self.device_adapter_agent is None:
+                    self.log.error('No device adapter agent',
+                                   device_id=self.device_id,
+                                   logical_device_id=self.logical_device_id)
+                    return
+
+                flows_to_add = []
+                for f in flows.items:
+                    if f.id in self._flows_ids_to_add:
+                        flows_to_add.append(f)
+
+
+                self.log.debug('flows to remove',
+                               flows_to_remove=self._flows_to_remove,
+                               flows_ids=self._flows_ids_to_remove)
+
+                try:
+                    self.device_adapter_agent.update_logical_flows(
+                        self.device_id, flows_to_add, self._flows_to_remove,
+                        groups, device_rules_map)
+                except Exception as e:
+                    self.log.error('logical flows bypass error', error=e,
+                                   flows=flows)
+            else:
+
+                for device_id, (flows, groups) in device_rules_map.iteritems():
+
+                    self.root_proxy.update('/devices/{}/flows'.format(device_id),
+                                           Flows(items=flows.values()))
+                    self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
+                                           FlowGroups(items=groups.values()))
+
+    # ~~~~~~~~~~~~~~~~~~~~ GROUP TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def _group_table_updated(self, flow_groups):
+        self.log.debug('group-table-updated',
+                  logical_device_id=self.logical_device_id,
+                  flow_groups=flow_groups)
+
+        flows = self.flows_proxy.get('/').items
+        device_flows_map = self.decompose_rules(flows, flow_groups.items)
+        for device_id, (flows, groups) in device_flows_map.iteritems():
+            self.root_proxy.update('/devices/{}/flows'.format(device_id),
+                                   Flows(items=flows.values()))
+            self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
+                                   FlowGroups(items=groups.values()))
+
+    # ~~~~~~~~~~~~~~~~~~~ APIs NEEDED BY FLOW DECOMPOSER ~~~~~~~~~~~~~~~~~~~~~~
+
+    def _port_added(self, port):
+        self.log.debug('port-added', port=port)
+        assert isinstance(port, LogicalPort)
+        self._port_list_updated(port)
+
+        # Set a proxy and callback for that specific port
+        self.port_proxy[port.id] = self.core.get_proxy(
+            '/logical_devices/{}/ports/{}'.format(self.logical_device_id,
+                                                  port.id))
+        self.port_status_has_changed[port.id] = True
+        self.port_proxy[port.id].register_callback(
+            CallbackType.PRE_UPDATE, self._pre_port_changed)
+        self.port_proxy[port.id].register_callback(
+            CallbackType.POST_UPDATE, self._port_changed)
+
+        self.local_handler.send_port_change_event(
+            device_id=self.logical_device_id,
+            port_status=ofp.ofp_port_status(
+                reason=ofp.OFPPR_ADD,
+                desc=port.ofp_port
+            )
+        )
+
+    def _reconcile_port(self, port):
+        self.log.debug('reconcile-port', port=port)
+        assert isinstance(port, LogicalPort)
+        self._port_list_updated(port)
+
+        # Set a proxy and callback for that specific port
+        self.port_proxy[port.id] = self.core.get_proxy(
+            '/logical_devices/{}/ports/{}'.format(self.logical_device_id,
+                                                  port.id))
+        self.port_status_has_changed[port.id] = True
+        self.port_proxy[port.id].register_callback(
+            CallbackType.PRE_UPDATE, self._pre_port_changed)
+        self.port_proxy[port.id].register_callback(
+            CallbackType.POST_UPDATE, self._port_changed)
+
+    def _port_removed(self, port):
+        self.log.debug('port-removed', port=port)
+        assert isinstance(port, LogicalPort)
+        self._port_list_updated(port)
+
+        # Remove the proxy references
+        self.port_proxy[port.id].unregister_callback(
+            CallbackType.PRE_UPDATE, self._pre_port_changed)
+        self.port_proxy[port.id].unregister_callback(
+            CallbackType.POST_UPDATE, self._port_changed)
+        del self.port_proxy[port.id]
+        del self.port_status_has_changed[port.id]
+
+
+        self.local_handler.send_port_change_event(
+            device_id=self.logical_device_id,
+            port_status=ofp.ofp_port_status(
+                reason=ofp.OFPPR_DELETE,
+                desc=port.ofp_port
+            )
+        )
+
+    def _pre_port_changed(self, port):
+        old_port = self.port_proxy[port.id].get('/')
+        if old_port.ofp_port != port.ofp_port:
+            self.port_status_has_changed[port.id] = True
+        else:
+            self.port_status_has_changed[port.id] = False
+
+    def _port_changed(self, port):
+        self.log.debug('port-changed', port=port)
+        if self.port_status_has_changed[port.id]:
+            assert isinstance(port, LogicalPort)
+            self.local_handler.send_port_change_event(
+                device_id=self.logical_device_id,
+                port_status=ofp.ofp_port_status(
+                    reason=ofp.OFPPR_MODIFY,
+                    desc=port.ofp_port
+                )
+            )
+
+    def _port_list_updated(self, _):
+        # invalidate the graph and the route table
+        self._invalidate_cached_tables()
+
+    def _invalidate_cached_tables(self):
+        self._routes = None
+        self._default_rules = None
+        self._nni_logical_port_no = None
+
+    def _assure_cached_tables_up_to_date(self):
+        if self._routes is None:
+            logical_ports = self.self_proxy.get('/ports')
+            graph, self._routes = self.compute_routes(
+                self.root_proxy, logical_ports)
+            self._default_rules = self._generate_default_rules(graph)
+            root_ports = [p for p in logical_ports if p.root_port]
+            assert len(root_ports) == 1, 'Only one root port supported at this time'
+            self._nni_logical_port_no = root_ports[0].ofp_port.port_no
+
+
+    def _generate_default_rules(self, graph):
+
+        def root_device_default_rules(device):
+            flows = OrderedDict()
+            groups = OrderedDict()
+            return flows, groups
+
+        def leaf_device_default_rules(device):
+            ports = self.root_proxy.get('/devices/{}/ports'.format(device.id))
+            upstream_ports = [
+                port for port in ports if port.type == Port.PON_ONU
+                                            or port.type == Port.VENET_ONU
+            ]
+            assert len(upstream_ports) == 1
+            downstream_ports = [
+                port for port in ports if port.type == Port.ETHERNET_UNI
+            ]
+
+            # it is possible that the downstream ports are not
+            # created, but the flow_decomposition has already
+            # kicked in. In such scenarios, cut short the processing
+            # and return.
+            if len(downstream_ports) == 0:
+                return None, None
+            # assert len(downstream_ports) == 1
+            upstream_port = upstream_ports[0]
+            flows = OrderedDict()
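+            # Three default flows per UNI port: rewrite priority-tagged
+            # (VID 0) upstream traffic to the per-device VLAN (device.vlan),
+            # push the per-device VLAN onto untagged upstream traffic, and
+            # rewrite downstream traffic from the per-device VLAN back to
+            # VID 0.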
+            for downstream_port in downstream_ports:
+                flows.update(OrderedDict((f.id, f) for f in [
+                    mk_flow_stat(
+                        priority=500,
+                        match_fields=[
+                            in_port(downstream_port.port_no),
+                            vlan_vid(ofp.OFPVID_PRESENT | 0)
+                        ],
+                        actions=[
+                            set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
+                            output(upstream_port.port_no)
+                        ]
+                    ),
+                    mk_flow_stat(
+                        priority=500,
+                        match_fields=[
+                            in_port(downstream_port.port_no),
+                            vlan_vid(0)
+                        ],
+                        actions=[
+                            push_vlan(0x8100),
+                            set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
+                            output(upstream_port.port_no)
+                        ]
+                    ),
+                    mk_flow_stat(
+                        priority=500,
+                        match_fields=[
+                            in_port(upstream_port.port_no),
+                            vlan_vid(ofp.OFPVID_PRESENT | device.vlan)
+                        ],
+                        actions=[
+                            set_field(vlan_vid(ofp.OFPVID_PRESENT | 0)),
+                            output(downstream_port.port_no)
+                        ]
+                    ),
+                ]))
+            groups = OrderedDict()
+            return flows, groups
+
+        root_device_id = self.self_proxy.get('/').root_device_id
+        rules = {}
+        for node_key in graph.nodes():
+            node = graph.node[node_key]
+            device = node.get('device', None)
+            if device is None:
+                continue
+            if device.id == root_device_id:
+                rules[device.id] = root_device_default_rules(device)
+            else:
+                rules[device.id] = leaf_device_default_rules(device)
+        return rules
+
+    def get_route(self, ingress_port_no, egress_port_no):
+        self._assure_cached_tables_up_to_date()
+        self.log.info('getting-route', eg_port=egress_port_no, in_port=ingress_port_no,
+                nni_port=self._nni_logical_port_no)
+        if egress_port_no is not None and \
+                        (egress_port_no & 0x7fffffff) == ofp.OFPP_CONTROLLER:
+            self.log.info('controller-flow', eg_port=egress_port_no, in_port=ingress_port_no,
+                    nni_port=self._nni_logical_port_no)
+            if ingress_port_no == self._nni_logical_port_no:
+                self.log.info('returning half route')
+                # This is a trap on the NNI Port
+                # Return a 'half' route to make the flow decomp logic happy
+                for (ingress, egress), route in self._routes.iteritems():
+                    if egress == self._nni_logical_port_no:
+                        return [None, route[1]]
+                raise Exception('not a single upstream route')
+            # treat it as if the output port is the NNI of the OLT
+            egress_port_no = self._nni_logical_port_no
+
+        # If ingress_port is not specified (None), it may be a wildcarded
+        # route if egress_port is OFPP_CONTROLLER or _nni_logical_port,
+        # in which case we need to create a half-route where only the egress
+        # hop is filled and the first hop is None
+        if ingress_port_no is None and \
+                        egress_port_no == self._nni_logical_port_no:
+            # We can use the 2nd hop of any upstream route, so just find the
+            # first upstream:
+            for (ingress, egress), route in self._routes.iteritems():
+                if egress == self._nni_logical_port_no:
+                    return [None, route[1]]
+            raise Exception('not a single upstream route')
+
+        # If egress_port is not specified (None), we can also return a
+        # "half" route
+        if egress_port_no is None:
+            for (ingress, egress), route in self._routes.iteritems():
+                if ingress == ingress_port_no:
+                    return [route[0], None]
+
+            # This can occur if a leaf device is disabled
+            self.log.exception('no-downstream-route',
+                               ingress_port_no=ingress_port_no,
+                               egress_port_no=egress_port_no
+                               )
+            return None
+
+
+        return self._routes.get((ingress_port_no, egress_port_no))
+
+    def get_all_default_rules(self):
+        self._assure_cached_tables_up_to_date()
+        return self._default_rules
+
+    def get_wildcard_input_ports(self, exclude_port=None):
+        logical_ports = self.self_proxy.get('/ports')
+        return [port.ofp_port.port_no for port in logical_ports
+                if port.ofp_port.port_no != exclude_port]
diff --git a/python/core/registry.py b/python/core/registry.py
new file mode 100644
index 0000000..270bd71
--- /dev/null
+++ b/python/core/registry.py
@@ -0,0 +1,69 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Simple component registry to provide centralized access to any registered
+components.
+"""
+from collections import OrderedDict
+from zope.interface import Interface
+
+
+class IComponent(Interface):
+    """
+    A Voltha Component
+    """
+
+    def start():
+        """
+        Called once the component is instantiated. Can be used for async
+        initialization.
+        :return: (None or Deferred)
+        """
+
+    def stop():
+        """
+        Called once before the component is unloaded. Can be used for async
+        cleanup operations.
+        :return: (None or Deferred)
+        """
+
+
+class Registry(object):
+
+    def __init__(self):
+        self.components = OrderedDict()
+
+    def register(self, name, component):
+        assert IComponent.providedBy(component)
+        assert name not in self.components
+        self.components[name] = component
+        return component
+
+    def unregister(self, name):
+        if name in self.components:
+            del self.components[name]
+
+    def __call__(self, name):
+        return self.components[name]
+
+    def iterate(self):
+        return self.components.values()
+
+
+# public shared registry
+registry = Registry()
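+
+# Illustrative usage (the component name and object below are hypothetical):
+#
+#   registry.register('my_component', my_component)  # must provide IComponent
+#   same = registry('my_component')                   # look up by name
+#   registry.unregister('my_component')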