VOL-1451 Initial checkin of openonu build
Produced docker container capable of building and running
openonu/brcm_openonci_onu. Copied over current onu code
and resolved all imports by copying into the local source tree.
Change-Id: Ib9785d37afc65b7d32ecf74aee2456352626e2b6
diff --git a/python/core/__init__.py b/python/core/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/core/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/core/config/__init__.py b/python/core/config/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/core/config/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/core/config/config_backend.py b/python/core/config/config_backend.py
new file mode 100644
index 0000000..d906348
--- /dev/null
+++ b/python/core/config/config_backend.py
@@ -0,0 +1,289 @@
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from consul import Consul, ConsulException
+from common.utils.asleep import asleep
+from requests import ConnectionError
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+import etcd3
+import structlog
+
+
+class ConsulStore(object):
+ """ Config kv store for consul with a cache for quicker subsequent reads
+
+ TODO: This will block the reactor. Should either change
+ whole call stack to yield or put the put/delete transactions into a
+ queue to write later with twisted. Will need a transaction
+ log to ensure we don't lose anything.
+ Making the whole callstack yield is troublesome because other tasks can
+ come in on the side and start modifying things which could be bad.
+ """
+
+ CONNECT_RETRY_INTERVAL_SEC = 1
+ RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+ def __init__(self, host, port, path_prefix):
+
+ self.log = structlog.get_logger()
+ self._consul = Consul(host=host, port=port)
+ self.host = host
+ self.port = port
+ self._path_prefix = path_prefix
+ self._cache = {}
+ self.retries = 0
+
+ def make_path(self, key):
+ return '{}/{}'.format(self._path_prefix, key)
+
+ def __getitem__(self, key):
+ if key in self._cache:
+ return self._cache[key]
+ value = self._kv_get(self.make_path(key))
+ if value is not None:
+ # consul turns empty strings to None, so we do the reverse here
+ self._cache[key] = value['Value'] or ''
+ return value['Value'] or ''
+ else:
+ raise KeyError(key)
+
+ def __contains__(self, key):
+ if key in self._cache:
+ return True
+ value = self._kv_get(self.make_path(key))
+ if value is not None:
+ self._cache[key] = value['Value']
+ return True
+ else:
+ return False
+
+ def __setitem__(self, key, value):
+ try:
+ assert isinstance(value, basestring)
+ self._cache[key] = value
+ self._kv_put(self.make_path(key), value)
+ except Exception, e:
+ self.log.exception('cannot-set-item', e=e)
+
+ def __delitem__(self, key):
+ self._cache.pop(key, None)
+ self._kv_delete(self.make_path(key))
+
+ @inlineCallbacks
+ def _backoff(self, msg):
+ wait_time = self.RETRY_BACKOFF[min(self.retries,
+ len(self.RETRY_BACKOFF) - 1)]
+ self.retries += 1
+ self.log.error(msg, retry_in=wait_time)
+ yield asleep(wait_time)
+
+ def _redo_consul_connection(self):
+ self._consul = Consul(host=self.host, port=self.port)
+ self._cache.clear()
+
+ def _clear_backoff(self):
+ if self.retries:
+ self.log.info('reconnected-to-consul', after_retries=self.retries)
+ self.retries = 0
+
+ def _get_consul(self):
+ return self._consul
+
+ # Proxy methods for consul with retry support
+ def _kv_get(self, *args, **kw):
+ return self._retry('GET', *args, **kw)
+
+ def _kv_put(self, *args, **kw):
+ return self._retry('PUT', *args, **kw)
+
+ def _kv_delete(self, *args, **kw):
+ return self._retry('DELETE', *args, **kw)
+
+ def _retry(self, operation, *args, **kw):
+ while 1:
+ try:
+ consul = self._get_consul()
+ self.log.debug('consul', consul=consul, operation=operation,
+ args=args)
+ if operation == 'GET':
+ index, result = consul.kv.get(*args, **kw)
+ elif operation == 'PUT':
+ result = consul.kv.put(*args, **kw)
+ elif operation == 'DELETE':
+ result = consul.kv.delete(*args, **kw)
+ else:
+ # Default case - consider operation as a function call
+ result = operation(*args, **kw)
+ self._clear_backoff()
+ break
+ except ConsulException, e:
+ self.log.exception('consul-not-up', e=e)
+ self._backoff('consul-not-up')
+ except ConnectionError, e:
+ self.log.exception('cannot-connect-to-consul', e=e)
+ self._backoff('cannot-connect-to-consul')
+ except Exception, e:
+ self.log.exception(e)
+ self._backoff('unknown-error')
+ self._redo_consul_connection()
+
+ return result
+
+
+class EtcdStore(object):
+ """ Config kv store for etcd with a cache for quicker subsequent reads
+
+ TODO: This will block the reactor. Should either change
+ whole call stack to yield or put the put/delete transactions into a
+ queue to write later with twisted. Will need a transaction
+ log to ensure we don't lose anything.
+ Making the whole callstack yield is troublesome because other tasks can
+ come in on the side and start modifying things which could be bad.
+ """
+
+ CONNECT_RETRY_INTERVAL_SEC = 1
+ RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+ def __init__(self, host, port, path_prefix):
+
+ self.log = structlog.get_logger()
+ self._etcd = etcd3.client(host=host, port=port)
+ self.host = host
+ self.port = port
+ self._path_prefix = path_prefix
+ self._cache = {}
+ self.retries = 0
+
+ def make_path(self, key):
+ return '{}/{}'.format(self._path_prefix, key)
+
+ def __getitem__(self, key):
+ if key in self._cache:
+ return self._cache[key]
+ (value, meta) = self._kv_get(self.make_path(key))
+ if value is not None:
+ self._cache[key] = value
+ return value
+ else:
+ raise KeyError(key)
+
+ def __contains__(self, key):
+ if key in self._cache:
+ return True
+ (value, meta) = self._kv_get(self.make_path(key))
+ if value is not None:
+ self._cache[key] = value
+ return True
+ else:
+ return False
+
+ def __setitem__(self, key, value):
+ try:
+ assert isinstance(value, basestring)
+ self._cache[key] = value
+ self._kv_put(self.make_path(key), value)
+ except Exception, e:
+ self.log.exception('cannot-set-item', e=e)
+
+ def __delitem__(self, key):
+ self._cache.pop(key, None)
+ self._kv_delete(self.make_path(key))
+
+ @inlineCallbacks
+ def _backoff(self, msg):
+ wait_time = self.RETRY_BACKOFF[min(self.retries,
+ len(self.RETRY_BACKOFF) - 1)]
+ self.retries += 1
+ self.log.error(msg, retry_in=wait_time)
+ yield asleep(wait_time)
+
+ def _redo_etcd_connection(self):
+ self._etcd = etcd3.client(host=self.host, port=self.port)
+ self._cache.clear()
+
+ def _clear_backoff(self):
+ if self.retries:
+ self.log.info('reconnected-to-etcd', after_retries=self.retries)
+ self.retries = 0
+
+ def _get_etcd(self):
+ return self._etcd
+
+ # Proxy methods for etcd with retry support
+ def _kv_get(self, *args, **kw):
+ return self._retry('GET', *args, **kw)
+
+ def _kv_put(self, *args, **kw):
+ return self._retry('PUT', *args, **kw)
+
+ def _kv_delete(self, *args, **kw):
+ return self._retry('DELETE', *args, **kw)
+
+ def _retry(self, operation, *args, **kw):
+
+ # etcd data sometimes contains non-utf8 sequences, replace
+ self.log.debug('backend-op',
+ operation=operation,
+ args=map(lambda x : unicode(x,'utf8','replace'), args),
+ kw=kw)
+
+ while 1:
+ try:
+ etcd = self._get_etcd()
+ self.log.debug('etcd', etcd=etcd, operation=operation,
+ args=map(lambda x : unicode(x,'utf8','replace'), args))
+ if operation == 'GET':
+ (value, meta) = etcd.get(*args, **kw)
+ result = (value, meta)
+ elif operation == 'PUT':
+ result = etcd.put(*args, **kw)
+ elif operation == 'DELETE':
+ result = etcd.delete(*args, **kw)
+ else:
+ # Default case - consider operation as a function call
+ result = operation(*args, **kw)
+ self._clear_backoff()
+ break
+ except Exception, e:
+ self.log.exception(e)
+ self._backoff('unknown-error-with-etcd')
+ self._redo_etcd_connection()
+
+ return result
+
+
+def load_backend(store_id, store_prefix, args):
+ """ Return the kv store backend based on the command line arguments
+ """
+
+ def load_consul_store():
+ instance_core_store_prefix = '{}/{}'.format(store_prefix, store_id)
+
+ host, port = args.consul.split(':', 1)
+ return ConsulStore(host, int(port), instance_core_store_prefix)
+
+ def load_etcd_store():
+ instance_core_store_prefix = '{}/{}'.format(store_prefix, store_id)
+
+ host, port = args.etcd.split(':', 1)
+ return EtcdStore(host, int(port), instance_core_store_prefix)
+
+ loaders = {
+ 'none': lambda: None,
+ 'consul': load_consul_store,
+ 'etcd': load_etcd_store
+ }
+
+ return loaders[args.backend]()
diff --git a/python/core/config/config_branch.py b/python/core/config/config_branch.py
new file mode 100644
index 0000000..207818b
--- /dev/null
+++ b/python/core/config/config_branch.py
@@ -0,0 +1,53 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Class to hold revisions, latest revision, etc., for a config node, used
+for the active committed revisions or revisions part of a transaction.
+"""
+
+from collections import OrderedDict
+from weakref import WeakValueDictionary
+
+
+class ConfigBranch(object):
+
+ __slots__ = (
+ '_node', # ref to node
+ '_txid', # txid for this branch (None for the committed branch)
+ '_origin', # _latest at time of branching on default branch
+ '_revs', # dict of rev-hash to ref of ConfigRevision
+ '_latest', # ref to latest committed ConfigRevision
+ '__weakref__'
+ )
+
+ def __init__(self, node, txid=None, origin=None, auto_prune=True):
+ self._node = node
+ self._txid = txid
+ self._origin = origin
+ self._revs = WeakValueDictionary() if auto_prune else OrderedDict()
+ self._latest = origin
+
+ def __getitem__(self, hash):
+ return self._revs[hash]
+
+ @property
+ def latest(self):
+ return self._latest
+
+ @property
+ def origin(self):
+ return self._origin
diff --git a/python/core/config/config_event_bus.py b/python/core/config/config_event_bus.py
new file mode 100644
index 0000000..e56d77a
--- /dev/null
+++ b/python/core/config/config_event_bus.py
@@ -0,0 +1,66 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import structlog
+from enum import Enum
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
+
+from common.event_bus import EventBusClient
+from voltha.core.config.config_proxy import CallbackType
+from voltha.protos import third_party
+from voltha.protos.events_pb2 import ConfigEvent, ConfigEventType
+
+IGNORED_CALLBACKS = [CallbackType.PRE_ADD, CallbackType.GET,
+ CallbackType.POST_LISTCHANGE, CallbackType.PRE_REMOVE,
+ CallbackType.PRE_UPDATE]
+
+log = structlog.get_logger()
+
+class ConfigEventBus(object):
+
+ __slots__ = (
+ '_event_bus_client', # The event bus client used to publish events.
+ '_topic' # the topic to publish to
+ )
+
+ def __init__(self):
+ self._event_bus_client = EventBusClient()
+ self._topic = 'model-change-events'
+
+ def advertise(self, type, data, hash=None):
+ if type in IGNORED_CALLBACKS:
+ log.info('Ignoring event {} with data {}'.format(type, data))
+ return
+
+ if type is CallbackType.POST_ADD:
+ kind = ConfigEventType.add
+ elif type is CallbackType.POST_REMOVE:
+ kind = ConfigEventType.remove
+ else:
+ kind = ConfigEventType.update
+
+ if isinstance(data, Message):
+ msg = dumps(MessageToDict(data, True, True))
+ else:
+ msg = data
+
+ event = ConfigEvent(
+ type=kind,
+ hash=hash,
+ data=msg
+ )
+
+ self._event_bus_client.publish(self._topic, event)
+
diff --git a/python/core/config/config_node.py b/python/core/config/config_node.py
new file mode 100644
index 0000000..ab73484
--- /dev/null
+++ b/python/core/config/config_node.py
@@ -0,0 +1,617 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from copy import copy
+
+from jsonpatch import JsonPatch
+from jsonpatch import make_patch
+
+from common.utils.json_format import MessageToDict
+from voltha.core.config.config_branch import ConfigBranch
+from voltha.core.config.config_event_bus import ConfigEventBus
+from voltha.core.config.config_proxy import CallbackType, ConfigProxy
+from voltha.core.config.config_rev import is_proto_message, children_fields, \
+ ConfigRevision, access_rights
+from voltha.core.config.config_rev_persisted import PersistedConfigRevision
+from voltha.core.config.merge_3way import merge_3way
+from voltha.protos import third_party
+from voltha.protos import meta_pb2
+
+import structlog
+
+log = structlog.get_logger()
+
+def message_to_dict(m):
+ return MessageToDict(m, True, True, False)
+
+
+def check_access_violation(new_msg, old_msg):
+ """Raise ValueError if attempt is made to change a read-only field"""
+ access_map = access_rights(new_msg.__class__)
+ violated_fields = []
+ for field_name, access in access_map.iteritems():
+ if access == meta_pb2.READ_ONLY:
+ if getattr(new_msg, field_name) != getattr(old_msg, field_name):
+ violated_fields.append(field_name)
+ if violated_fields:
+ raise ValueError('Cannot change read-only field(s) %s' %
+ ', '.join('"%s"' % f for f in violated_fields))
+
+
+def find_rev_by_key(revs, keyname, value):
+ for i, rev in enumerate(revs):
+ if getattr(rev._config._data, keyname) == value:
+ return i, rev
+ raise KeyError('key {}={} not found'.format(keyname, value))
+
+
+class ConfigNode(object):
+ """
+ Represents a configuration node which can hold a number of revisions
+ of the configuration for this node.
+ When the configuration changes, the new version is appended to the
+ node.
+ Initial data must be a protobuf message and it will determine the type of
+ this node.
+ """
+ __slots__ = (
+ '_root', # ref to root node
+ '_type', # node type, as __class__ of protobuf message
+ '_branches', # dict of transaction branches and a default (committed)
+ # branch
+ '_tags', # dict of tag-name to ref of ConfigRevision
+ '_proxy', # ref to proxy observer or None if no proxy assigned
+ '_event_bus', # ref to event_bus or None if no event bus is assigned
+ '_auto_prune'
+ )
+
+ def __init__(self, root, initial_data, auto_prune=True, txid=None):
+ self._root = root
+ self._branches = {}
+ self._tags = {}
+ self._proxy = None
+ self._event_bus = None
+ self._auto_prune = auto_prune
+
+ if isinstance(initial_data, type):
+ self._type = initial_data
+ elif is_proto_message(initial_data):
+ self._type = initial_data.__class__
+ copied_data = initial_data.__class__()
+ copied_data.CopyFrom(initial_data)
+ self._initialize(copied_data, txid)
+ else:
+ raise NotImplementedError()
+
+ def _mknode(self, *args, **kw):
+ return ConfigNode(self._root, *args, **kw)
+
+ def _mkrev(self, *args, **kw):
+ return self._root.mkrev(*args, **kw)
+
+ def _initialize(self, data, txid):
+ # separate external children data away from locally stored data
+ # based on child_node annotations in protobuf
+ children = {}
+ for field_name, field in children_fields(self._type).iteritems():
+ field_value = getattr(data, field_name)
+ if field.is_container:
+ if field.key:
+ keys_seen = set()
+ children[field_name] = lst = []
+ for v in field_value:
+ rev = self._mknode(v, txid=txid).latest
+ key = getattr(v, field.key)
+ if key in keys_seen:
+ raise ValueError('Duplicate key "{}"'.format(key))
+ lst.append(rev)
+ keys_seen.add(key)
+ else:
+ children[field_name] = [
+ self._mknode(v, txid=txid).latest for v in field_value]
+ else:
+ children[field_name] = [
+ self._mknode(field_value, txid=txid).latest]
+ data.ClearField(field_name)
+
+ branch = ConfigBranch(self, auto_prune=self._auto_prune)
+ rev = self._mkrev(branch, data, children)
+ self._make_latest(branch, rev)
+ self._branches[txid] = branch
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ accessors ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ # these convenience short-cuts only work for the committed branch
+
+ @property
+ def revisions(self):
+ return [r._hash for r in self._branches[None]._revs.itervalues()]
+
+ @property
+ def latest(self):
+ return self._branches[None]._latest
+
+ def __getitem__(self, hash):
+ return self._branches[None]._revs[hash]
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ get operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def get(self, path=None, hash=None, depth=0, deep=False, txid=None):
+
+ # depth preparation
+ if deep:
+ depth = -1
+
+ # path preparation
+ path = '' if path is None else path
+ while path.startswith('/'):
+ path = path[1:]
+
+ # determine branch; if lookup fails, it is ok to use default branch
+ branch = self._branches.get(txid, None) or self._branches[None]
+
+ # determine rev
+ if hash is not None:
+ rev = branch._revs[hash]
+ else:
+ rev = branch.latest
+
+ return self._get(rev, path, depth)
+
+ def _get(self, rev, path, depth):
+
+ if not path:
+ return self._do_get(rev, depth)
+
+ # ... otherwise
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if field.key:
+ children = rev._children[name]
+ if path:
+ # need to escalate further
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ _, child_rev = find_rev_by_key(children, field.key, key)
+ child_node = child_rev.node
+ return child_node._get(child_rev, path, depth)
+ else:
+ # we are the node of interest
+ response = []
+ for child_rev in children:
+ child_node = child_rev.node
+ value = child_node._do_get(child_rev, depth)
+ response.append(value)
+ return response
+ else:
+ if path:
+ raise LookupError(
+ 'Cannot index into container with no key defined')
+ response = []
+ for child_rev in rev._children[name]:
+ child_node = child_rev.node
+ value = child_node._do_get(child_rev, depth)
+ response.append(value)
+ return response
+ else:
+ child_rev = rev._children[name][0]
+ child_node = child_rev.node
+ return child_node._get(child_rev, path, depth)
+
+ def _do_get(self, rev, depth):
+ msg = rev.get(depth)
+ if self._proxy is not None:
+ msg = self._proxy.invoke_callbacks(CallbackType.GET, msg)
+ return msg
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ update operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def update(self, path, data, strict=False, txid=None, mk_branch=None):
+
+ while path.startswith('/'):
+ path = path[1:]
+
+ try:
+ branch = self._branches[txid]
+ except KeyError:
+ branch = mk_branch(self)
+
+ if not path:
+ return self._do_update(branch, data, strict)
+
+ rev = branch._latest # change is always made to the latest
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if not path:
+ raise ValueError('Cannot update a list')
+ if field.key:
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
+ child_node = child_rev.node
+ # chek if deep copy will work better
+ new_child_rev = child_node.update(
+ path, data, strict, txid, mk_branch)
+ if new_child_rev.hash == child_rev.hash:
+ # When the new_child_rev goes out of scope,
+ # it's destructor gets invoked as it is not being
+ # referred by any other data structures. To prevent
+ # this to trigger the hash it is holding from being
+ # erased in the db, its hash is set to None. If the
+ # new_child_rev object is pointing at the same address
+ # as the child_rev address then do not clear the hash
+ if new_child_rev != child_rev:
+ log.debug('clear-hash',
+ hash=new_child_rev.hash, object_ref=new_child_rev)
+ new_child_rev.clear_hash()
+ return branch._latest
+ if getattr(new_child_rev.data, field.key) != key:
+ raise ValueError('Cannot change key field')
+ children[idx] = new_child_rev
+ rev = rev.update_children(name, children, branch)
+ self._make_latest(branch, rev)
+ return rev
+ else:
+ raise ValueError('Cannot index into container with no keys')
+
+ else:
+ child_rev = rev._children[name][0]
+ child_node = child_rev.node
+ new_child_rev = child_node.update(
+ path, data, strict, txid, mk_branch)
+ rev = rev.update_children(name, [new_child_rev], branch)
+ self._make_latest(branch, rev)
+ return rev
+
+ def _do_update(self, branch, data, strict):
+ if not isinstance(data, self._type):
+ raise ValueError(
+ '"{}" is not a valid data type for this node'.format(
+ data.__class__.__name__))
+ self._test_no_children(data)
+ if self._proxy is not None:
+ self._proxy.invoke_callbacks(CallbackType.PRE_UPDATE, data)
+
+ if branch._latest.data != data:
+ if strict:
+ # check if attempt is made to change read-only field
+ check_access_violation(data, branch._latest.data)
+ rev = branch._latest.update_data(data, branch)
+ self._make_latest(branch, rev,
+ ((CallbackType.POST_UPDATE, rev.data),))
+ return rev
+ else:
+ return branch._latest
+
+ def _make_latest(self, branch, rev, change_announcements=()):
+ # Update the latest branch only when the hash between the previous
+ # data and the new rev is different, otherwise this will trigger the
+ # data already saved in the db (with that hash) to be erased
+ if rev.hash not in branch._revs:
+ branch._revs[rev.hash] = rev
+
+ if not branch._latest or rev.hash != branch._latest.hash:
+ branch._latest = rev
+
+ # announce only if this is main branch
+ if change_announcements and branch._txid is None:
+
+ if self._proxy is not None:
+ for change_type, data in change_announcements:
+ # since the callback may operate on the config tree,
+ # we have to defer the execution of the callbacks till
+ # the change is propagated to the root, then root will
+ # call the callbacks
+ self._root.enqueue_callback(
+ self._proxy.invoke_callbacks,
+ change_type,
+ data,
+ proceed_on_errors=1,
+ )
+
+ for change_type, data in change_announcements:
+ self._root.enqueue_notification_callback(
+ self._mk_event_bus().advertise,
+ change_type,
+ data,
+ hash=rev.hash
+ )
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ add operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def add(self, path, data, txid=None, mk_branch=None):
+ while path.startswith('/'):
+ path = path[1:]
+ if not path:
+ raise ValueError('Cannot add to non-container node')
+
+ try:
+ branch = self._branches[txid]
+ except KeyError:
+ branch = mk_branch(self)
+
+ rev = branch._latest # change is always made to latest
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if not path:
+ # we do need to add a new child to the field
+ if field.key:
+ if self._proxy is not None:
+ self._proxy.invoke_callbacks(
+ CallbackType.PRE_ADD, data)
+ children = copy(rev._children[name])
+ key = getattr(data, field.key)
+ try:
+ find_rev_by_key(children, field.key, key)
+ except KeyError:
+ pass
+ else:
+ raise ValueError('Duplicate key "{}"'.format(key))
+ child_rev = self._mknode(data).latest
+ children.append(child_rev)
+ rev = rev.update_children(name, children, branch)
+ self._make_latest(branch, rev,
+ ((CallbackType.POST_ADD, data),))
+ return rev
+ else:
+ # adding to non-keyed containers not implemented yet
+ raise ValueError('Cannot add to non-keyed container')
+ else:
+ if field.key:
+ # need to escalate
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
+ child_node = child_rev.node
+ new_child_rev = child_node.add(path, data, txid, mk_branch)
+ children[idx] = new_child_rev
+ rev = rev.update_children(name, children, branch)
+ self._make_latest(branch, rev)
+ return rev
+ else:
+ raise ValueError(
+ 'Cannot index into container with no keys')
+ else:
+ raise ValueError('Cannot add to non-container field')
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ remove operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def remove(self, path, txid=None, mk_branch=None):
+ while path.startswith('/'):
+ path = path[1:]
+ if not path:
+ raise ValueError('Cannot remove from non-container node')
+
+ try:
+ branch = self._branches[txid]
+ except KeyError:
+ branch = mk_branch(self)
+
+ rev = branch._latest # change is always made to latest
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if not path:
+ raise ValueError("Cannot remove without a key")
+ if field.key:
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ if path:
+ # need to escalate
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
+ child_node = child_rev.node
+ new_child_rev = child_node.remove(path, txid, mk_branch)
+ children[idx] = new_child_rev
+ rev = rev.update_children(name, children, branch)
+ self._make_latest(branch, rev)
+ return rev
+ else:
+ # need to remove from this very node
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
+ if self._proxy is not None:
+ data = child_rev.data
+ self._proxy.invoke_callbacks(
+ CallbackType.PRE_REMOVE, data)
+ post_anno = ((CallbackType.POST_REMOVE, data),)
+ else:
+ post_anno = ((CallbackType.POST_REMOVE, child_rev.data),)
+ del children[idx]
+ rev = rev.update_children(name, children, branch)
+ self._make_latest(branch, rev, post_anno)
+ return rev
+ else:
+ raise ValueError('Cannot remove from non-keyed container')
+ else:
+ raise ValueError('Cannot remove non-conatiner field')
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Branching ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def _mk_txbranch(self, txid):
+ branch_point = self._branches[None].latest
+ branch = ConfigBranch(self, txid, branch_point)
+ self._branches[txid] = branch
+ return branch
+
+ def _del_txbranch(self, txid):
+ del self._branches[txid]
+
+ def _merge_txbranch(self, txid, dry_run=False):
+ """
+ Make latest in branch to be latest in the common branch, but only
+ if no conflict is detected. Conflict is where the txbranch branch
+ point no longer matches the latest in the default branch. This has
+ to be verified recursively.
+ """
+
+ def merge_child(child_rev):
+ child_branch = child_rev._branch
+ if child_branch._txid == txid:
+ child_rev = child_branch._node._merge_txbranch(txid, dry_run)
+ return child_rev
+
+ src_branch = self._branches[txid]
+ dst_branch = self._branches[None]
+
+ fork_rev = src_branch.origin # rev from which src branch was made
+ src_rev = src_branch.latest # head rev of source branch
+ dst_rev = dst_branch.latest # head rev of target branch
+
+ rev, changes = merge_3way(
+ fork_rev, src_rev, dst_rev, merge_child, dry_run)
+
+ if not dry_run:
+ self._make_latest(dst_branch, rev, change_announcements=changes)
+ del self._branches[txid]
+
+ return rev
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Diff utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def diff(self, hash1, hash2=None, txid=None):
+ branch = self._branches[txid]
+ rev1 = branch[hash1]
+ rev2 = branch[hash2] if hash2 else branch._latest
+ if rev1.hash == rev2.hash:
+ return JsonPatch([])
+ else:
+ dict1 = message_to_dict(rev1.data)
+ dict2 = message_to_dict(rev2.data)
+ return make_patch(dict1, dict2)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Tagging utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def tag(self, tag, hash=None):
+ branch = self._branches[None] # tag only what has been committed
+ rev = branch._latest if hash is None else branch._revs[hash]
+ self._tags[tag] = rev
+ self.persist_tags()
+ return self
+
+ @property
+ def tags(self):
+ return sorted(self._tags.iterkeys())
+
+ def by_tag(self, tag):
+ """
+ Return revision based on tag
+ :param tag: previously registered tag value
+ :return: revision object
+ """
+ return self._tags[tag]
+
+ def diff_by_tag(self, tag1, tag2):
+ return self.diff(self._tags[tag1].hash, self._tags[tag2].hash)
+
+ def delete_tag(self, tag):
+ del self._tags[tag]
+ self.persist_tags()
+
+ def delete_tags(self, *tags):
+ for tag in tags:
+ del self._tags[tag]
+ self.persist_tags()
+
+ def prune_untagged(self):
+ branch = self._branches[None]
+ keep = set(rev.hash for rev in self._tags.itervalues())
+ keep.add(branch._latest.hash)
+ for hash in branch._revs.keys():
+ if hash not in keep:
+ del branch._revs[hash]
+ return self
+
+ def persist_tags(self):
+ """
+ Persist tag information to the backend
+ """
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Internals ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def _test_no_children(self, data):
+ for field_name, field in children_fields(self._type).items():
+ field_value = getattr(data, field_name)
+ if field.is_container:
+ if len(field_value):
+ raise NotImplementedError(
+ 'Cannot update external children')
+ else:
+ if data.HasField(field_name):
+ raise NotImplementedError(
+ 'Cannot update externel children')
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Node proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def get_proxy(self, path, exclusive=False):
+ return self._get_proxy(path, self, path, exclusive)
+
+ def _get_proxy(self, path, root, full_path, exclusive):
+ while path.startswith('/'):
+ path = path[1:]
+ if not path:
+ return self._mk_proxy(root, full_path, exclusive)
+
+ # need to escalate
+ rev = self._branches[None]._latest
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if not path:
+ raise ValueError('Cannot proxy a container field')
+ if field.key:
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ children = rev._children[name]
+ _, child_rev = find_rev_by_key(children, field.key, key)
+ child_node = child_rev.node
+ return child_node._get_proxy(path, root, full_path, exclusive)
+
+ raise ValueError('Cannot index into container with no keys')
+
+ else:
+ child_rev = rev._children[name][0]
+ child_node = child_rev.node
+ return child_node._get_proxy(path, root, full_path, exclusive)
+
+ def _mk_proxy(self, root, full_path, exclusive):
+ if self._proxy is None:
+ self._proxy = ConfigProxy(root, self, full_path, exclusive)
+ else:
+ if self._proxy.exclusive:
+ raise ValueError('Node is already owned exclusively')
+ return self._proxy
+
+ def _mk_event_bus(self):
+ if self._event_bus is None:
+ self._event_bus = ConfigEventBus()
+ return self._event_bus
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~ Persistence loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def load_latest(self, latest_hash):
+
+ root = self._root
+ kv_store = root._kv_store
+
+ branch = ConfigBranch(node=self, auto_prune=self._auto_prune)
+ rev = PersistedConfigRevision.load(
+ branch, kv_store, self._type, latest_hash)
+ self._make_latest(branch, rev)
+ self._branches[None] = branch
diff --git a/python/core/config/config_proxy.py b/python/core/config/config_proxy.py
new file mode 100644
index 0000000..57d8150
--- /dev/null
+++ b/python/core/config/config_proxy.py
@@ -0,0 +1,155 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from enum import Enum
+
+from voltha.core.config.config_txn import ConfigTransaction
+
+log = structlog.get_logger()
+
+
+class OperationContext(object):
+ def __init__(self, path=None, data=None, field_name=None, child_key=None):
+ self.path = path
+ self._data = data
+ self.field_name = field_name
+ self.child_key = child_key
+ @property
+ def data(self):
+ return self._data
+ def update(self, data):
+ self._data = data
+ return self
+ def __repr__(self):
+ return 'OperationContext({})'.format(self.__dict__)
+
+
+class CallbackType(Enum):
+
+ # GET hooks are called after the data is retrieved and can be used to
+ # augment the data (they should only augment fields marked as REAL_TIME
+ GET = 1
+
+ # PRE_UPDATE hooks are called before the change is made and are supposed
+ # to be used to reject the data by raising an exception. If they don't,
+ # the change will be applied.
+ PRE_UPDATE = 2
+
+ # POST_UPDATE hooks are called after the update has occurred and can
+ # be used to deal with the change. For instance, an adapter can use the
+ # callback to trigger the south-bound configuration
+ POST_UPDATE = 3
+
+ # These behave similarly to the update callbacks as described above.
+ PRE_ADD = 4
+ POST_ADD = 5
+ PRE_REMOVE = 6
+ POST_REMOVE = 7
+
+ # Bulk list change due to transaction commit that changed items in
+ # non-keyed container fields
+ POST_LISTCHANGE = 8
+
+
+class ConfigProxy(object):
+ """
+ Allows an entity to look at a sub-tree and see it as it was the whole tree
+ """
+ __slots__ = (
+ '_root',
+ '_node',
+ '_path',
+ '_exclusive',
+ '_callbacks'
+ )
+
+ def __init__(self, root, node, path, exclusive):
+ self._root = root
+ self._node = node
+ self._exclusive = exclusive
+ self._path = path # full path to proxied node
+ self._callbacks = {} # call back type -> list of callbacks
+
+ @property
+ def exclusive(self):
+ return self._exclusive
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~ CRUD handlers ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def get(self, path='/', depth=None, deep=None, txid=None):
+ return self._node.get(path, depth=depth, deep=deep, txid=txid)
+
+ def update(self, path, data, strict=False, txid=None):
+ assert path.startswith('/')
+ full_path = self._path if path == '/' else self._path + path
+ return self._root.update(full_path, data, strict, txid=txid)
+
+ def add(self, path, data, txid=None):
+ assert path.startswith('/')
+ full_path = self._path if path == '/' else self._path + path
+ return self._root.add(full_path, data, txid=txid)
+
+ def remove(self, path, txid=None):
+ assert path.startswith('/')
+ full_path = self._path if path == '/' else self._path + path
+ return self._root.remove(full_path, txid=txid)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~ Transaction support ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def open_transaction(self):
+ """Open a new transaction"""
+ txid = self._root.mk_txbranch()
+ return ConfigTransaction(self, txid)
+
+ def commit_transaction(self, txid):
+ """
+ If having an open transaction, commit it now. Will raise exception
+ if conflict is detected. Either way, transaction will be deleted.
+ """
+ self._root.fold_txbranch(txid)
+
+ def cancel_transaction(self, txid):
+ """
+ Cancel current transaction if we are in a transaction. Always succeeds.
+ """
+ self._root.del_txbranch(txid)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~ Callbacks registrations ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def register_callback(self, callback_type, callback, *args, **kw):
+ lst = self._callbacks.setdefault(callback_type, [])
+ lst.append((callback, args, kw))
+
+ def unregister_callback(self, callback_type, callback, *args, **kw):
+ lst = self._callbacks.setdefault(callback_type, [])
+ if (callback, args, kw) in lst:
+ lst.remove((callback, args, kw))
+
+ # ~~~~~~~~~~~~~~~~~~~~~ Callback dispatch ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def invoke_callbacks(self, callback_type, context, proceed_on_errors=False):
+ lst = self._callbacks.get(callback_type, [])
+ for callback, args, kw in lst:
+ try:
+ context = callback(context, *args, **kw)
+ except Exception, e:
+ if proceed_on_errors:
+ log.exception(
+ 'call-back-error', callback_type=callback_type,
+ context=context, e=e)
+ else:
+ raise
+ return context
diff --git a/python/core/config/config_rev.py b/python/core/config/config_rev.py
new file mode 100644
index 0000000..8bfac18
--- /dev/null
+++ b/python/core/config/config_rev.py
@@ -0,0 +1,342 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Immutable classes to store config revision information arranged in a tree.
+
+Immutability cannot be enforced in Python, so anyoen working with these
+classes directly must obey the rules.
+"""
+
+import weakref
+from copy import copy
+from hashlib import md5
+
+from google.protobuf.descriptor import Descriptor
+from simplejson import dumps
+
+from common.utils.json_format import MessageToJson
+from voltha.protos import third_party
+from voltha.protos import meta_pb2
+
+import structlog
+
+log = structlog.get_logger()
+
+def is_proto_message(o):
+ """
+ Return True if object o appears to be a protobuf message; False otherwise.
+ """
+ # use a somewhat empirical approach to decide if something looks like
+ # a protobuf message
+ return isinstance(getattr(o, 'DESCRIPTOR', None), Descriptor)
+
+
+def message_to_json_concise(m):
+ """
+ Return the most concise string representation of a protobuf. Good for
+ things where size matters (e.g., generating hash).
+ """
+ return MessageToJson(m, False, True, False)
+
+
+_rev_cache = weakref.WeakValueDictionary() # cache of config revs
+
+
+_children_fields_cache = {} # to memoize externally stored field name info
+
+
+class _ChildType(object):
+ """Used to store key metadata about child_node fields in protobuf messages.
+ """
+ __slots__ = (
+ '_module',
+ '_type',
+ '_is_container',
+ '_key',
+ '_key_from_str'
+ )
+
+ def __init__(self, module, type, is_container,
+ key=None, key_from_str=None):
+ self._module = module
+ self._type = type
+ self._is_container = is_container
+ self._key = key
+ self._key_from_str = key_from_str
+
+ @property
+ def is_container(self):
+ return self._is_container
+
+ @property
+ def key(self):
+ return self._key
+
+ @property
+ def key_from_str(self):
+ return self._key_from_str
+
+ @property
+ def module(self):
+ return self._module
+
+ @property
+ def type(self):
+ return self._type
+
+
+def children_fields(cls):
+ """
+ Return a map of externally stored fields for this protobuf message type.
+ What is stored as branch node is determined by the "child_node"
+ annotation in the protobuf definitions.
+ With each external field, we store if the field is a container, if a
+ container is keyed (indexed), and what is the function that converts
+ path substring back to the key.
+ """
+ names = _children_fields_cache.get(cls)
+
+ if names is None:
+ names = {}
+
+ for field in cls.DESCRIPTOR.fields:
+
+ if field.has_options:
+ options = field.GetOptions()
+
+ if options.HasExtension(meta_pb2.child_node):
+ is_container = field.label == 3
+ meta = options.Extensions[meta_pb2.child_node]
+ key_from_str = None
+
+ if meta.key:
+ key_field = field.message_type.fields_by_name[meta.key]
+ key_type = key_field.type
+
+ if key_type == key_field.TYPE_STRING:
+ key_from_str = lambda s: s
+
+ elif key_type in (
+ key_field.TYPE_FIXED32,
+ key_field.TYPE_FIXED64,
+ key_field.TYPE_INT32,
+ key_field.TYPE_INT64,
+ key_field.TYPE_SFIXED32,
+ key_field.TYPE_SFIXED64,
+ key_field.TYPE_SINT32,
+ key_field.TYPE_SINT64,
+ key_field.TYPE_UINT32,
+ key_field.TYPE_UINT64):
+ key_from_str = lambda s: int(s)
+
+ else:
+ raise NotImplementedError()
+
+ field_class = field.message_type._concrete_class
+ names[field.name] = _ChildType(
+ module=field_class.__module__,
+ type=field_class.__name__,
+ is_container=is_container,
+ key=meta.key,
+ key_from_str=key_from_str
+ )
+
+ _children_fields_cache[cls] = names
+
+ return names
+
+
+_access_right_cache = {} # to memoize field access right restrictions
+
+
+def access_rights(cls):
+ """
+ Determine the access rights for each field and cache these maps for
+ fast retrieval.
+ """
+ access_map = _access_right_cache.get(cls)
+ if access_map is None:
+ access_map = {}
+ for field in cls.DESCRIPTOR.fields:
+ if field.has_options:
+ options = field.GetOptions()
+ if options.HasExtension(meta_pb2.access):
+ access = options.Extensions[meta_pb2.access]
+ access_map[field.name] = access
+ _access_right_cache[cls] = access_map
+ return access_map
+
+
+class ConfigDataRevision(object):
+ """
+ Holds a specific snapshot of the local configuration for config node.
+ It shall be treated as an immutable object, although in Python this is
+ very difficult to enforce!
+ As such, we can compute a unique hash based on the config data which
+ can be used to establish equivalence. It also has a time-stamp to track
+ changes.
+
+ This object must be treated as immutable, including its nested config data.
+ This is very important. The entire config module depends on hashes
+ we create over the data, so altering the data can lead to unpredictable
+ detriments.
+ """
+
+ __slots__ = (
+ '_data',
+ '_hash',
+ '__weakref__'
+ )
+
+ def __init__(self, data):
+ self._data = data
+ self._hash = self._hash_data(data)
+
+ @property
+ def data(self):
+ return self._data
+
+ @property
+ def hash(self):
+ return self._hash
+
+ @staticmethod
+ def _hash_data(data):
+ """Hash function to be used to track version changes of config nodes"""
+ if isinstance(data, (dict, list)):
+ to_hash = dumps(data, sort_keys=True)
+ elif is_proto_message(data):
+ to_hash = ':'.join((
+ data.__class__.__module__,
+ data.__class__.__name__,
+ data.SerializeToString()))
+ else:
+ to_hash = str(hash(data))
+ return md5(to_hash).hexdigest()[:12]
+
+
+class ConfigRevision(object):
+ """
+ Holds not only the local config data, but also the external children
+ reference lists, per field name.
+ Recall that externally stored fields are those marked "child_node" in
+ the protobuf definition.
+ This object must be treated as immutable, including its config data.
+ """
+
+ __slots__ = (
+ '_config',
+ '_children',
+ '_hash',
+ '_branch',
+ '__weakref__'
+ )
+
+ def __init__(self, branch, data, children=None):
+ self._branch = branch
+ self._config = ConfigDataRevision(data)
+ self._children = children
+ self._finalize()
+
+ def _finalize(self):
+ self._hash = self._hash_content()
+ if self._hash not in _rev_cache:
+ _rev_cache[self._hash] = self
+ if self._config._hash not in _rev_cache:
+ _rev_cache[self._config._hash] = self._config
+ else:
+ self._config = _rev_cache[self._config._hash] # re-use!
+
+ def _hash_content(self):
+ # hash is derived from config hash and hashes of all children
+ m = md5('' if self._config is None else self._config._hash)
+ if self._children is not None:
+ for child_field in sorted(self._children.keys()):
+ children = self._children[child_field]
+ assert isinstance(children, list)
+ m.update(''.join(c._hash for c in children))
+ return m.hexdigest()[:12]
+
+ @property
+ def hash(self):
+ return self._hash
+
+ @property
+ def data(self):
+ return None if self._config is None else self._config.data
+
+ @property
+ def node(self):
+ return self._branch._node
+
+ @property
+ def type(self):
+ return self._config.data.__class__
+
+ def clear_hash(self):
+ self._hash = None
+
+ def get(self, depth):
+ """
+ Get config data of node. If depth > 0, recursively assemble the
+ branch nodes. If depth is < 0, this results in a fully exhaustive
+ "complete config".
+ """
+ orig_data = self._config.data
+ data = orig_data.__class__()
+ data.CopyFrom(orig_data)
+ if depth:
+ # collect children
+ cfields = children_fields(self.type).iteritems()
+ for field_name, field in cfields:
+ if field.is_container:
+ for rev in self._children[field_name]:
+ child_data = rev.get(depth=depth - 1)
+ child_data_holder = getattr(data, field_name).add()
+ child_data_holder.MergeFrom(child_data)
+ else:
+ rev = self._children[field_name][0]
+ child_data = rev.get(depth=depth - 1)
+ child_data_holder = getattr(data, field_name)
+ child_data_holder.MergeFrom(child_data)
+ return data
+
+ def update_data(self, data, branch):
+ """Return a NEW revision which is updated for the modified data"""
+ new_rev = copy(self)
+ new_rev._branch = branch
+ new_rev._config = self._config.__class__(data)
+ new_rev._finalize()
+ return new_rev
+
+ def update_children(self, name, children, branch):
+ """Return a NEW revision which is updated for the modified children"""
+ new_children = self._children.copy()
+ new_children[name] = children
+ new_rev = copy(self)
+ new_rev._branch = branch
+ new_rev._children = new_children
+ new_rev._finalize()
+ return new_rev
+
+ def update_all_children(self, children, branch):
+ """Return a NEW revision which is updated for all children entries"""
+ new_rev = copy(self)
+ new_rev._branch = branch
+ new_rev._children = children
+ new_rev._finalize()
+ return new_rev
diff --git a/python/core/config/config_rev_persisted.py b/python/core/config/config_rev_persisted.py
new file mode 100644
index 0000000..8b25b82
--- /dev/null
+++ b/python/core/config/config_rev_persisted.py
@@ -0,0 +1,143 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A config rev object that persists itself
+"""
+from bz2 import compress, decompress
+
+import structlog
+from simplejson import dumps, loads
+
+from voltha.core.config.config_rev import ConfigRevision, children_fields
+
+log = structlog.get_logger()
+
+
+class PersistedConfigRevision(ConfigRevision):
+
+ compress = False
+
+ __slots__ = ('_kv_store',)
+
+ def __init__(self, branch, data, children=None):
+ self._kv_store = branch._node._root._kv_store
+ super(PersistedConfigRevision, self).__init__(branch, data, children)
+
+ def _finalize(self):
+ super(PersistedConfigRevision, self)._finalize()
+ self.store()
+
+ def __del__(self):
+ try:
+ if self._hash:
+ if self._config.__weakref__ is None:
+ if self._config._hash in self._kv_store:
+ del self._kv_store[self._config._hash]
+ assert self.__weakref__ is None
+ if self._hash in self._kv_store:
+ del self._kv_store[self._hash]
+ except Exception, e:
+ # this should never happen
+ log.exception('del-error', hash=self.hash, e=e)
+
+ def store(self):
+
+ try:
+ # crude serialization of children hash and config data hash
+ if self._hash in self._kv_store:
+ return
+
+ self.store_config()
+
+ children_lists = {}
+ for field_name, children in self._children.iteritems():
+ hashes = [rev.hash for rev in children]
+ children_lists[field_name] = hashes
+
+ data = dict(
+ children=children_lists,
+ config=self._config._hash
+ )
+ blob = dumps(data)
+ if self.compress:
+ blob = compress(blob)
+
+ self._kv_store[self._hash] = blob
+
+ except Exception, e:
+ log.exception('store-error', e=e)
+
+ @classmethod
+ def load(cls, branch, kv_store, msg_cls, hash):
+ # Update the branch's config store
+ blob = kv_store[hash]
+ if cls.compress:
+ blob = decompress(blob)
+ data = loads(blob)
+
+ config_hash = data['config']
+ config_data = cls.load_config(kv_store, msg_cls, config_hash)
+
+ children_list = data['children']
+ assembled_children = {}
+ node = branch._node
+ for field_name, meta in children_fields(msg_cls).iteritems():
+ child_msg_cls = tmp_cls_loader(meta.module, meta.type)
+ children = []
+ for child_hash in children_list[field_name]:
+ child_node = node._mknode(child_msg_cls)
+ child_node.load_latest(child_hash)
+ child_rev = child_node.latest
+ children.append(child_rev)
+ assembled_children[field_name] = children
+ rev = cls(branch, config_data, assembled_children)
+ return rev
+
+ def store_config(self):
+ if self._config._hash in self._kv_store:
+ return
+
+ # crude serialization of config data
+ blob = self._config._data.SerializeToString()
+ if self.compress:
+ blob = compress(blob)
+
+ self._kv_store[self._config._hash] = blob
+
+ @classmethod
+ def load_config(cls, kv_store, msg_cls, config_hash):
+ blob = kv_store[config_hash]
+ if cls.compress:
+ blob = decompress(blob)
+
+ # TODO use a loader later on
+ data = msg_cls()
+ data.ParseFromString(blob)
+ return data
+
+
+def tmp_cls_loader(module_name, cls_name):
+ # TODO this shall be generalized
+ from voltha.protos import voltha_pb2, health_pb2, adapter_pb2, \
+ logical_device_pb2, device_pb2, openflow_13_pb2, bbf_fiber_base_pb2, \
+ bbf_fiber_traffic_descriptor_profile_body_pb2, \
+ bbf_fiber_tcont_body_pb2, bbf_fiber_gemport_body_pb2, \
+ bbf_fiber_multicast_gemport_body_pb2, \
+ bbf_fiber_multicast_distribution_set_body_pb2, \
+ omci_mib_db_pb2, \
+ omci_alarm_db_pb2
+ return getattr(locals()[module_name], cls_name)
diff --git a/python/core/config/config_root.py b/python/core/config/config_root.py
new file mode 100644
index 0000000..4b1006d
--- /dev/null
+++ b/python/core/config/config_root.py
@@ -0,0 +1,229 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from uuid import uuid4
+
+import structlog
+from simplejson import dumps, loads
+
+from voltha.core.config.config_node import ConfigNode
+from voltha.core.config.config_rev import ConfigRevision
+from voltha.core.config.config_rev_persisted import PersistedConfigRevision
+from voltha.core.config.merge_3way import MergeConflictException
+
+log = structlog.get_logger()
+
+
+class ConfigRoot(ConfigNode):
+
+ __slots__ = (
+ '_dirty_nodes', # holds set of modified nodes per transaction branch
+ '_kv_store',
+ '_loading',
+ '_rev_cls',
+ '_deferred_callback_queue',
+ '_notification_deferred_callback_queue'
+ )
+
+ def __init__(self, initial_data, kv_store=None, rev_cls=ConfigRevision):
+ self._kv_store = kv_store
+ self._dirty_nodes = {}
+ self._loading = False
+ if kv_store is not None and \
+ not issubclass(rev_cls, PersistedConfigRevision):
+ rev_cls = PersistedConfigRevision
+ self._rev_cls = rev_cls
+ self._deferred_callback_queue = []
+ self._notification_deferred_callback_queue = []
+ super(ConfigRoot, self).__init__(self, initial_data, False)
+
+ @property
+ def kv_store(self):
+ if self._loading:
+ # provide fake store for storing things
+ # TODO this shall be a fake_dict providing noop for all relevant
+ # operations
+ return dict()
+ else:
+ return self._kv_store
+
+ def mkrev(self, *args, **kw):
+ return self._rev_cls(*args, **kw)
+
+ def mk_txbranch(self):
+ txid = uuid4().hex[:12]
+ self._dirty_nodes[txid] = {self}
+ self._mk_txbranch(txid)
+ return txid
+
+ def del_txbranch(self, txid):
+ for dirty_node in self._dirty_nodes[txid]:
+ dirty_node._del_txbranch(txid)
+ del self._dirty_nodes[txid]
+
+ def fold_txbranch(self, txid):
+ try:
+ self._merge_txbranch(txid, dry_run=1)
+ except MergeConflictException:
+ self.del_txbranch(txid)
+ raise
+
+ try:
+ self._merge_txbranch(txid)
+ finally:
+ self.execute_deferred_callbacks()
+
+ # ~~~~~~ Overridden, root-level CRUD methods to handle transactions ~~~~~~~
+
+ def update(self, path, data, strict=None, txid=None, mk_branch=None):
+ assert mk_branch is None
+ self.check_callback_queue()
+ try:
+ if txid is not None:
+ dirtied = self._dirty_nodes[txid]
+
+ def track_dirty(node):
+ dirtied.add(node)
+ return node._mk_txbranch(txid)
+
+ res = super(ConfigRoot, self).update(path, data, strict,
+ txid, track_dirty)
+ else:
+ res = super(ConfigRoot, self).update(path, data, strict)
+ finally:
+ self.execute_deferred_callbacks()
+ return res
+
+ def add(self, path, data, txid=None, mk_branch=None):
+ assert mk_branch is None
+ self.check_callback_queue()
+ try:
+ if txid is not None:
+ dirtied = self._dirty_nodes[txid]
+
+ def track_dirty(node):
+ dirtied.add(node)
+ return node._mk_txbranch(txid)
+
+ res = super(ConfigRoot, self).add(path, data, txid, track_dirty)
+ else:
+ res = super(ConfigRoot, self).add(path, data)
+ finally:
+ self.execute_deferred_callbacks()
+ return res
+
+ def remove(self, path, txid=None, mk_branch=None):
+ assert mk_branch is None
+ self.check_callback_queue()
+ try:
+ if txid is not None:
+ dirtied = self._dirty_nodes[txid]
+
+ def track_dirty(node):
+ dirtied.add(node)
+ return node._mk_txbranch(txid)
+
+ res = super(ConfigRoot, self).remove(path, txid, track_dirty)
+ else:
+ res = super(ConfigRoot, self).remove(path)
+ finally:
+ self.execute_deferred_callbacks()
+ return res
+
+ def check_callback_queue(self):
+ assert len(self._deferred_callback_queue) == 0
+
+ def enqueue_callback(self, func, *args, **kw):
+ self._deferred_callback_queue.append((func, args, kw))
+
+ def enqueue_notification_callback(self, func, *args, **kw):
+ """
+ A separate queue is required for notification. Previously, when the
+ notifications were added to the self._deferred_callback_queue there
+ was a deadlock condition where two callbacks were added (one
+ related to the model change and one for the notification related to
+ that model change). Since the model change requires the
+ self._deferred_callback_queue to be empty then there was a deadlock
+ in that scenario. The simple approach to avoid this problem is to
+ have separate queues for model and notification.
+ TODO: Investigate whether there is a need for the
+ self._deferred_callback_queue to handle multiple model events at the same time
+ :param func: callback function
+ :param args: args
+ :param kw: key-value args
+ :return: None
+ """
+ self._notification_deferred_callback_queue.append((func, args, kw))
+
+ def execute_deferred_callbacks(self):
+ # First process the model-triggered related callbacks
+ while self._deferred_callback_queue:
+ func, args, kw = self._deferred_callback_queue.pop(0)
+ func(*args, **kw)
+
+ # Execute the notification callbacks
+ while self._notification_deferred_callback_queue:
+ func, args, kw = self._notification_deferred_callback_queue.pop(0)
+ func(*args, **kw)
+
+
+ # ~~~~~~~~~~~~~~~~ Persistence related ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ @classmethod
+ def load(cls, root_msg_cls, kv_store):
+ # need to use fake kv store during initial load for not to override
+ # our real k vstore
+ fake_kv_store = dict() # shall use more efficient mock dict
+ root = cls(root_msg_cls(), kv_store=fake_kv_store,
+ rev_cls=PersistedConfigRevision)
+ # we can install the real store now
+ root._kv_store = kv_store
+ root.load_from_persistence(root_msg_cls)
+ return root
+
+ def _make_latest(self, branch, *args, **kw):
+ super(ConfigRoot, self)._make_latest(branch, *args, **kw)
+ # only persist the committed branch
+ if self._kv_store is not None and branch._txid is None:
+ root_data = dict(
+ latest=branch._latest._hash,
+ tags=dict((k, v._hash) for k, v in self._tags.iteritems())
+ )
+ blob = dumps(root_data)
+ self._kv_store['root'] = blob
+
+ def persist_tags(self):
+ if self._kv_store is not None:
+ root_data = loads(self.kv_store['root'])
+ root_data = dict(
+ latest=root_data['latest'],
+ tags=dict((k, v._hash) for k, v in self._tags.iteritems())
+ )
+ blob = dumps(root_data)
+ self._kv_store['root'] = blob
+
+ def load_from_persistence(self, root_msg_cls):
+ self._loading = True
+ blob = self._kv_store['root']
+ root_data = loads(blob)
+
+ for tag, hash in root_data['tags'].iteritems():
+ self.load_latest(hash)
+ self._tags[tag] = self.latest
+
+ self.load_latest(root_data['latest'])
+
+ self._loading = False
+
diff --git a/python/core/config/config_txn.py b/python/core/config/config_txn.py
new file mode 100644
index 0000000..87dfc59
--- /dev/null
+++ b/python/core/config/config_txn.py
@@ -0,0 +1,73 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class ClosedTransactionError(Exception):
+ pass
+
+
+class ConfigTransaction(object):
+
+ __slots__ = (
+ '_proxy',
+ '_txid'
+ )
+
+ def __init__(self, proxy, txid):
+ self._proxy = proxy
+ self._txid = txid
+
+ def __del__(self):
+ if self._txid:
+ try:
+ self.cancel()
+ except:
+ raise
+
+ # ~~~~~~~~~~~~~~~~~~~~ CRUD ops within the transaction ~~~~~~~~~~~~~~~~~~~~
+
+ def get(self, path='/', depth=None, deep=None):
+ if self._txid is None:
+ raise ClosedTransactionError()
+ return self._proxy.get(path, depth=depth, deep=deep, txid=self._txid)
+
+ def update(self, path, data, strict=False):
+ if self._txid is None:
+ raise ClosedTransactionError()
+ return self._proxy.update(path, data, strict, self._txid)
+
+ def add(self, path, data):
+ if self._txid is None:
+ raise ClosedTransactionError()
+ return self._proxy.add(path, data, self._txid)
+
+ def remove(self, path):
+ if self._txid is None:
+ raise ClosedTransactionError()
+ return self._proxy.remove(path, self._txid)
+
+ # ~~~~~~~~~~~~~~~~~~~~ transaction finalization ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def cancel(self):
+ """Explicitly cancel the transaction"""
+ self._proxy.cancel_transaction(self._txid)
+ self._txid = None
+
+ def commit(self):
+ """Commit all transaction changes"""
+ try:
+ self._proxy.commit_transaction(self._txid)
+ finally:
+ self._txid = None
diff --git a/python/core/config/merge_3way.py b/python/core/config/merge_3way.py
new file mode 100644
index 0000000..5444a6c
--- /dev/null
+++ b/python/core/config/merge_3way.py
@@ -0,0 +1,267 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+3-way merge function for config rev objects.
+"""
+from collections import OrderedDict
+from copy import copy
+
+from voltha.core.config.config_proxy import CallbackType, OperationContext
+from voltha.core.config.config_rev import children_fields
+
+
+class MergeConflictException(Exception):
+ pass
+
+
+def merge_3way(fork_rev, src_rev, dst_rev, merge_child_func, dry_run=False):
+ """
+ Attempt to merge src_rev into dst_rev but taking into account what have
+ changed in both revs since the last known common point, the fork_rev.
+ In case of conflict, raise a MergeConflictException(). If dry run is True,
+ don't actually perform the merge, but detect potential conflicts.
+
+ This function recurses into all children nodes stored under the rev and
+ performs the merge if the children is also part of a transaction branch.
+
+ :param fork_rev: Point of forking (last known common state between branches
+ :param src_rev: Latest rev from which we merge to dst_rev
+ :param dst_rev: Target (destination) rev
+ :param merge_child_fun: To run a potential merge in all children that
+ may need merge (determined from the local changes)
+ :param dry_run: If True, do not perform the merge, but detect merge
+ conflicts.
+ :return: The new dst_rev (a new rev instance) the list of changes that
+ occurred in this node or any of its children as part of this merge.
+ """
+
+ # to collect change tuples of (<callback-type>, <op-context>)
+ changes = []
+
+ class AnalyzeChanges(object):
+ def __init__(self, lst1, lst2, keyname):
+ self.keymap1 = OrderedDict((getattr(rev._config._data, keyname), i)
+ for i, rev in enumerate(lst1))
+ self.keymap2 = OrderedDict((getattr(rev._config._data, keyname), i)
+ for i, rev in enumerate(lst2))
+ self.added_keys = [
+ k for k in self.keymap2.iterkeys() if k not in self.keymap1]
+ self.removed_keys = [
+ k for k in self.keymap1.iterkeys() if k not in self.keymap2]
+ self.changed_keys = [
+ k for k in self.keymap1.iterkeys()
+ if k in self.keymap2 and
+ lst1[self.keymap1[k]]._hash != lst2[self.keymap2[k]]._hash
+ ]
+
+ # Note: there are a couple of special cases that can be optimized
+ # for larer on. But since premature optimization is a bad idea, we
+ # defer them.
+
+ # deal with config data first
+ if dst_rev._config is fork_rev._config:
+ # no change in master, accept src if different
+ config_changed = dst_rev._config != src_rev._config
+ else:
+ if dst_rev._config.hash != src_rev._config.hash:
+ raise MergeConflictException('Config collision')
+ config_changed = True
+
+ # now to the external children fields
+ new_children = dst_rev._children.copy()
+ _children_fields = children_fields(fork_rev.data.__class__)
+
+ for field_name, field in _children_fields.iteritems():
+
+ fork_list = fork_rev._children[field_name]
+ src_list = src_rev._children[field_name]
+ dst_list = dst_rev._children[field_name]
+
+ if dst_list == src_list:
+ # we do not need to change the dst, however we still need
+ # to complete the branch purging in child nodes so not
+ # to leave dangling branches around
+ [merge_child_func(rev) for rev in src_list]
+ continue
+
+ if not field.key:
+ # If the list is not keyed, we really should not merge. We merely
+ # check for collision, i.e., if both changed (and not same)
+ if dst_list == fork_list:
+ # dst branch did not change since fork
+
+ assert src_list != fork_list, 'We should not be here otherwise'
+
+ # the incoming (src) rev changed, and we have to apply it
+ new_children[field_name] = [
+ merge_child_func(rev) for rev in src_list]
+
+ if field.is_container:
+ changes.append((CallbackType.POST_LISTCHANGE,
+ OperationContext(field_name=field_name)))
+
+ else:
+ if src_list != fork_list:
+ raise MergeConflictException(
+ 'Cannot merge because single child node or un-keyed'
+ 'children list has changed')
+
+ else:
+
+ if dst_list == fork_list:
+ # Destination did not change
+
+ # We need to analyze only the changes on the incoming rev
+ # since fork
+ src = AnalyzeChanges(fork_list, src_list, field.key)
+
+ new_list = copy(src_list) # we start from the source list
+
+ for key in src.added_keys:
+ idx = src.keymap2[key]
+ new_rev = merge_child_func(new_list[idx])
+ new_list[idx] = new_rev
+ changes.append(
+ (CallbackType.POST_ADD,
+ new_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=new_rev.data)))
+
+ for key in src.removed_keys:
+ old_rev = fork_list[src.keymap1[key]]
+ changes.append((
+ CallbackType.POST_REMOVE,
+ old_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=old_rev.data)))
+
+ for key in src.changed_keys:
+ idx = src.keymap2[key]
+ new_rev = merge_child_func(new_list[idx])
+ new_list[idx] = new_rev
+ # updated child gets its own change event
+
+ new_children[field_name] = new_list
+
+ else:
+
+ # For keyed fields we can really investigate what has been
+ # added, removed, or changed in both branches and do a
+ # fine-grained collision detection and merge
+
+ src = AnalyzeChanges(fork_list, src_list, field.key)
+ dst = AnalyzeChanges(fork_list, dst_list, field.key)
+
+ new_list = copy(dst_list) # this time we start with the dst
+
+ for key in src.added_keys:
+ # we cannot add if it has been added and is different
+ if key in dst.added_keys:
+ # it has been added to both, we need to check if
+ # they are the same
+ child_dst_rev = dst_list[dst.keymap2[key]]
+ child_src_rev = src_list[src.keymap2[key]]
+ if child_dst_rev.hash == child_src_rev.hash:
+ # they match, so we do not need to change the
+ # dst list, but we still need to purge the src
+ # branch
+ merge_child_func(child_dst_rev)
+ else:
+ raise MergeConflictException(
+ 'Cannot add because it has been added and '
+ 'different'
+ )
+ else:
+ # this is a brand new key, need to add it
+ new_rev = merge_child_func(src_list[src.keymap2[key]])
+ new_list.append(new_rev)
+ changes.append((
+ CallbackType.POST_ADD,
+ new_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=new_rev.data)))
+
+ for key in src.changed_keys:
+ # we cannot change if it was removed in dst
+ if key in dst.removed_keys:
+ raise MergeConflictException(
+ 'Cannot change because it has been removed')
+
+ # if it changed in dst as well, we need to check if they
+ # match (same change
+ elif key in dst.changed_keys:
+ child_dst_rev = dst_list[dst.keymap2[key]]
+ child_src_rev = src_list[src.keymap2[key]]
+ if child_dst_rev.hash == child_src_rev.hash:
+ # they match, so we do not need to change the
+ # dst list, but we still need to purge the src
+ # branch
+ merge_child_func(child_src_rev)
+ elif child_dst_rev._config.hash != child_src_rev._config.hash:
+ raise MergeConflictException(
+ 'Cannot update because it has been changed and '
+ 'different'
+ )
+ else:
+ new_rev = merge_child_func(
+ src_list[src.keymap2[key]])
+ new_list[dst.keymap2[key]] = new_rev
+ # no announcement for child update
+
+ else:
+ # it only changed in src branch
+ new_rev = merge_child_func(src_list[src.keymap2[key]])
+ new_list[dst.keymap2[key]] = new_rev
+ # no announcement for child update
+
+ for key in reversed(src.removed_keys): # we go from highest
+ # index to lowest
+
+ # we cannot remove if it has changed in dst
+ if key in dst.changed_keys:
+ raise MergeConflictException(
+ 'Cannot remove because it has changed')
+
+ # if it has not been removed yet from dst, then remove it
+ if key not in dst.removed_keys:
+ dst_idx = dst.keymap2[key]
+ old_rev = new_list.pop(dst_idx)
+ changes.append((
+ CallbackType.POST_REMOVE,
+ old_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=old_rev.data)))
+
+ new_children[field_name] = new_list
+
+ if not dry_run:
+ rev = src_rev if config_changed else dst_rev
+ rev = rev.update_all_children(new_children, dst_rev._branch)
+ if config_changed:
+ changes.append((CallbackType.POST_UPDATE, rev.data))
+ return rev, changes
+
+ else:
+ return None, None
diff --git a/python/core/device_graph.py b/python/core/device_graph.py
new file mode 100644
index 0000000..a4e6d85
--- /dev/null
+++ b/python/core/device_graph.py
@@ -0,0 +1,136 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import networkx as nx
+
+from voltha.core.flow_decomposer import RouteHop
+
+
+class DeviceGraph(object):
+
+ """
+ Mixin class to compute routes in the device graph within
+ a logical device.
+ """
+
+ def compute_routes(self, root_proxy, logical_ports):
+ boundary_ports, graph = self._build_graph(root_proxy, logical_ports)
+ routes = self._build_routes(boundary_ports, graph, logical_ports)
+ return graph, routes
+
+ def _build_graph(self, root_proxy, logical_ports):
+
+ graph = nx.Graph()
+
+ # walk logical device's device and port links to discover full graph
+ devices_added = set() # set of device.id's
+ ports_added = set() # set of (device.id, port_no) tuples
+ peer_links = set()
+
+ boundary_ports = dict(
+ ((lp.device_id, lp.device_port_no), lp.ofp_port.port_no)
+ for lp in logical_ports
+ )
+
+ def add_device(device):
+ if device.id in devices_added:
+ return
+
+ graph.add_node(device.id, device=device)
+ devices_added.add(device.id)
+
+ ports = root_proxy.get('/devices/{}/ports'.format(device.id))
+ for port in ports:
+ port_id = (device.id, port.port_no)
+ if port_id not in ports_added:
+ boundary = port_id in boundary_ports
+ graph.add_node(port_id, port=port, boundary=boundary)
+ graph.add_edge(device.id, port_id)
+ for peer in port.peers:
+ if peer.device_id not in devices_added:
+ peer_device = root_proxy.get(
+ '/devices/{}'.format(peer.device_id))
+ add_device(peer_device)
+ else:
+ peer_port_id = (peer.device_id, peer.port_no)
+ if port_id < peer_port_id:
+ peer_link = (port_id, peer_port_id)
+ else:
+ peer_link = (peer_port_id, port_id)
+ if peer_link not in peer_links:
+ graph.add_edge(*peer_link)
+ peer_links.add(peer_link)
+
+ for logical_port in logical_ports:
+ device_id = logical_port.device_id
+ device = root_proxy.get('/devices/{}'.format(device_id))
+ add_device(device)
+
+ return boundary_ports, graph
+
+ def _build_routes(self, boundary_ports, graph, logical_ports):
+
+ root_ports = dict((lp.ofp_port.port_no, lp.root_port)
+ for lp in logical_ports if lp.root_port == True)
+
+ routes = {}
+
+ for source, source_port_no in boundary_ports.iteritems():
+ for target, target_port_no in boundary_ports.iteritems():
+
+ if source is target:
+ continue
+
+ # Ignore NNI - NNI routes
+ if source_port_no in root_ports \
+ and target_port_no in root_ports:
+ continue
+
+ # Ignore UNI - UNI routes
+ if source_port_no not in root_ports \
+ and target_port_no not in root_ports:
+ continue
+
+ path = nx.shortest_path(graph, source, target)
+
+ # number of nodes in valid paths is always multiple of 3
+ if len(path) % 3:
+ continue
+
+ # in fact, we currently deal with single fan-out networks,
+ # so the number of hops is always 6
+ assert len(path) == 6
+
+ ingress_input_port, ingress_device, ingress_output_port, \
+ egress_input_port, egress_device, egress_output_port = path
+
+ ingress_hop = RouteHop(
+ device=graph.node[ingress_device]['device'],
+ ingress_port=graph.node[ingress_input_port]['port'],
+ egress_port=graph.node[ingress_output_port]['port']
+ )
+ egress_hop = RouteHop(
+ device=graph.node[egress_device]['device'],
+ ingress_port=graph.node[egress_input_port]['port'],
+ egress_port=graph.node[egress_output_port]['port']
+ )
+
+ routes[(source_port_no, target_port_no)] = [
+ ingress_hop, egress_hop
+ ]
+
+ return routes
+
diff --git a/python/core/flow_decomposer.py b/python/core/flow_decomposer.py
new file mode 100644
index 0000000..faf3141
--- /dev/null
+++ b/python/core/flow_decomposer.py
@@ -0,0 +1,1010 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A mix-in class implementing flow decomposition
+"""
+from collections import OrderedDict
+from copy import copy, deepcopy
+from hashlib import md5
+
+import structlog
+
+from voltha.protos import third_party
+from voltha.protos import openflow_13_pb2 as ofp
+from common.tech_profile import tech_profile
+_ = third_party
+log = structlog.get_logger()
+
+
+# aliases
+ofb_field = ofp.ofp_oxm_ofb_field
+action = ofp.ofp_action
+
+# OFPAT_* shortcuts
+OUTPUT = ofp.OFPAT_OUTPUT
+COPY_TTL_OUT = ofp.OFPAT_COPY_TTL_OUT
+COPY_TTL_IN = ofp.OFPAT_COPY_TTL_IN
+SET_MPLS_TTL = ofp.OFPAT_SET_MPLS_TTL
+DEC_MPLS_TTL = ofp.OFPAT_DEC_MPLS_TTL
+PUSH_VLAN = ofp.OFPAT_PUSH_VLAN
+POP_VLAN = ofp.OFPAT_POP_VLAN
+PUSH_MPLS = ofp.OFPAT_PUSH_MPLS
+POP_MPLS = ofp.OFPAT_POP_MPLS
+SET_QUEUE = ofp.OFPAT_SET_QUEUE
+GROUP = ofp.OFPAT_GROUP
+SET_NW_TTL = ofp.OFPAT_SET_NW_TTL
+NW_TTL = ofp.OFPAT_DEC_NW_TTL
+SET_FIELD = ofp.OFPAT_SET_FIELD
+PUSH_PBB = ofp.OFPAT_PUSH_PBB
+POP_PBB = ofp.OFPAT_POP_PBB
+EXPERIMENTER = ofp.OFPAT_EXPERIMENTER
+
+# OFPXMT_OFB_* shortcuts (incomplete)
+IN_PORT = ofp.OFPXMT_OFB_IN_PORT
+IN_PHY_PORT = ofp.OFPXMT_OFB_IN_PHY_PORT
+METADATA = ofp.OFPXMT_OFB_METADATA
+ETH_DST = ofp.OFPXMT_OFB_ETH_DST
+ETH_SRC = ofp.OFPXMT_OFB_ETH_SRC
+ETH_TYPE = ofp.OFPXMT_OFB_ETH_TYPE
+VLAN_VID = ofp.OFPXMT_OFB_VLAN_VID
+VLAN_PCP = ofp.OFPXMT_OFB_VLAN_PCP
+IP_DSCP = ofp.OFPXMT_OFB_IP_DSCP
+IP_ECN = ofp.OFPXMT_OFB_IP_ECN
+IP_PROTO = ofp.OFPXMT_OFB_IP_PROTO
+IPV4_SRC = ofp.OFPXMT_OFB_IPV4_SRC
+IPV4_DST = ofp.OFPXMT_OFB_IPV4_DST
+TCP_SRC = ofp.OFPXMT_OFB_TCP_SRC
+TCP_DST = ofp.OFPXMT_OFB_TCP_DST
+UDP_SRC = ofp.OFPXMT_OFB_UDP_SRC
+UDP_DST = ofp.OFPXMT_OFB_UDP_DST
+SCTP_SRC = ofp.OFPXMT_OFB_SCTP_SRC
+SCTP_DST = ofp.OFPXMT_OFB_SCTP_DST
+ICMPV4_TYPE = ofp.OFPXMT_OFB_ICMPV4_TYPE
+ICMPV4_CODE = ofp.OFPXMT_OFB_ICMPV4_CODE
+ARP_OP = ofp.OFPXMT_OFB_ARP_OP
+ARP_SPA = ofp.OFPXMT_OFB_ARP_SPA
+ARP_TPA = ofp.OFPXMT_OFB_ARP_TPA
+ARP_SHA = ofp.OFPXMT_OFB_ARP_SHA
+ARP_THA = ofp.OFPXMT_OFB_ARP_THA
+IPV6_SRC = ofp.OFPXMT_OFB_IPV6_SRC
+IPV6_DST = ofp.OFPXMT_OFB_IPV6_DST
+IPV6_FLABEL = ofp.OFPXMT_OFB_IPV6_FLABEL
+ICMPV6_TYPE = ofp.OFPXMT_OFB_ICMPV6_TYPE
+ICMPV6_CODE = ofp.OFPXMT_OFB_ICMPV6_CODE
+IPV6_ND_TARGET = ofp.OFPXMT_OFB_IPV6_ND_TARGET
+OFB_IPV6_ND_SLL = ofp.OFPXMT_OFB_IPV6_ND_SLL
+IPV6_ND_TLL = ofp.OFPXMT_OFB_IPV6_ND_TLL
+MPLS_LABEL = ofp.OFPXMT_OFB_MPLS_LABEL
+MPLS_TC = ofp.OFPXMT_OFB_MPLS_TC
+MPLS_BOS = ofp.OFPXMT_OFB_MPLS_BOS
+PBB_ISID = ofp.OFPXMT_OFB_PBB_ISID
+TUNNEL_ID = ofp.OFPXMT_OFB_TUNNEL_ID
+IPV6_EXTHDR = ofp.OFPXMT_OFB_IPV6_EXTHDR
+
+# ofp_action_* shortcuts
+
+def output(port, max_len=ofp.OFPCML_MAX):
+ return action(
+ type=OUTPUT,
+ output=ofp.ofp_action_output(port=port, max_len=max_len)
+ )
+
+def mpls_ttl(ttl):
+ return action(
+ type=SET_MPLS_TTL,
+ mpls_ttl=ofp.ofp_action_mpls_ttl(mpls_ttl=ttl)
+ )
+
+def push_vlan(eth_type):
+ return action(
+ type=PUSH_VLAN,
+ push=ofp.ofp_action_push(ethertype=eth_type)
+ )
+
+def pop_vlan():
+ return action(
+ type=POP_VLAN
+ )
+
+def pop_mpls(eth_type):
+ return action(
+ type=POP_MPLS,
+ pop_mpls=ofp.ofp_action_pop_mpls(ethertype=eth_type)
+ )
+
+def group(group_id):
+ return action(
+ type=GROUP,
+ group=ofp.ofp_action_group(group_id=group_id)
+ )
+
+def nw_ttl(nw_ttl):
+ return action(
+ type=NW_TTL,
+ nw_ttl=ofp.ofp_action_nw_ttl(nw_ttl=nw_ttl)
+ )
+
+def set_field(field):
+ return action(
+ type=SET_FIELD,
+ set_field=ofp.ofp_action_set_field(
+ field=ofp.ofp_oxm_field(
+ oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+ ofb_field=field))
+ )
+
+def experimenter(experimenter, data):
+ return action(
+ type=EXPERIMENTER,
+ experimenter=ofp.ofp_action_experimenter(
+ experimenter=experimenter, data=data)
+ )
+
+
+# ofb_field generators (incomplete set)
+
+def in_port(_in_port):
+ return ofb_field(type=IN_PORT, port=_in_port)
+
+def in_phy_port(_in_phy_port):
+ return ofb_field(type=IN_PHY_PORT, port=_in_phy_port)
+
+def metadata(_table_metadata):
+ return ofb_field(type=METADATA, table_metadata=_table_metadata)
+
+def eth_dst(_eth_dst):
+ return ofb_field(type=ETH_DST, table_metadata=_eth_dst)
+
+def eth_src(_eth_src):
+ return ofb_field(type=ETH_SRC, table_metadata=_eth_src)
+
+def eth_type(_eth_type):
+ return ofb_field(type=ETH_TYPE, eth_type=_eth_type)
+
+def vlan_vid(_vlan_vid):
+ return ofb_field(type=VLAN_VID, vlan_vid=_vlan_vid)
+
+def vlan_pcp(_vlan_pcp):
+ return ofb_field(type=VLAN_PCP, vlan_pcp=_vlan_pcp)
+
+def ip_dscp(_ip_dscp):
+ return ofb_field(type=IP_DSCP, ip_dscp=_ip_dscp)
+
+def ip_ecn(_ip_ecn):
+ return ofb_field(type=IP_ECN, ip_ecn=_ip_ecn)
+
+def ip_proto(_ip_proto):
+ return ofb_field(type=IP_PROTO, ip_proto=_ip_proto)
+
+def ipv4_src(_ipv4_src):
+ return ofb_field(type=IPV4_SRC, ipv4_src=_ipv4_src)
+
+def ipv4_dst(_ipv4_dst):
+ return ofb_field(type=IPV4_DST, ipv4_dst=_ipv4_dst)
+
+def tcp_src(_tcp_src):
+ return ofb_field(type=TCP_SRC, tcp_src=_tcp_src)
+
+def tcp_dst(_tcp_dst):
+ return ofb_field(type=TCP_DST, tcp_dst=_tcp_dst)
+
+def udp_src(_udp_src):
+ return ofb_field(type=UDP_SRC, udp_src=_udp_src)
+
+def udp_dst(_udp_dst):
+ return ofb_field(type=UDP_DST, udp_dst=_udp_dst)
+
+def sctp_src(_sctp_src):
+ return ofb_field(type=SCTP_SRC, sctp_src=_sctp_src)
+
+def sctp_dst(_sctp_dst):
+ return ofb_field(type=SCTP_DST, sctp_dst=_sctp_dst)
+
+def icmpv4_type(_icmpv4_type):
+ return ofb_field(type=ICMPV4_TYPE, icmpv4_type=_icmpv4_type)
+
+def icmpv4_code(_icmpv4_code):
+ return ofb_field(type=ICMPV4_CODE, icmpv4_code=_icmpv4_code)
+
+def arp_op(_arp_op):
+ return ofb_field(type=ARP_OP, arp_op=_arp_op)
+
+def arp_spa(_arp_spa):
+ return ofb_field(type=ARP_SPA, arp_spa=_arp_spa)
+
+def arp_tpa(_arp_tpa):
+ return ofb_field(type=ARP_TPA, arp_tpa=_arp_tpa)
+
+def arp_sha(_arp_sha):
+ return ofb_field(type=ARP_SHA, arp_sha=_arp_sha)
+
+def arp_tha(_arp_tha):
+ return ofb_field(type=ARP_THA, arp_tha=_arp_tha)
+
+def ipv6_src(_ipv6_src):
+ return ofb_field(type=IPV6_SRC, arp_tha=_ipv6_src)
+
+def ipv6_dst(_ipv6_dst):
+ return ofb_field(type=IPV6_DST, arp_tha=_ipv6_dst)
+
+def ipv6_flabel(_ipv6_flabel):
+ return ofb_field(type=IPV6_FLABEL, arp_tha=_ipv6_flabel)
+
+def ipmpv6_type(_icmpv6_type):
+ return ofb_field(type=ICMPV6_TYPE, arp_tha=_icmpv6_type)
+
+def icmpv6_code(_icmpv6_code):
+ return ofb_field(type=ICMPV6_CODE, arp_tha=_icmpv6_code)
+
+def ipv6_nd_target(_ipv6_nd_target):
+ return ofb_field(type=IPV6_ND_TARGET, arp_tha=_ipv6_nd_target)
+
+def ofb_ipv6_nd_sll(_ofb_ipv6_nd_sll):
+ return ofb_field(type=OFB_IPV6_ND_SLL, arp_tha=_ofb_ipv6_nd_sll)
+
+def ipv6_nd_tll(_ipv6_nd_tll):
+ return ofb_field(type=IPV6_ND_TLL, arp_tha=_ipv6_nd_tll)
+
+def mpls_label(_mpls_label):
+ return ofb_field(type=MPLS_LABEL, arp_tha=_mpls_label)
+
+def mpls_tc(_mpls_tc):
+ return ofb_field(type=MPLS_TC, arp_tha=_mpls_tc)
+
+def mpls_bos(_mpls_bos):
+ return ofb_field(type=MPLS_BOS, arp_tha=_mpls_bos)
+
+def pbb_isid(_pbb_isid):
+ return ofb_field(type=PBB_ISID, arp_tha=_pbb_isid)
+
+def tunnel_id(_tunnel_id):
+ return ofb_field(type=TUNNEL_ID, arp_tha=_tunnel_id)
+
+def ipv6_exthdr(_ipv6_exthdr):
+ return ofb_field(type=IPV6_EXTHDR, arp_tha=_ipv6_exthdr)
+
+
+# frequently used extractors:
+
+def get_actions(flow):
+ """Extract list of ofp_action objects from flow spec object"""
+ assert isinstance(flow, ofp.ofp_flow_stats)
+ # we have the following hard assumptions for now
+ actions = []
+ for instruction in flow.instructions:
+ if instruction.type == ofp.OFPIT_APPLY_ACTIONS or instruction.type == ofp.OFPIT_WRITE_ACTIONS:
+ actions.extend(instruction.actions.actions)
+ return actions
+
+
+def get_ofb_fields(flow):
+ assert isinstance(flow, ofp.ofp_flow_stats)
+ assert flow.match.type == ofp.OFPMT_OXM
+ ofb_fields = []
+ for field in flow.match.oxm_fields:
+ assert field.oxm_class == ofp.OFPXMC_OPENFLOW_BASIC
+ ofb_fields.append(field.ofb_field)
+ return ofb_fields
+
+def get_out_port(flow):
+ for action in get_actions(flow):
+ if action.type == OUTPUT:
+ return action.output.port
+ return None
+
+def get_in_port(flow):
+ for field in get_ofb_fields(flow):
+ if field.type == IN_PORT:
+ return field.port
+ return None
+
+def get_goto_table_id(flow):
+ for instruction in flow.instructions:
+ if instruction.type == ofp.OFPIT_GOTO_TABLE:
+ return instruction.goto_table.table_id
+ return None
+
+def get_metadata(flow):
+ ''' legacy get method (only want lower 32 bits '''
+ for field in get_ofb_fields(flow):
+ if field.type == METADATA:
+ return field.table_metadata & 0xffffffff
+ return None
+
+def get_metadata_64_bit(flow):
+ for field in get_ofb_fields(flow):
+ if field.type == METADATA:
+ return field.table_metadata
+ return None
+
+
+def get_port_number_from_metadata(flow):
+ """
+ The port number (UNI on ONU) is in the lower 32-bits of metadata and
+ the inner_tag is in the upper 32-bits
+
+ This is set in the ONOS OltPipeline as a metadata field
+ """
+ md = get_metadata_64_bit(flow)
+
+ if md is None:
+ return None
+
+ if md <= 0xffffffff:
+ log.warn('onos-upgrade-suggested',
+ netadata=md,
+ message='Legacy MetaData detected form OltPipeline')
+ return md
+
+ return md & 0xffffffff
+
+
+def get_inner_tag_from_metadata(flow):
+ """
+ The port number (UNI on ONU) is in the lower 32-bits of metadata and
+ the inner_tag is in the upper 32-bits
+
+ This is set in the ONOS OltPipeline as a metadata field
+ """
+ md = get_metadata_64_bit(flow)
+
+ if md is None:
+ return None
+
+ if md <= 0xffffffff:
+ log.warn('onos-upgrade-suggested',
+ netadata=md,
+ message='Legacy MetaData detected form OltPipeline')
+ return md
+
+ return (md >> 32) & 0xffffffff
+
+
+# test and extract next table and group information
+def has_next_table(flow):
+ return get_goto_table_id(flow) is not None
+
+def get_group(flow):
+ for action in get_actions(flow):
+ if action.type == GROUP:
+ return action.group.group_id
+ return None
+
+def get_meter_ids_from_flow(flow):
+ meter_ids = list()
+ for instruction in flow.instructions:
+ if instruction.type == ofp.OFPIT_METER:
+ meter_ids.append(instruction.meter.meter_id)
+ return meter_ids
+
+def has_group(flow):
+ return get_group(flow) is not None
+
+def mk_oxm_fields(match_fields):
+ oxm_fields = [
+ ofp.ofp_oxm_field(
+ oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+ ofb_field=field
+ ) for field in match_fields
+ ]
+
+ return oxm_fields
+
+def mk_instructions_from_actions(actions):
+ instructions_action = ofp.ofp_instruction_actions()
+ instructions_action.actions.extend(actions)
+ instruction = ofp.ofp_instruction(type=ofp.OFPIT_APPLY_ACTIONS,
+ actions=instructions_action)
+ return [instruction]
+
+def mk_simple_flow_mod(match_fields, actions, command=ofp.OFPFC_ADD,
+ next_table_id=None, meters=None, **kw):
+ """
+ Convenience function to generare ofp_flow_mod message with OXM BASIC match
+ composed from the match_fields, and single APPLY_ACTIONS instruction with
+ a list if ofp_action objects.
+ :param match_fields: list(ofp_oxm_ofb_field)
+ :param actions: list(ofp_action)
+ :param command: one of OFPFC_*
+ :param kw: additional keyword-based params to ofp_flow_mod
+ :return: initialized ofp_flow_mod object
+ """
+ instructions = [
+ ofp.ofp_instruction(
+ type=ofp.OFPIT_APPLY_ACTIONS,
+ actions=ofp.ofp_instruction_actions(actions=actions)
+ )
+ ]
+
+ if meters is not None:
+ for meter_id in meters:
+ instructions.append(ofp.ofp_instruction(
+ type=ofp.OFPIT_METER,
+ meter=ofp.ofp_instruction_meter(meter_id=meter_id)
+ ))
+
+ if next_table_id is not None:
+ instructions.append(ofp.ofp_instruction(
+ type=ofp.OFPIT_GOTO_TABLE,
+ goto_table=ofp.ofp_instruction_goto_table(table_id=next_table_id)
+ ))
+
+ return ofp.ofp_flow_mod(
+ command=command,
+ match=ofp.ofp_match(
+ type=ofp.OFPMT_OXM,
+ oxm_fields=[
+ ofp.ofp_oxm_field(
+ oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+ ofb_field=field
+ ) for field in match_fields
+ ]
+ ),
+ instructions=instructions,
+ **kw
+ )
+
+
+def mk_multicast_group_mod(group_id, buckets, command=ofp.OFPGC_ADD):
+ group = ofp.ofp_group_mod(
+ command=command,
+ type=ofp.OFPGT_ALL,
+ group_id=group_id,
+ buckets=buckets
+ )
+ return group
+
+
+def hash_flow_stats(flow):
+ """
+ Return unique 64-bit integer hash for flow covering the following
+ attributes: 'table_id', 'priority', 'flags', 'cookie', 'match', '_instruction_string'
+ """
+ _instruction_string = ""
+ for _instruction in flow.instructions:
+ _instruction_string += _instruction.SerializeToString()
+
+ hex = md5('{},{},{},{},{},{}'.format(
+ flow.table_id,
+ flow.priority,
+ flow.flags,
+ flow.cookie,
+ flow.match.SerializeToString(),
+ _instruction_string
+ )).hexdigest()
+ return int(hex[:16], 16)
+
+
+def flow_stats_entry_from_flow_mod_message(mod):
+ flow = ofp.ofp_flow_stats(
+ table_id=mod.table_id,
+ priority=mod.priority,
+ idle_timeout=mod.idle_timeout,
+ hard_timeout=mod.hard_timeout,
+ flags=mod.flags,
+ cookie=mod.cookie,
+ match=mod.match,
+ instructions=mod.instructions
+ )
+ flow.id = hash_flow_stats(flow)
+ return flow
+
+
+def group_entry_from_group_mod(mod):
+ group = ofp.ofp_group_entry(
+ desc=ofp.ofp_group_desc(
+ type=mod.type,
+ group_id=mod.group_id,
+ buckets=mod.buckets
+ ),
+ stats=ofp.ofp_group_stats(
+ group_id=mod.group_id
+ # TODO do we need to instantiate bucket bins?
+ )
+ )
+ return group
+
+
+def mk_flow_stat(**kw):
+ return flow_stats_entry_from_flow_mod_message(mk_simple_flow_mod(**kw))
+
+
+def mk_group_stat(**kw):
+ return group_entry_from_group_mod(mk_multicast_group_mod(**kw))
+
+class RouteHop(object):
+ __slots__ = ('_device', '_ingress_port', '_egress_port')
+ def __init__(self, device, ingress_port, egress_port):
+ self._device = device
+ self._ingress_port = ingress_port
+ self._egress_port = egress_port
+ @property
+ def device(self): return self._device
+ @property
+ def ingress_port(self): return self._ingress_port
+ @property
+ def egress_port(self): return self._egress_port
+ def __eq__(self, other):
+ return (
+ self._device == other._device and
+ self._ingress_port == other._ingress_port and
+ self._egress_port == other._egress_port)
+ def __ne__(self, other):
+ return not self.__eq__(other)
+ def __str__(self):
+ return 'RouteHop device_id {}, ingress_port {}, egress_port {}'.format(
+ self._device.id, self._ingress_port, self._egress_port)
+
+class FlowDecomposer(object):
+
+ def __init__(self, *args, **kw):
+ self.logical_device_id = 'this shall be overwritten in derived class'
+ super(FlowDecomposer, self).__init__(*args, **kw)
+
+ # ~~~~~~~~~~~~~~~~~~~~ methods exposed *to* derived class ~~~~~~~~~~~~~~~~~
+
+ def decompose_rules(self, flows, groups):
+ """
+ Generate per-device flows and flow-groups from the flows and groups
+ defined on a logical device
+ :param flows: logical device flows
+ :param groups: logical device flow groups
+ :return: dict(device_id ->
+ (OrderedDict-of-device-flows, OrderedDict-of-device-flow-groups))
+ """
+
+ device_rules = deepcopy(self.get_all_default_rules())
+ group_map = dict((g.desc.group_id, g) for g in groups)
+
+ for flow in flows:
+ for device_id, (_flows, _groups) \
+ in self.decompose_flow(flow, group_map).iteritems():
+ fl_lst, gr_lst = device_rules.setdefault(
+ device_id, (OrderedDict(), OrderedDict()))
+ for _flow in _flows:
+ if _flow.id not in fl_lst:
+ fl_lst[_flow.id] = _flow
+ for _group in _groups:
+ if _group.group_id not in gr_lst:
+ gr_lst[_group.group_id] = _group
+ return device_rules
+
+ def decompose_flow(self, flow, group_map):
+ assert isinstance(flow, ofp.ofp_flow_stats)
+
+ ####################################################################
+ #
+ # limited, heuristics based implementation
+ # needs to be replaced, see https://jira.opencord.org/browse/CORD-841
+ #
+ ####################################################################
+
+ in_port_no = get_in_port(flow)
+ out_port_no = get_out_port(flow) # may be None
+
+ device_rules = {} # accumulator
+
+ route = self.get_route(in_port_no, out_port_no)
+ if route is None:
+ log.error('no-route', in_port_no=in_port_no,
+ out_port_no=out_port_no, comment='deleting flow')
+ self.flow_delete(flow)
+ return device_rules
+
+ assert len(route) == 2
+ ingress_hop, egress_hop = route
+
+ def is_downstream():
+ return ingress_hop.device.root
+
+ def is_upstream():
+ return not is_downstream()
+
+ def update_devices_rules(flow, curr_device_rules, meter_ids=None, table_id=None):
+ actions = [action.type for action in get_actions(flow)]
+ if len(actions) == 1 and OUTPUT in actions:
+ # Transparent ONU and OLT case (No-L2-Modification flow)
+ child_device_flow_lst, _ = curr_device_rules.setdefault(
+ ingress_hop.device.id, ([], []))
+ parent_device_flow_lst, _ = curr_device_rules.setdefault(
+ egress_hop.device.id, ([], []))
+
+ child_device_flow_lst.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(ingress_hop.ingress_port.port_no)
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT,)
+ ],
+ actions=[
+ output(ingress_hop.egress_port.port_no)
+ ]
+ ))
+
+ parent_device_flow_lst.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(egress_hop.ingress_port.port_no),
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT,)
+ ],
+ actions=[
+ output(egress_hop.egress_port.port_no)
+ ],
+ table_id=table_id,
+ meters=meter_ids
+ ))
+
+ else:
+ fl_lst, _ = curr_device_rules.setdefault(
+ egress_hop.device.id, ([], []))
+ fl_lst.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(egress_hop.ingress_port.port_no)
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT,)
+ ],
+ actions=[
+ action for action in get_actions(flow)
+ if action.type != OUTPUT
+ ] + [
+ output(egress_hop.egress_port.port_no)
+ ],
+ table_id=table_id,
+ meters=meter_ids
+ ))
+
+ if out_port_no is not None and \
+ (out_port_no & 0x7fffffff) == ofp.OFPP_CONTROLLER:
+
+ # UPSTREAM CONTROLLER-BOUND FLOW
+
+ # we assume that the ingress device is already pushing a
+ # customer-specific vlan (c-vid), based on its default flow
+ # rules so there is nothing else to do on the ONU
+
+ # on the olt, we need to push a new tag and set it to 4000
+ # which for now represents in-bound channel to the controller
+ # (via Voltha)
+ # TODO make the 4000 configurable
+ fl_lst, _ = device_rules.setdefault(
+ egress_hop.device.id, ([], []))
+
+ log.info('trap-flow', in_port_no=in_port_no,
+ nni=self._nni_logical_port_no)
+
+ if in_port_no == self._nni_logical_port_no:
+ log.debug('trap-nni')
+ # Trap flow for NNI port
+ fl_lst.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(egress_hop.egress_port.port_no)
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT,)
+ ],
+ actions=[
+ action for action in get_actions(flow)
+ ]
+ ))
+
+ else:
+ log.debug('trap-uni')
+ # Trap flow for UNI port
+
+ # in_port_no is None for wildcard input case, do not include
+ # upstream port for 4000 flow in input
+ if in_port_no is None:
+ in_ports = self.get_wildcard_input_ports(exclude_port=
+ egress_hop.egress_port.port_no)
+ else:
+ in_ports = [in_port_no]
+
+ for input_port in in_ports:
+ fl_lst.append(mk_flow_stat( # Upstream flow
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(egress_hop.ingress_port.port_no),
+ vlan_vid(ofp.OFPVID_PRESENT | input_port)
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT, VLAN_VID)
+ ],
+ actions=[
+ push_vlan(0x8100),
+ set_field(vlan_vid(ofp.OFPVID_PRESENT | 4000)),
+ output(egress_hop.egress_port.port_no)]
+ ))
+ fl_lst.append(mk_flow_stat( # Downstream flow
+ priority=flow.priority,
+ match_fields=[
+ in_port(egress_hop.egress_port.port_no),
+ vlan_vid(ofp.OFPVID_PRESENT | 4000),
+ vlan_pcp(0),
+ metadata(input_port)
+ ],
+ actions=[
+ pop_vlan(),
+ output(egress_hop.ingress_port.port_no)]
+ ))
+ else:
+ # NOT A CONTROLLER-BOUND FLOW
+ if is_upstream():
+
+ # We assume that anything that is upstream needs to get Q-in-Q
+ # treatment and that this is expressed via two flow rules,
+ # the first using the goto-statement. We also assume that the
+ # inner tag is applied at the ONU, while the outer tag is
+ # applied at the OLT
+ next_table_id = get_goto_table_id(flow)
+ if next_table_id is not None and next_table_id < tech_profile.DEFAULT_TECH_PROFILE_TABLE_ID:
+ assert out_port_no is None
+ fl_lst, _ = device_rules.setdefault(
+ ingress_hop.device.id, ([], []))
+ fl_lst.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(ingress_hop.ingress_port.port_no)
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT,)
+ ],
+ actions=[
+ action for action in get_actions(flow)
+ ] + [
+ output(ingress_hop.egress_port.port_no)
+ ]
+ ))
+
+ elif next_table_id is not None and next_table_id >= tech_profile.DEFAULT_TECH_PROFILE_TABLE_ID:
+ assert out_port_no is not None
+ meter_ids = get_meter_ids_from_flow(flow)
+ update_devices_rules(flow, device_rules, meter_ids, next_table_id)
+ else:
+ update_devices_rules(flow, device_rules)
+
+ else: # downstream
+ next_table_id = get_goto_table_id(flow)
+ if next_table_id is not None and next_table_id < tech_profile.DEFAULT_TECH_PROFILE_TABLE_ID:
+ assert out_port_no is None
+
+ if get_metadata(flow) is not None:
+ log.debug('creating-metadata-flow', flow=flow)
+ # For downstream flows with dual-tags, recalculate route.
+ port_number = get_port_number_from_metadata(flow)
+
+ if port_number is not None:
+ route = self.get_route(in_port_no, port_number)
+ if route is None:
+ log.error('no-route-double-tag', in_port_no=in_port_no,
+ out_port_no=port_number, comment='deleting flow',
+ metadata=get_metadata_64_bit(flow))
+ self.flow_delete(flow)
+ return device_rules
+ assert len(route) == 2
+ ingress_hop, egress_hop = route
+
+ inner_tag = get_inner_tag_from_metadata(flow)
+
+ if inner_tag is None:
+ log.error('no-inner-tag-double-tag', in_port_no=in_port_no,
+ out_port_no=port_number, comment='deleting flow',
+ metadata=get_metadata_64_bit(flow))
+ self.flow_delete(flow)
+ return device_rules
+
+ fl_lst, _ = device_rules.setdefault(
+ ingress_hop.device.id, ([], []))
+ fl_lst.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(ingress_hop.ingress_port.port_no),
+ metadata(inner_tag)
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT, METADATA)
+ ],
+ actions=[
+ action for action in get_actions(flow)
+ ] + [
+ output(ingress_hop.egress_port.port_no)
+ ]
+ ))
+ else:
+ log.debug('creating-standard-flow', flow=flow)
+ fl_lst, _ = device_rules.setdefault(
+ ingress_hop.device.id, ([], []))
+ fl_lst.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(ingress_hop.ingress_port.port_no)
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT,)
+ ],
+ actions=[
+ action for action in get_actions(flow)
+ ] + [
+ output(ingress_hop.egress_port.port_no)
+ ]
+ ))
+
+ elif out_port_no is not None: # unicast case
+
+ actions = [action.type for action in get_actions(flow)]
+ # Transparent ONU and OLT case (No-L2-Modification flow)
+ if len(actions) == 1 and OUTPUT in actions:
+ parent_device_flow_lst, _ = device_rules.setdefault(
+ ingress_hop.device.id, ([], []))
+ child_device_flow_lst, _ = device_rules.setdefault(
+ egress_hop.device.id, ([], []))
+
+ parent_device_flow_lst.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(ingress_hop.ingress_port.port_no)
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT,)
+ ],
+ actions=[
+ output(ingress_hop.egress_port.port_no)
+ ]
+ ))
+
+ child_device_flow_lst.append(mk_flow_stat(
+ priority = flow.priority,
+ cookie=flow.cookie,
+ match_fields = [
+ in_port(egress_hop.ingress_port.port_no),
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT, )
+ ],
+ actions=[
+ output(egress_hop.egress_port.port_no)
+ ]
+ ))
+ else:
+ fl_lst, _ = device_rules.setdefault(
+ egress_hop.device.id, ([], []))
+ fl_lst.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(egress_hop.ingress_port.port_no)
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT,)
+ ],
+ actions=[
+ action for action in get_actions(flow)
+ if action.type not in (OUTPUT,)
+ ] + [
+ output(egress_hop.egress_port.port_no)
+ ],
+ #table_id=flow.table_id,
+ #meters=None if len(get_meter_ids_from_flow(flow)) == 0 else get_meter_ids_from_flow(flow)
+ ))
+ else:
+ grp_id = get_group(flow)
+
+ if grp_id is not None: # multicast case
+ fl_lst_olt, _ = device_rules.setdefault(
+ ingress_hop.device.id, ([], []))
+ # having no group yet is the same as having a group with
+ # no buckets
+ group = group_map.get(grp_id, ofp.ofp_group_entry())
+
+ for bucket in group.desc.buckets:
+ found_pop_vlan = False
+ other_actions = []
+ for action in bucket.actions:
+ if action.type == POP_VLAN:
+ found_pop_vlan = True
+ elif action.type == OUTPUT:
+ out_port_no = action.output.port
+ else:
+ other_actions.append(action)
+ # re-run route request to determine egress device and
+ # ports
+ route2 = self.get_route(in_port_no, out_port_no)
+ if not route2 or len(route2) != 2:
+ log.error('mc-no-route', in_port_no=in_port_no,
+ out_port_no=out_port_no, route2=route2,
+ comment='deleting flow')
+ self.flow_delete(flow)
+ continue
+
+ ingress_hop2, egress_hop = route2
+
+ if ingress_hop.ingress_port != ingress_hop2.ingress_port:
+ log.error('mc-ingress-hop-hop2-mismatch',
+ ingress_hop=ingress_hop,
+ ingress_hop2=ingress_hop2,
+ in_port_no=in_port_no,
+ out_port_no=out_port_no,
+ comment='ignoring flow')
+ continue
+
+ fl_lst_olt.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(ingress_hop.ingress_port.port_no)
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT,)
+ ],
+ actions=[
+ action for action in get_actions(flow)
+ if action.type not in (GROUP,)
+ ] + [
+ pop_vlan(),
+ output(egress_hop.ingress_port.port_no)
+ ]
+ ))
+
+ fl_lst_onu, _ = device_rules.setdefault(
+ egress_hop.device.id, ([], []))
+ fl_lst_onu.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(egress_hop.ingress_port.port_no)
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT, VLAN_VID, VLAN_PCP)
+ ],
+ actions=other_actions + [
+ output(egress_hop.egress_port.port_no)
+ ]
+ ))
+ else:
+ raise NotImplementedError('undefined downstream case for flows')
+ return device_rules
+
+ # ~~~~~~~~~~~~ methods expected to be provided by derived class ~~~~~~~~~~~
+
+ def get_all_default_rules(self):
+ raise NotImplementedError('derived class must provide')
+
+ def get_default_rules(self, device_id):
+ raise NotImplementedError('derived class must provide')
+
+ def get_route(self, ingress_port_no, egress_port_no):
+ raise NotImplementedError('derived class must provide')
+
+ def get_wildcard_input_ports(self, exclude_port=None):
+ raise NotImplementedError('derived class must provide')
+
+ def flow_delete(self, mod):
+ raise NotImplementedError('derived class must provide')
diff --git a/python/core/logical_device_agent.py b/python/core/logical_device_agent.py
new file mode 100644
index 0000000..10ec66c
--- /dev/null
+++ b/python/core/logical_device_agent.py
@@ -0,0 +1,973 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Model that captures the current state of a logical device
+"""
+from collections import OrderedDict
+
+import structlog
+
+from common.event_bus import EventBusClient
+from common.frameio.frameio import hexify
+from voltha.registry import registry
+from voltha.core.config.config_proxy import CallbackType
+from voltha.core.device_graph import DeviceGraph
+from voltha.core.flow_decomposer import FlowDecomposer, \
+ flow_stats_entry_from_flow_mod_message, group_entry_from_group_mod, \
+ mk_flow_stat, in_port, vlan_vid, vlan_pcp, pop_vlan, output, set_field, \
+ push_vlan, mk_simple_flow_mod
+from voltha.protos import third_party
+from voltha.protos import openflow_13_pb2 as ofp
+from voltha.protos.device_pb2 import Port
+from voltha.protos.logical_device_pb2 import LogicalPort
+from voltha.protos.openflow_13_pb2 import Flows, Meters, FlowGroups, ofp_meter_config
+
+_ = third_party
+
+def mac_str_to_tuple(mac):
+ return tuple(int(d, 16) for d in mac.split(':'))
+
+
+class LogicalDeviceAgent(FlowDecomposer, DeviceGraph):
+
+ def __init__(self, core, logical_device):
+ try:
+ self.core = core
+ self.local_handler = core.get_local_handler()
+ self.logical_device_id = logical_device.id
+
+ self.root_proxy = core.get_proxy('/')
+ self.flows_proxy = core.get_proxy(
+ '/logical_devices/{}/flows'.format(logical_device.id))
+ self.meters_proxy = core.get_proxy(
+ '/logical_devices/{}/meters'.format(logical_device.id))
+ self.groups_proxy = core.get_proxy(
+ '/logical_devices/{}/flow_groups'.format(logical_device.id))
+ self.self_proxy = core.get_proxy(
+ '/logical_devices/{}'.format(logical_device.id))
+
+ self.flows_proxy.register_callback(
+ CallbackType.PRE_UPDATE, self._pre_process_flows)
+ self.flows_proxy.register_callback(
+ CallbackType.POST_UPDATE, self._flow_table_updated)
+ self.groups_proxy.register_callback(
+ CallbackType.POST_UPDATE, self._group_table_updated)
+ self.self_proxy.register_callback(
+ CallbackType.POST_ADD, self._port_added)
+ self.self_proxy.register_callback(
+ CallbackType.POST_REMOVE, self._port_removed)
+
+ self.port_proxy = {}
+ self.port_status_has_changed = {}
+
+ self.event_bus = EventBusClient()
+ self.packet_in_subscription = self.event_bus.subscribe(
+ topic='packet-in:{}'.format(logical_device.id),
+ callback=self.handle_packet_in_event)
+
+ self.log = structlog.get_logger(logical_device_id=logical_device.id)
+
+ self._routes = None
+ self._no_flow_changes_required = False
+ self._flows_ids_to_add = []
+ self._flows_ids_to_remove = []
+ self._flows_to_remove = []
+
+ self.accepts_direct_logical_flows = False
+ self.device_id = self.self_proxy.get('/').root_device_id
+ device_adapter_type = self.root_proxy.get('/devices/{}'.format(
+ self.device_id)).adapter
+ device_type = self.root_proxy.get('/device_types/{}'.format(
+ device_adapter_type))
+
+ if device_type is not None:
+ self.accepts_direct_logical_flows = \
+ device_type.accepts_direct_logical_flows_update
+
+ if self.accepts_direct_logical_flows:
+
+ self.device_adapter_agent = registry(
+ 'adapter_loader').get_agent(device_adapter_type).adapter
+
+ self.log.debug('this device accepts direct logical flows',
+ device_adapter_type=device_adapter_type)
+
+
+
+ except Exception, e:
+ self.log.exception('init-error', e=e)
+
+ def start(self, reconcile=False):
+ self.log.debug('starting')
+ if reconcile:
+ # Register the callbacks for the ports
+ ports = self.self_proxy.get('/ports')
+ for port in ports:
+ self._reconcile_port(port)
+ self.log.debug('ports-reconciled', ports=ports)
+ self.log.debug('started')
+ return self
+
+ def stop(self):
+ self.log.debug('stopping')
+ try:
+ self.flows_proxy.unregister_callback(
+ CallbackType.POST_UPDATE, self._flow_table_updated)
+ self.groups_proxy.unregister_callback(
+ CallbackType.POST_UPDATE, self._group_table_updated)
+ self.self_proxy.unregister_callback(
+ CallbackType.POST_ADD, self._port_added)
+ self.self_proxy.unregister_callback(
+ CallbackType.POST_REMOVE, self._port_removed)
+
+ # Remove subscription to the event bus
+ self.event_bus.unsubscribe(self.packet_in_subscription)
+ except Exception, e:
+ self.log.info('stop-exception', e=e)
+
+ self.log.debug('stopped')
+
+ def announce_flows_deleted(self, flows):
+ for f in flows:
+ self.announce_flow_deleted(f)
+
+ def announce_flow_deleted(self, flow):
+ if flow.flags & ofp.OFPFF_SEND_FLOW_REM:
+ raise NotImplementedError("announce_flow_deleted")
+
+ def signal_flow_mod_error(self, code, flow_mod):
+ pass # TODO
+
+ def signal_flow_removal(self, code, flow):
+ pass # TODO
+
+ def signal_group_mod_error(self, code, group_mod):
+ pass # TODO
+
+ def update_flow_table(self, flow_mod):
+
+ command = flow_mod.command
+
+ if command == ofp.OFPFC_ADD:
+ self.flow_add(flow_mod)
+
+ elif command == ofp.OFPFC_DELETE:
+ self.flow_delete(flow_mod)
+
+ elif command == ofp.OFPFC_DELETE_STRICT:
+ self.flow_delete_strict(flow_mod)
+
+ elif command == ofp.OFPFC_MODIFY:
+ self.flow_modify(flow_mod)
+
+ elif command == ofp.OFPFC_MODIFY_STRICT:
+ self.flow_modify_strict(flow_mod)
+
+ else:
+ self.log.warn('unhandled-flow-mod', command=command, flow_mod=flow_mod)
+
+ def update_meter_table(self, meter_mod):
+ command = meter_mod.command
+
+ if command == ofp.OFPMC_ADD:
+ self.meter_add(meter_mod)
+
+ elif command == ofp.OFPMC_MODIFY:
+ self.meter_modify(meter_mod)
+
+ elif command == ofp.OFPMC_DELETE:
+ self.meter_delete(meter_mod)
+ else:
+ self.log.warn('unhandled-meter-mod', command=command, flow_mod=meter_mod)
+
+ def update_group_table(self, group_mod):
+
+ command = group_mod.command
+
+ if command == ofp.OFPGC_DELETE:
+ self.group_delete(group_mod)
+
+ elif command == ofp.OFPGC_ADD:
+ self.group_add(group_mod)
+
+ elif command == ofp.OFPGC_MODIFY:
+ self.group_modify(group_mod)
+
+ else:
+ self.log.warn('unhandled-group-mod',
+ command=command, group_mod=group_mod)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL METER HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~
+
+ def meter_add(self, meter_mod):
+ assert isinstance(meter_mod, ofp.ofp_meter_mod)
+ # read from model
+ meters = list(self.meters_proxy.get('/').items)
+ if not self.check_meter_id_overlapping(meters, meter_mod):
+ meters.append(ofp_meter_config(flags=meter_mod.flags, \
+ meter_id=meter_mod.meter_id, \
+ bands=meter_mod.bands))
+
+ self.meters_proxy.update('/', Meters(items=meters))
+ else:
+ self.signal_meter_mod_error(ofp.OFPMMFC_METER_EXISTS, meter_mod)
+
+ def meter_modify(self, meter_mod):
+ assert isinstance(meter_mod, ofp.ofp_meter_mod)
+ meters = list(self.meters_proxy.get('/').items)
+ existing_meter = self.check_meter_id_overlapping(meters, meter_mod)
+ if existing_meter:
+ existing_meter.flags = meter_mod.flags
+ existing_meter.bands = meter_mod.bands
+ self.meters_proxy.update('/', Meters(items=meters))
+ else:
+ self.signal_meter_mod_error(ofp.OFPMMFC_UNKNOWN_METER, meter_mod)
+
+ def meter_delete(self, meter_mod):
+ assert isinstance(meter_mod, ofp.ofp_meter_mod)
+ meters = list(self.meters_proxy.get('/').items)
+ to_keep = list()
+ to_delete = 0
+
+ for meter in meters:
+ if meter.meter_id != meter_mod.meter_id:
+ to_keep.append(meter)
+ else:
+ to_delete += 1
+
+ if to_delete == 1:
+ self.meters_proxy.update('/', Meters(items=to_keep))
+ if to_delete == 0:
+ self.signal_meter_mod_error(ofp.OFPMMFC_UNKNOWN_METER, meter_mod)
+ elif to_delete > 1:
+ raise Exception('More than one meter_config sharing the same meter_id cannot exist')
+
+ @staticmethod
+ def check_meter_id_overlapping(meters, meter_mod):
+ for meter in meters:
+ if meter.meter_id == meter_mod.meter_id:
+ return meter
+ return False
+
+ def signal_meter_mod_error(self, error_code, meter_mod):
+ pass # TODO
+
+
+
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL FLOW HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~
+
+ def flow_add(self, mod):
+ assert isinstance(mod, ofp.ofp_flow_mod)
+ assert mod.cookie_mask == 0
+
+ # read from model
+ flows = list(self.flows_proxy.get('/').items)
+
+ changed = False
+ check_overlap = mod.flags & ofp.OFPFF_CHECK_OVERLAP
+ if check_overlap:
+ if self.find_overlapping_flows(flows, mod, True):
+ self.signal_flow_mod_error(
+ ofp.OFPFMFC_OVERLAP, mod)
+ else:
+ # free to add as new flow
+ flow = flow_stats_entry_from_flow_mod_message(mod)
+ flows.append(flow)
+ changed = True
+ self.log.debug('flow-added', flow=mod)
+
+ else:
+ flow = flow_stats_entry_from_flow_mod_message(mod)
+ idx = self.find_flow(flows, flow)
+ if idx >= 0:
+ old_flow = flows[idx]
+ if not (mod.flags & ofp.OFPFF_RESET_COUNTS):
+ flow.byte_count = old_flow.byte_count
+ flow.packet_count = old_flow.packet_count
+ flows[idx] = flow
+ changed = True
+ self.log.debug('flow-updated', flow=flow)
+
+ else:
+ flows.append(flow)
+ changed = True
+ self.log.debug('flow-added', flow=mod)
+
+ # write back to model
+ if changed:
+ self.flows_proxy.update('/', Flows(items=flows))
+
+ def flow_delete(self, mod):
+ assert isinstance(mod, (ofp.ofp_flow_mod, ofp.ofp_flow_stats))
+
+ # read from model
+ flows = list(self.flows_proxy.get('/').items)
+
+ # build a list of what to keep vs what to delete
+ to_keep = []
+ to_delete = []
+ for f in flows:
+ if self.flow_matches_spec(f, mod):
+ to_delete.append(f)
+ else:
+ to_keep.append(f)
+
+ # replace flow table with keepers
+ flows = to_keep
+
+ # write back
+ if to_delete:
+ self.flows_proxy.update('/', Flows(items=flows))
+
+ # from mod send announcement
+ if isinstance(mod, ofp.ofp_flow_mod):
+ # send notifications for discarded flow as required by OpenFlow
+ self.announce_flows_deleted(to_delete)
+
+ def flow_delete_strict(self, mod):
+ assert isinstance(mod, ofp.ofp_flow_mod)
+
+ # read from model
+ flows = list(self.flows_proxy.get('/').items)
+ changed = False
+
+ flow = flow_stats_entry_from_flow_mod_message(mod)
+ idx = self.find_flow(flows, flow)
+ if (idx >= 0):
+ del flows[idx]
+ changed = True
+ else:
+ # TODO need to check what to do with this case
+ self.log.warn('flow-cannot-delete', flow=flow)
+
+ if changed:
+ self.flows_proxy.update('/', Flows(items=flows))
+
+ def flow_modify(self, mod):
+ raise NotImplementedError()
+
+ def flow_modify_strict(self, mod):
+ raise NotImplementedError()
+
+ def find_overlapping_flows(self, flows, mod, return_on_first=False):
+ """
+ Return list of overlapping flow(s)
+ Two flows overlap if a packet may match both and if they have the
+ same priority.
+ :param mod: Flow request
+ :param return_on_first: if True, return with the first entry
+ :return:
+ """
+ return [] # TODO finish implementation
+
+ @classmethod
+ def find_flow(cls, flows, flow):
+ for i, f in enumerate(flows):
+ if cls.flow_match(f, flow):
+ return i
+ return -1
+
+ @staticmethod
+ def flow_match(f1, f2):
+ keys_matter = ('table_id', 'priority', 'flags', 'cookie', 'match')
+ for key in keys_matter:
+ if getattr(f1, key) != getattr(f2, key):
+ return False
+ return True
+
+ @classmethod
+ def flow_matches_spec(cls, flow, flow_mod):
+ """
+ Return True if given flow (ofp_flow_stats) is "covered" by the
+ wildcard flow_mod (ofp_flow_mod), taking into consideration of
+ both exact mactches as well as masks-based match fields if any.
+ Otherwise return False
+ :param flow: ofp_flow_stats
+ :param mod: ofp_flow_mod
+ :return: Bool
+ """
+
+ assert isinstance(flow, ofp.ofp_flow_stats)
+ assert isinstance(flow_mod, (ofp.ofp_flow_mod, ofp.ofp_flow_stats))
+
+ if isinstance(flow_mod, ofp.ofp_flow_stats):
+ return cls.flow_match(flow, flow_mod)
+
+ # Check if flow.cookie is covered by mod.cookie and mod.cookie_mask
+ if (flow.cookie & flow_mod.cookie_mask) != \
+ (flow_mod.cookie & flow_mod.cookie_mask):
+ return False
+
+ # Check if flow.table_id is covered by flow_mod.table_id
+ if flow_mod.table_id != ofp.OFPTT_ALL and \
+ flow.table_id != flow_mod.table_id:
+ return False
+
+ # Check out_port
+ if (flow_mod.out_port & 0x7fffffff) != ofp.OFPP_ANY and \
+ not cls.flow_has_out_port(flow, flow_mod.out_port):
+ return False
+
+ # Check out_group
+ if (flow_mod.out_group & 0x7fffffff) != ofp.OFPG_ANY and \
+ not cls.flow_has_out_group(flow, flow_mod.out_group):
+ return False
+ # Priority is ignored
+
+ # Check match condition
+ # If the flow_mod match field is empty, that is a special case and
+ # indicates the flow entry matches
+ match = flow_mod.match
+ assert isinstance(match, ofp.ofp_match)
+ if not match.oxm_fields:
+ # If we got this far and the match is empty in the flow spec,
+ # than the flow matches
+ return True
+ else:
+ raise NotImplementedError(
+ "flow_matches_spec(): No flow match analysis yet")
+
+ @staticmethod
+ def flow_has_out_port(flow, out_port):
+ """
+ Return True if flow has a output command with the given out_port
+ """
+ assert isinstance(flow, ofp.ofp_flow_stats)
+ for instruction in flow.instructions:
+ assert isinstance(instruction, ofp.ofp_instruction)
+ if instruction.type == ofp.OFPIT_APPLY_ACTIONS:
+ for action in instruction.actions.actions:
+ assert isinstance(action, ofp.ofp_action)
+ if action.type == ofp.OFPAT_OUTPUT and \
+ action.output.port == out_port:
+ return True
+
+ # otherwise...
+ return False
+
+ @staticmethod
+ def flow_has_out_group(flow, group_id):
+ """
+ Return True if flow has a output command with the given out_group
+ """
+ assert isinstance(flow, ofp.ofp_flow_stats)
+ for instruction in flow.instructions:
+ assert isinstance(instruction, ofp.ofp_instruction)
+ if instruction.type == ofp.OFPIT_APPLY_ACTIONS:
+ for action in instruction.actions.actions:
+ assert isinstance(action, ofp.ofp_action)
+ if action.type == ofp.OFPAT_GROUP and \
+ action.group.group_id == group_id:
+ return True
+
+ # otherwise...
+ return False
+
+ def flows_delete_by_group_id(self, flows, group_id):
+ """
+ Delete any flow(s) referring to given group_id
+ :param group_id:
+ :return: None
+ """
+ to_keep = []
+ to_delete = []
+ for f in flows:
+ if self.flow_has_out_group(f, group_id):
+ to_delete.append(f)
+ else:
+ to_keep.append(f)
+
+ # replace flow table with keepers
+ flows = to_keep
+
+ # send notification to deleted ones
+ self.announce_flows_deleted(to_delete)
+
+ return bool(to_delete), flows
+
+ # ~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL GROUP HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def group_add(self, group_mod):
+ assert isinstance(group_mod, ofp.ofp_group_mod)
+
+ groups = OrderedDict((g.desc.group_id, g)
+ for g in self.groups_proxy.get('/').items)
+ changed = False
+
+ if group_mod.group_id in groups:
+ self.signal_group_mod_error(ofp.OFPGMFC_GROUP_EXISTS, group_mod)
+ else:
+ group_entry = group_entry_from_group_mod(group_mod)
+ groups[group_mod.group_id] = group_entry
+ changed = True
+
+ if changed:
+ self.groups_proxy.update('/', FlowGroups(items=groups.values()))
+
+ def group_delete(self, group_mod):
+ assert isinstance(group_mod, ofp.ofp_group_mod)
+
+ groups = OrderedDict((g.desc.group_id, g)
+ for g in self.groups_proxy.get('/').items)
+ groups_changed = False
+ flows_changed = False
+
+ group_id = group_mod.group_id
+ if group_id == ofp.OFPG_ALL:
+ # TODO we must delete all flows that point to this group and
+ # signal controller as requested by flow's flag
+ groups = OrderedDict()
+ groups_changed = True
+ self.log.debug('all-groups-deleted')
+
+ else:
+ if group_id not in groups:
+ # per openflow spec, this is not an error
+ pass
+
+ else:
+ flows = list(self.flows_proxy.get('/').items)
+ flows_changed, flows = self.flows_delete_by_group_id(
+ flows, group_id)
+ del groups[group_id]
+ groups_changed = True
+ self.log.debug('group-deleted', group_id=group_id)
+
+ if groups_changed:
+ self.groups_proxy.update('/', FlowGroups(items=groups.values()))
+ if flows_changed:
+ self.flows_proxy.update('/', Flows(items=flows))
+
+ def group_modify(self, group_mod):
+ assert isinstance(group_mod, ofp.ofp_group_mod)
+
+ groups = OrderedDict((g.desc.group_id, g)
+ for g in self.groups_proxy.get('/').items)
+ changed = False
+
+ if group_mod.group_id not in groups:
+ self.signal_group_mod_error(
+ ofp.OFPGMFC_INVALID_GROUP, group_mod)
+ else:
+ # replace existing group entry with new group definition
+ group_entry = group_entry_from_group_mod(group_mod)
+ groups[group_mod.group_id] = group_entry
+ changed = True
+
+ if changed:
+ self.groups_proxy.update('/', FlowGroups(items=groups.values()))
+
+ def port_enable(self, port_id):
+ self.log.info("port-enable", port_id=port_id)
+
+ proxy = self.port_proxy[port_id]
+ port = proxy.get('/')
+ port.ofp_port.config = port.ofp_port.config & ~ofp.OFPPC_PORT_DOWN
+ proxy.update('/', port)
+
+ def port_disable(self, port_id):
+ self.log.info("port-disable", port_id=port_id)
+
+ proxy = self.port_proxy[port_id]
+ port = proxy.get('/')
+ port.ofp_port.config = port.ofp_port.config & ~ofp.OFPPC_PORT_DOWN | ofp.OFPPC_PORT_DOWN
+ proxy.update('/', port)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_OUT ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def packet_out(self, ofp_packet_out):
+ self.log.debug('packet-out', packet=ofp_packet_out)
+ topic = 'packet-out:{}'.format(self.logical_device_id)
+ self.event_bus.publish(topic, ofp_packet_out)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_IN ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def handle_packet_in_event(self, _, msg):
+ self.log.debug('handle-packet-in', msg=msg)
+ logical_port_no, packet = msg
+ packet_in = ofp.ofp_packet_in(
+ # buffer_id=0,
+ reason=ofp.OFPR_ACTION,
+ # table_id=0,
+ # cookie=0,
+ match=ofp.ofp_match(
+ type=ofp.OFPMT_OXM,
+ oxm_fields=[
+ ofp.ofp_oxm_field(
+ oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+ ofb_field=in_port(logical_port_no)
+ )
+ ]
+ ),
+ data=packet
+ )
+ self.packet_in(packet_in)
+
+ def packet_in(self, ofp_packet_in):
+ self.log.info('packet-in', logical_device_id=self.logical_device_id,
+ pkt=ofp_packet_in, data=hexify(ofp_packet_in.data))
+ self.local_handler.send_packet_in(
+ self.logical_device_id, ofp_packet_in)
+
+ # ~~~~~~~~~~~~~~~~~~~~~ FLOW TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def _pre_process_flows(self, flows):
+ """
+ This method is invoked before a device flow table data model is
+ updated. The resulting data is stored locally and the flow table is
+ updated during the post-processing phase, i.e. via the POST_UPDATE
+ callback
+ :param flows: Desired flows
+ :return: None
+ """
+ current_flows = self.flows_proxy.get('/')
+ self.log.debug('pre-processing-flows',
+ logical_device_id=self.logical_device_id,
+ desired_flows=flows,
+ existing_flows=current_flows)
+
+ current_flow_ids = set(f.id for f in current_flows.items)
+ desired_flow_ids = set(f.id for f in flows.items)
+
+ self._flows_ids_to_add = desired_flow_ids.difference(current_flow_ids)
+ self._flows_ids_to_remove = current_flow_ids.difference(desired_flow_ids)
+ self._flows_to_remove = []
+ for f in current_flows.items:
+ if f.id in self._flows_ids_to_remove:
+ self._flows_to_remove.append(f)
+
+ if len(self._flows_ids_to_add) + len(self._flows_ids_to_remove) == 0:
+ # No changes of flows, just stats are changing
+ self._no_flow_changes_required = True
+ else:
+ self._no_flow_changes_required = False
+
+ self.log.debug('flows-preprocess-output', current_flows=len(
+ current_flow_ids), new_flows=len(desired_flow_ids),
+ adding_flows=len(self._flows_ids_to_add),
+ removing_flows=len(self._flows_ids_to_remove))
+
+
+ def _flow_table_updated(self, flows):
+ self.log.debug('flow-table-updated',
+ logical_device_id=self.logical_device_id, flows=flows)
+
+ if self._no_flow_changes_required:
+ # Stats changes, no need to process further
+ self.log.debug('flow-stats-update')
+ else:
+
+ groups = self.groups_proxy.get('/').items
+ device_rules_map = self.decompose_rules(flows.items, groups)
+
+ # TODO we have to evolve this into a policy-based, event based pattern
+ # This is a raw implementation of the specific use-case with certain
+ # built-in assumptions, and not yet device vendor specific. The policy-
+ # based refinement will be introduced that later.
+
+
+ # Temporary bypass for openolt
+
+ if self.accepts_direct_logical_flows:
+ #give the logical flows directly to the adapter
+ self.log.debug('it is an direct logical flow bypass')
+ if self.device_adapter_agent is None:
+ self.log.error('No device adapter agent',
+ device_id=self.device_id,
+ logical_device_id = self.logical_device_id)
+ return
+
+ flows_to_add = []
+ for f in flows.items:
+ if f.id in self._flows_ids_to_add:
+ flows_to_add.append(f)
+
+
+ self.log.debug('flows to remove',
+ flows_to_remove=self._flows_to_remove,
+ flows_ids=self._flows_ids_to_remove)
+
+ try:
+ self.device_adapter_agent.update_logical_flows(
+ self.device_id, flows_to_add, self._flows_to_remove,
+ groups, device_rules_map)
+ except Exception as e:
+ self.log.error('logical flows bypass error', error=e,
+ flows=flows)
+ else:
+
+ for device_id, (flows, groups) in device_rules_map.iteritems():
+
+ self.root_proxy.update('/devices/{}/flows'.format(device_id),
+ Flows(items=flows.values()))
+ self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
+ FlowGroups(items=groups.values()))
+
+ # ~~~~~~~~~~~~~~~~~~~~ GROUP TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def _group_table_updated(self, flow_groups):
+ self.log.debug('group-table-updated',
+ logical_device_id=self.logical_device_id,
+ flow_groups=flow_groups)
+
+ flows = self.flows_proxy.get('/').items
+ device_flows_map = self.decompose_rules(flows, flow_groups.items)
+ for device_id, (flows, groups) in device_flows_map.iteritems():
+ self.root_proxy.update('/devices/{}/flows'.format(device_id),
+ Flows(items=flows.values()))
+ self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
+ FlowGroups(items=groups.values()))
+
+ # ~~~~~~~~~~~~~~~~~~~ APIs NEEDED BY FLOW DECOMPOSER ~~~~~~~~~~~~~~~~~~~~~~
+
+ def _port_added(self, port):
+ self.log.debug('port-added', port=port)
+ assert isinstance(port, LogicalPort)
+ self._port_list_updated(port)
+
+ # Set a proxy and callback for that specific port
+ self.port_proxy[port.id] = self.core.get_proxy(
+ '/logical_devices/{}/ports/{}'.format(self.logical_device_id,
+ port.id))
+ self.port_status_has_changed[port.id] = True
+ self.port_proxy[port.id].register_callback(
+ CallbackType.PRE_UPDATE, self._pre_port_changed)
+ self.port_proxy[port.id].register_callback(
+ CallbackType.POST_UPDATE, self._port_changed)
+
+ self.local_handler.send_port_change_event(
+ device_id=self.logical_device_id,
+ port_status=ofp.ofp_port_status(
+ reason=ofp.OFPPR_ADD,
+ desc=port.ofp_port
+ )
+ )
+
+ def _reconcile_port(self, port):
+ self.log.debug('reconcile-port', port=port)
+ assert isinstance(port, LogicalPort)
+ self._port_list_updated(port)
+
+ # Set a proxy and callback for that specific port
+ self.port_proxy[port.id] = self.core.get_proxy(
+ '/logical_devices/{}/ports/{}'.format(self.logical_device_id,
+ port.id))
+ self.port_status_has_changed[port.id] = True
+ self.port_proxy[port.id].register_callback(
+ CallbackType.PRE_UPDATE, self._pre_port_changed)
+ self.port_proxy[port.id].register_callback(
+ CallbackType.POST_UPDATE, self._port_changed)
+
+ def _port_removed(self, port):
+ self.log.debug('port-removed', port=port)
+ assert isinstance(port, LogicalPort)
+ self._port_list_updated(port)
+
+ # Remove the proxy references
+ self.port_proxy[port.id].unregister_callback(
+ CallbackType.PRE_UPDATE, self._pre_port_changed)
+ self.port_proxy[port.id].unregister_callback(
+ CallbackType.POST_UPDATE, self._port_changed)
+ del self.port_proxy[port.id]
+ del self.port_status_has_changed[port.id]
+
+
+ self.local_handler.send_port_change_event(
+ device_id=self.logical_device_id,
+ port_status=ofp.ofp_port_status(
+ reason=ofp.OFPPR_DELETE,
+ desc=port.ofp_port
+ )
+ )
+
+ def _pre_port_changed(self, port):
+ old_port = self.port_proxy[port.id].get('/')
+ if old_port.ofp_port != port.ofp_port:
+ self.port_status_has_changed[port.id] = True
+ else :
+ self.port_status_has_changed[port.id] = False
+
+ def _port_changed(self, port):
+ self.log.debug('port-changed', port=port)
+ if self.port_status_has_changed[port.id]:
+ assert isinstance(port, LogicalPort)
+ self.local_handler.send_port_change_event(
+ device_id=self.logical_device_id,
+ port_status=ofp.ofp_port_status(
+ reason=ofp.OFPPR_MODIFY,
+ desc=port.ofp_port
+ )
+ )
+
+ def _port_list_updated(self, _):
+ # invalidate the graph and the route table
+ self._invalidate_cached_tables()
+
+ def _invalidate_cached_tables(self):
+ self._routes = None
+ self._default_rules = None
+ self._nni_logical_port_no = None
+
+ def _assure_cached_tables_up_to_date(self):
+ if self._routes is None:
+ logical_ports = self.self_proxy.get('/ports')
+ graph, self._routes = self.compute_routes(
+ self.root_proxy, logical_ports)
+ self._default_rules = self._generate_default_rules(graph)
+ root_ports = [p for p in logical_ports if p.root_port]
+ assert len(root_ports) == 1, 'Only one root port supported at this time'
+ self._nni_logical_port_no = root_ports[0].ofp_port.port_no
+
+
+ def _generate_default_rules(self, graph):
+
+ def root_device_default_rules(device):
+ flows = OrderedDict()
+ groups = OrderedDict()
+ return flows, groups
+
+ def leaf_device_default_rules(device):
+ ports = self.root_proxy.get('/devices/{}/ports'.format(device.id))
+ upstream_ports = [
+ port for port in ports if port.type == Port.PON_ONU \
+ or port.type == Port.VENET_ONU
+ ]
+ assert len(upstream_ports) == 1
+ downstream_ports = [
+ port for port in ports if port.type == Port.ETHERNET_UNI
+ ]
+
+ # it is possible that the downstream ports are not
+ # created, but the flow_decomposition has already
+ # kicked in. In such scenarios, cut short the processing
+ # and return.
+ if len(downstream_ports) == 0:
+ return None, None
+ # assert len(downstream_ports) == 1
+ upstream_port = upstream_ports[0]
+ flows = OrderedDict()
+ for downstream_port in downstream_ports:
+ flows.update(OrderedDict((f.id, f) for f in [
+ mk_flow_stat(
+ priority=500,
+ match_fields=[
+ in_port(downstream_port.port_no),
+ vlan_vid(ofp.OFPVID_PRESENT | 0)
+ ],
+ actions=[
+ set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
+ output(upstream_port.port_no)
+ ]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[
+ in_port(downstream_port.port_no),
+ vlan_vid(0)
+ ],
+ actions=[
+ push_vlan(0x8100),
+ set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
+ output(upstream_port.port_no)
+ ]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[
+ in_port(upstream_port.port_no),
+ vlan_vid(ofp.OFPVID_PRESENT | device.vlan)
+ ],
+ actions=[
+ set_field(vlan_vid(ofp.OFPVID_PRESENT | 0)),
+ output(downstream_port.port_no)
+ ]
+ ),
+ ]))
+ groups = OrderedDict()
+ return flows, groups
+
+ root_device_id = self.self_proxy.get('/').root_device_id
+ rules = {}
+ for node_key in graph.nodes():
+ node = graph.node[node_key]
+ device = node.get('device', None)
+ if device is None:
+ continue
+ if device.id == root_device_id:
+ rules[device.id] = root_device_default_rules(device)
+ else:
+ rules[device.id] = leaf_device_default_rules(device)
+ return rules
+
+ def get_route(self, ingress_port_no, egress_port_no):
+ self._assure_cached_tables_up_to_date()
+ self.log.info('getting-route', eg_port=egress_port_no, in_port=ingress_port_no,
+ nni_port=self._nni_logical_port_no)
+ if egress_port_no is not None and \
+ (egress_port_no & 0x7fffffff) == ofp.OFPP_CONTROLLER:
+ self.log.info('controller-flow', eg_port=egress_port_no, in_port=ingress_port_no,
+ nni_port=self._nni_logical_port_no)
+ if ingress_port_no == self._nni_logical_port_no:
+ self.log.info('returning half route')
+ # This is a trap on the NNI Port
+ # Return a 'half' route to make the flow decomp logic happy
+ for (ingress, egress), route in self._routes.iteritems():
+ if egress == self._nni_logical_port_no:
+ return [None, route[1]]
+ raise Exception('not a single upstream route')
+ # treat it as if the output port is the NNI of the OLT
+ egress_port_no = self._nni_logical_port_no
+
+ # If ingress_port is not specified (None), it may be a wildcarded
+ # route if egress_port is OFPP_CONTROLLER or _nni_logical_port,
+ # in which case we need to create a half-route where only the egress
+ # hop is filled, the first hope is None
+ if ingress_port_no is None and \
+ egress_port_no == self._nni_logical_port_no:
+ # We can use the 2nd hop of any upstream route, so just find the
+ # first upstream:
+ for (ingress, egress), route in self._routes.iteritems():
+ if egress == self._nni_logical_port_no:
+ return [None, route[1]]
+ raise Exception('not a single upstream route')
+
+ # If egress_port is not specified (None), we can also can return a
+ # "half" route
+ if egress_port_no is None:
+ for (ingress, egress), route in self._routes.iteritems():
+ if ingress == ingress_port_no:
+ return [route[0], None]
+
+ # This can occur is a leaf device is disabled
+ self.log.exception('no-downstream-route',
+ ingress_port_no=ingress_port_no,
+ egress_port_no= egress_port_no
+ )
+ return None
+
+
+ return self._routes.get((ingress_port_no, egress_port_no))
+
+ def get_all_default_rules(self):
+ self._assure_cached_tables_up_to_date()
+ return self._default_rules
+
+ def get_wildcard_input_ports(self, exclude_port=None):
+ logical_ports = self.self_proxy.get('/ports')
+ return [port.ofp_port.port_no for port in logical_ports
+ if port.ofp_port.port_no != exclude_port]
diff --git a/python/core/registry.py b/python/core/registry.py
new file mode 100644
index 0000000..270bd71
--- /dev/null
+++ b/python/core/registry.py
@@ -0,0 +1,69 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Simple component registry to provide centralized access to any registered
+components.
+"""
+from collections import OrderedDict
+from zope.interface import Interface
+
+
+class IComponent(Interface):
+ """
+ A Voltha Component
+ """
+
+ def start():
+ """
+ Called once the componet is instantiated. Can be used for async
+ initialization.
+ :return: (None or Deferred)
+ """
+
+ def stop():
+ """
+ Called once before the component is unloaded. Can be used for async
+ cleanup operations.
+ :return: (None or Deferred)
+ """
+
+
+class Registry(object):
+
+ def __init__(self):
+ self.components = OrderedDict()
+
+ def register(self, name, component):
+ assert IComponent.providedBy(component)
+ assert name not in self.components
+ self.components[name] = component
+ return component
+
+ def unregister(self, name):
+ if name in self.components:
+ del self.components[name]
+
+ def __call__(self, name):
+ return self.components[name]
+
+ def iterate(self):
+ return self.components.values()
+
+
+# public shared registry
+registry = Registry()