This commit cleans up the python directory to ensure the adapters
and the CLI run properly.
Change-Id: Ic68a3ecd1f16a5af44296e3c020c808b185f4c18
diff --git a/python/common/frameio/frameio.py b/python/common/frameio/frameio.py
deleted file mode 100644
index 3f5bcf6..0000000
--- a/python/common/frameio/frameio.py
+++ /dev/null
@@ -1,437 +0,0 @@
-#
-# Copyright 2017 the original author or authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-A module that can send and receive raw Ethernet frames on a set of interfaces,
-and that can manage a set of VLAN interfaces on top of existing
-interfaces. Due to its reliance on raw sockets, this module requires
-root access. Also, because raw sockets are hard to deal with in Twisted (not
-directly supported), we need to run the receiver select loop on a dedicated
-thread.
-"""
-
-import os
-import socket
-import struct
-import uuid
-from pcapy import BPFProgram
-from threading import Thread, Condition
-
-import fcntl
-
-import select
-import structlog
-import sys
-
-from scapy.data import ETH_P_ALL
-from twisted.internet import reactor
-from zope.interface import implementer
-
-from voltha.registry import IComponent
-
-if sys.platform.startswith('linux'):
- from common.frameio.third_party.oftest import afpacket, netutils
-elif sys.platform == 'darwin':
- from scapy.arch import pcapdnet, BIOCIMMEDIATE, dnet
-
-log = structlog.get_logger()
-
-
-def hexify(buffer):
- """
- Return a hexadecimal string encoding of input buffer
- """
- return ''.join('%02x' % ord(c) for c in buffer)
-
-
-class _SelectWakerDescriptor(object):
- """
- A descriptor that can be mixed into a select loop to wake it up.
- """
- def __init__(self):
- self.pipe_read, self.pipe_write = os.pipe()
- fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)
-
- def __del__(self):
- os.close(self.pipe_read)
- os.close(self.pipe_write)
-
- def fileno(self):
- return self.pipe_read
-
- def wait(self):
- os.read(self.pipe_read, 1)
-
- def notify(self):
- """Trigger a select loop"""
- os.write(self.pipe_write, '\x00')
-
-
-class BpfProgramFilter(object):
- """
- Convenience packet filter based on the well-tried Berkeley Packet Filter,
- used by many well known open source tools such as pcap and tcpdump.
- """
- def __init__(self, program_string):
- """
- Create a filter using the BPF command syntax. To learn more,
- consult 'man pcap-filter'.
- :param program_string: The textual definition of the filter. Examples:
- 'vlan 1000'
- 'vlan 1000 and ip src host 10.10.10.10'
- """
- self.bpf = BPFProgram(program_string)
-
- def __call__(self, frame):
- """
- Return 1 if frame passes filter.
- :param frame: Raw frame provided as Python string
- :return: 1 if frame satisfies filter, 0 otherwise.
- """
- return self.bpf.filter(frame)
-
-
-class FrameIOPort(object):
- """
- Represents a network interface which we can send/receive raw
- Ethernet frames.
- """
-
- RCV_SIZE_DEFAULT = 4096
- ETH_P_ALL = 0x03
- RCV_TIMEOUT = 10000
- MIN_PKT_SIZE = 60
-
- def __init__(self, iface_name):
- self.iface_name = iface_name
- self.proxies = []
- self.socket = self.open_socket(self.iface_name)
- log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
- self.received = 0
- self.discarded = 0
-
- def add_proxy(self, proxy):
- self.proxies.append(proxy)
-
- def del_proxy(self, proxy):
- self.proxies = [p for p in self.proxies if p.name != proxy.name]
-
- def open_socket(self, iface_name):
- raise NotImplementedError('to be implemented by derived class')
-
- def rcv_frame(self):
- raise NotImplementedError('to be implemented by derived class')
-
- def __del__(self):
- if self.socket:
- self.socket.close()
- self.socket = None
- log.debug('socket-closed', iface=self.iface_name)
-
- def fileno(self):
- return self.socket.fileno()
-
- def _dispatch(self, proxy, frame):
- log.debug('calling-publisher', proxy=proxy.name, frame=hexify(frame))
- try:
- proxy.callback(proxy, frame)
- except Exception as e:
- log.exception('callback-error',
- explanation='Callback failed while processing frame',
- e=e)
-
- def recv(self):
- """Called on the select thread when a packet arrives"""
- try:
- frame = self.rcv_frame()
- except RuntimeError as e:
- # we observed this happens sometimes right after the socket was
- # attached to a newly created veth interface. So we log it, but
- # allow to continue.
- log.warn('afpacket-recv-error', code=-1)
- return
-
- log.debug('frame-received', iface=self.iface_name, len=len(frame),
- hex=hexify(frame))
- self.received +=1
- dispatched = False
- for proxy in self.proxies:
- if proxy.filter is None or proxy.filter(frame):
- log.debug('frame-dispatched')
- dispatched = True
- reactor.callFromThread(self._dispatch, proxy, frame)
-
- if not dispatched:
- self.discarded += 1
- log.debug('frame-discarded')
-
- def send(self, frame):
- log.debug('sending', len=len(frame), iface=self.iface_name)
- sent_bytes = self.send_frame(frame)
- if sent_bytes != len(frame):
- log.error('send-error', iface=self.iface_name,
- wanted_to_send=len(frame), actually_sent=sent_bytes)
- return sent_bytes
-
- def send_frame(self, frame):
- try:
- return self.socket.send(frame)
- except socket.error, err:
- if err[0] == os.errno.EINVAL:
- if len(frame) < self.MIN_PKT_SIZE:
- padding = '\x00' * (self.MIN_PKT_SIZE - len(frame))
- frame = frame + padding
- return self.socket.send(frame)
- else:
- raise
-
- def up(self):
- if sys.platform.startswith('darwin'):
- pass
- else:
- os.system('ip link set {} up'.format(self.iface_name))
- return self
-
- def down(self):
- if sys.platform.startswith('darwin'):
- pass
- else:
- os.system('ip link set {} down'.format(self.iface_name))
- return self
-
- def statistics(self):
- return self.received, self.discarded
-
-
-class LinuxFrameIOPort(FrameIOPort):
-
- def open_socket(self, iface_name):
- s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
- afpacket.enable_auxdata(s)
- s.bind((self.iface_name, self.ETH_P_ALL))
- netutils.set_promisc(s, iface_name)
- s.settimeout(self.RCV_TIMEOUT)
- return s
-
- def rcv_frame(self):
- return afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
-
-
-class DarwinFrameIOPort(FrameIOPort):
-
- def open_socket(self, iface_name):
- sin = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
- try:
- fcntl.ioctl(sin.fileno(), BIOCIMMEDIATE, struct.pack("I",1))
- except:
- pass
-
- # need a different kind of socket for sending out
- self.sout = dnet.eth(iface_name)
-
- return sin
-
- def send_frame(self, frame):
- return self.sout.send(frame)
-
- def rcv_frame(self):
- pkt = self.socket.next()
- if pkt is not None:
- ts, pkt = pkt
- return pkt
-
-
-if sys.platform == 'darwin':
- _FrameIOPort = DarwinFrameIOPort
-elif sys.platform.startswith('linux'):
- _FrameIOPort = LinuxFrameIOPort
-else:
-    raise Exception('Unsupported platform {}'.format(sys.platform))
-
-
-class FrameIOPortProxy(object):
- """Makes FrameIOPort sharable between multiple users"""
-
- def __init__(self, frame_io_port, callback, filter=None, name=None):
- self.frame_io_port = frame_io_port
- self.callback = callback
- self.filter = filter
- self.name = uuid.uuid4().hex[:12] if name is None else name
-
- @property
- def iface_name(self):
- return self.frame_io_port.iface_name
-
- def get_iface_name(self):
- return self.frame_io_port.iface_name
-
- def send(self, frame):
- return self.frame_io_port.send(frame)
-
- def up(self):
- self.frame_io_port.up()
- return self
-
- def down(self):
- self.frame_io_port.down()
- return self
-
-
-@implementer(IComponent)
-class FrameIOManager(Thread):
- """
- Packet/Frame IO manager that can be used to send/receive raw frames
- on a set of network interfaces.
- """
- def __init__(self):
- super(FrameIOManager, self).__init__()
-
- self.ports = {} # iface_name -> ActiveFrameReceiver
- self.queue = {} # iface_name -> TODO
-
- self.cvar = Condition()
- self.waker = _SelectWakerDescriptor()
- self.stopped = False
- self.ports_changed = False
-
- # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~
-
- def start(self):
- """
- Start the IO manager and its select loop thread
- """
- log.debug('starting')
- super(FrameIOManager, self).start()
- log.info('started')
- return self
-
- def stop(self):
- """
- Stop the IO manager and its thread with the select loop
- """
- log.debug('stopping')
- self.stopped = True
- self.waker.notify()
- self.join()
- del self.ports
- log.info('stopped')
-
- def list_interfaces(self):
- """
-        Return the interfaces currently listened on
-        :return: Dict mapping iface_name to FrameIOPort objects
- """
- return self.ports
-
- def open_port(self, iface_name, callback, filter=None, name=None):
- """
- Add a new interface and start receiving on it.
- :param iface_name: Name of the interface. Must be an existing Unix
- interface (eth0, en0, etc.)
- :param callback: Called on each received frame;
- signature: def callback(port, frame) where port is the FrameIOPort
- instance at which the frame was received, frame is the actual frame
-        received (as a binary string)
- :param filter: An optional filter (predicate), with signature:
- def filter(frame). If provided, only frames for which filter evaluates
- to True will be forwarded to callback.
-        :return: FrameIOPortProxy instance.
- """
-
- port = self.ports.get(iface_name)
- if port is None:
- port = _FrameIOPort(iface_name)
- self.ports[iface_name] = port
- self.ports_changed = True
- self.waker.notify()
-
- proxy = FrameIOPortProxy(port, callback, filter, name)
- port.add_proxy(proxy)
-
- return proxy
-
- def close_port(self, proxy):
- """
- Remove the proxy. If this is the last proxy on an interface, stop and
- remove the named interface as well
- :param proxy: FrameIOPortProxy reference
- :return: None
- """
- assert isinstance(proxy, FrameIOPortProxy)
- iface_name = proxy.get_iface_name()
- assert iface_name in self.ports, "iface_name {} unknown".format(iface_name)
- port = self.ports[iface_name]
- port.del_proxy(proxy)
-
- if not port.proxies:
- del self.ports[iface_name]
- # need to exit select loop to reconstruct select fd lists
- self.ports_changed = True
- self.waker.notify()
-
- def send(self, iface_name, frame):
- """
- Send frame on given interface
- :param iface_name: Name of previously registered interface
- :param frame: frame as string
- :return: number of bytes sent
- """
- return self.ports[iface_name].send(frame)
-
-    # ~~~~~~~~~~~~ Thread methods (running on the non-main thread) ~~~~~~~~~~~~~
-
- def run(self):
- """
- Called on the alien thread, this is the core multi-port receive loop
- """
-
- log.debug('select-loop-started')
-
- # outer loop constructs sockets list for select
- while not self.stopped:
- sockets = [self.waker] + self.ports.values()
- self.ports_changed = False
- empty = []
- # inner select loop
-
- while not self.stopped:
- try:
- _in, _out, _err = select.select(sockets, empty, empty, 1)
- except Exception as e:
- log.exception('frame-io-select-error', e=e)
- break
- with self.cvar:
- for port in _in:
- if port is self.waker:
- self.waker.wait()
- continue
- else:
- port.recv()
- self.cvar.notify_all()
- if self.ports_changed:
- break # break inner loop so we reconstruct sockets list
-
- log.debug('select-loop-exited')
-
- def del_interface(self, iface_name):
- """
-        Delete the given interface and stop listening on it
- """
-
- log.info('Delete interface')
- del self.ports[iface_name]
- log.info('Interface(port) is deleted')
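
For reference, the FrameIOManager and BpfProgramFilter API documented in the docstrings above might be used along these lines. This is a minimal sketch, assuming a Linux host with root privileges, a running Twisted reactor, and 'eth0' as an illustrative interface name:

    import structlog
    from common.frameio.frameio import FrameIOManager, BpfProgramFilter, hexify

    log = structlog.get_logger()

    def on_frame(port, frame):
        # invoked on the reactor thread for every frame that passes the filter
        log.info('frame-received', iface=port.iface_name, hex=hexify(frame))

    mgr = FrameIOManager().start()
    proxy = mgr.open_port('eth0', on_frame,
                          filter=BpfProgramFilter('vlan 1000')).up()

    # tear down in reverse order when done:
    # mgr.close_port(proxy)
    # mgr.stop()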
diff --git a/python/common/frameio/third_party/__init__.py b/python/common/frameio/third_party/__init__.py
deleted file mode 100644
index b0fb0b2..0000000
--- a/python/common/frameio/third_party/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/python/common/frameio/third_party/oftest/LICENSE b/python/common/frameio/third_party/oftest/LICENSE
deleted file mode 100644
index 3216042..0000000
--- a/python/common/frameio/third_party/oftest/LICENSE
+++ /dev/null
@@ -1,36 +0,0 @@
-OpenFlow Test Framework
-
-Copyright (c) 2010 The Board of Trustees of The Leland Stanford
-Junior University
-
-Except where otherwise noted, this software is distributed under
-the OpenFlow Software License. See
-http://www.openflowswitch.org/wp/legal/ for current details.
-
-We are making the OpenFlow specification and associated documentation
-(Software) available for public use and benefit with the expectation
-that others will use, modify and enhance the Software and contribute
-those enhancements back to the community. However, since we would like
-to make the Software available for broadest use, with as few
-restrictions as possible permission is hereby granted, free of charge,
-to any person obtaining a copy of this Software to deal in the
-Software under the copyrights without restriction, including without
-limitation the rights to use, copy, modify, merge, publish,
-distribute, sublicense, and/or sell copies of the Software, and to
-permit persons to whom the Software is furnished to do so, subject to
-the following conditions:
-
-The above copyright notice and this permission notice shall be
-included in all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
-LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
-OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
-WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-The name and trademarks of copyright holder(s) may NOT be used in
-advertising or publicity pertaining to the Software or any derivatives
-without specific, written prior permission.
diff --git a/python/common/frameio/third_party/oftest/README.md b/python/common/frameio/third_party/oftest/README.md
deleted file mode 100644
index f0cb649..0000000
--- a/python/common/frameio/third_party/oftest/README.md
+++ /dev/null
@@ -1,6 +0,0 @@
-Files in this directory are derived from the respective files
-in oftest (http://github.com/floodlight/oftest).
-
-For the licensing terms of these files, see LICENSE in this dir.
-
-
diff --git a/python/common/frameio/third_party/oftest/__init__.py b/python/common/frameio/third_party/oftest/__init__.py
deleted file mode 100644
index b0fb0b2..0000000
--- a/python/common/frameio/third_party/oftest/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/python/common/frameio/third_party/oftest/afpacket.py b/python/common/frameio/third_party/oftest/afpacket.py
deleted file mode 100644
index 9ae8075..0000000
--- a/python/common/frameio/third_party/oftest/afpacket.py
+++ /dev/null
@@ -1,124 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-AF_PACKET receive support
-
-When VLAN offload is enabled on the NIC Linux will not deliver the VLAN tag
-in the data returned by recv. Instead, it delivers the VLAN TCI in a control
-message. Python 2.x doesn't have built-in support for recvmsg, so we have to
-use ctypes to call it. The recv function exported by this module reconstructs
-the VLAN tag if it was offloaded.
-"""
-
-import struct
-from ctypes import *
-
-ETH_P_8021Q = 0x8100
-SOL_PACKET = 263
-PACKET_AUXDATA = 8
-TP_STATUS_VLAN_VALID = 1 << 4
-
-class struct_iovec(Structure):
- _fields_ = [
- ("iov_base", c_void_p),
- ("iov_len", c_size_t),
- ]
-
-class struct_msghdr(Structure):
- _fields_ = [
- ("msg_name", c_void_p),
- ("msg_namelen", c_uint32),
- ("msg_iov", POINTER(struct_iovec)),
- ("msg_iovlen", c_size_t),
- ("msg_control", c_void_p),
- ("msg_controllen", c_size_t),
- ("msg_flags", c_int),
- ]
-
-class struct_cmsghdr(Structure):
- _fields_ = [
- ("cmsg_len", c_size_t),
- ("cmsg_level", c_int),
- ("cmsg_type", c_int),
- ]
-
-class struct_tpacket_auxdata(Structure):
- _fields_ = [
- ("tp_status", c_uint),
- ("tp_len", c_uint),
- ("tp_snaplen", c_uint),
- ("tp_mac", c_ushort),
- ("tp_net", c_ushort),
- ("tp_vlan_tci", c_ushort),
- ("tp_padding", c_ushort),
- ]
-
-libc = CDLL("libc.so.6")
-recvmsg = libc.recvmsg
-recvmsg.argtypes = [c_int, POINTER(struct_msghdr), c_int]
-recvmsg.restype = c_int
-
-def enable_auxdata(sk):
- """
- Ask the kernel to return the VLAN tag in a control message
-
- Must be called on the socket before afpacket.recv.
- """
- sk.setsockopt(SOL_PACKET, PACKET_AUXDATA, 1)
-
-def recv(sk, bufsize):
- """
- Receive a packet from an AF_PACKET socket
- @sk Socket
- @bufsize Maximum packet size
- """
- buf = create_string_buffer(bufsize)
-
- ctrl_bufsize = sizeof(struct_cmsghdr) + sizeof(struct_tpacket_auxdata) + sizeof(c_size_t)
- ctrl_buf = create_string_buffer(ctrl_bufsize)
-
- iov = struct_iovec()
- iov.iov_base = cast(buf, c_void_p)
- iov.iov_len = bufsize
-
- msghdr = struct_msghdr()
- msghdr.msg_name = None
- msghdr.msg_namelen = 0
- msghdr.msg_iov = pointer(iov)
- msghdr.msg_iovlen = 1
- msghdr.msg_control = cast(ctrl_buf, c_void_p)
- msghdr.msg_controllen = ctrl_bufsize
- msghdr.msg_flags = 0
-
- rv = recvmsg(sk.fileno(), byref(msghdr), 0)
- if rv < 0:
-        raise RuntimeError("recvmsg failed: rv=%d" % rv)
-
- # The kernel only delivers control messages we ask for. We
- # only enabled PACKET_AUXDATA, so we can assume it's the
- # only control message.
- assert msghdr.msg_controllen >= sizeof(struct_cmsghdr)
-
- cmsghdr = struct_cmsghdr.from_buffer(ctrl_buf) # pylint: disable=E1101
- assert cmsghdr.cmsg_level == SOL_PACKET
- assert cmsghdr.cmsg_type == PACKET_AUXDATA
-
- auxdata = struct_tpacket_auxdata.from_buffer(ctrl_buf, sizeof(struct_cmsghdr)) # pylint: disable=E1101
-
- if auxdata.tp_vlan_tci != 0 or auxdata.tp_status & TP_STATUS_VLAN_VALID:
- # Insert VLAN tag
- tag = struct.pack("!HH", ETH_P_8021Q, auxdata.tp_vlan_tci)
- return buf.raw[:12] + tag + buf.raw[12:rv]
- else:
- return buf.raw[:rv]
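
A minimal sketch of how the helpers above are meant to be used (this mirrors LinuxFrameIOPort.open_socket in frameio.py), assuming a Linux host, root privileges, and 'eth0' as an illustrative interface name:

    import socket
    from common.frameio.third_party.oftest import afpacket

    ETH_P_ALL = 0x03

    s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
    afpacket.enable_auxdata(s)      # have the kernel deliver the VLAN TCI as a control message
    s.bind(('eth0', ETH_P_ALL))
    frame = afpacket.recv(s, 4096)  # raw frame, with the VLAN tag re-inserted if it was offloaded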
diff --git a/python/common/frameio/third_party/oftest/netutils.py b/python/common/frameio/third_party/oftest/netutils.py
deleted file mode 100644
index 092d490..0000000
--- a/python/common/frameio/third_party/oftest/netutils.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-Network utilities for the OpenFlow test framework
-"""
-
-###########################################################################
-## ##
-## Promiscuous mode enable/disable ##
-## ##
-## Based on code from Scapy by Phillippe Biondi ##
-## ##
-## ##
-## This program is free software; you can redistribute it and/or modify it ##
-## under the terms of the GNU General Public License as published by the ##
-## Free Software Foundation; either version 2, or (at your option) any ##
-## later version. ##
-## ##
-## This program is distributed in the hope that it will be useful, but ##
-## WITHOUT ANY WARRANTY; without even the implied warranty of ##
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ##
-## General Public License for more details. ##
-## ##
-#############################################################################
-
-import socket
-from fcntl import ioctl
-import struct
-
-# From net/if_arp.h
-ARPHDR_ETHER = 1
-ARPHDR_LOOPBACK = 772
-
-# From bits/ioctls.h
-SIOCGIFHWADDR = 0x8927 # Get hardware address
-SIOCGIFINDEX = 0x8933 # name -> if_index mapping
-
-# From netpacket/packet.h
-PACKET_ADD_MEMBERSHIP = 1
-PACKET_DROP_MEMBERSHIP = 2
-PACKET_MR_PROMISC = 1
-
-# From bits/socket.h
-SOL_PACKET = 263
-
-def get_if(iff,cmd):
- s=socket.socket()
- ifreq = ioctl(s, cmd, struct.pack("16s16x",iff))
- s.close()
- return ifreq
-
-def get_if_index(iff):
- return int(struct.unpack("I",get_if(iff, SIOCGIFINDEX)[16:20])[0])
-
-def set_promisc(s,iff,val=1):
- mreq = struct.pack("IHH8s", get_if_index(iff), PACKET_MR_PROMISC, 0, "")
- if val:
- cmd = PACKET_ADD_MEMBERSHIP
- else:
- cmd = PACKET_DROP_MEMBERSHIP
- s.setsockopt(SOL_PACKET, cmd, mreq)
-
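
Similarly, a small sketch of the promiscuous-mode helper above, again assuming Linux, root privileges, and an illustrative 'eth0':

    import socket
    from common.frameio.third_party.oftest import netutils

    s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
    s.bind(('eth0', 0x03))                   # 0x03 is ETH_P_ALL
    netutils.set_promisc(s, 'eth0')          # join PACKET_MR_PROMISC membership
    netutils.set_promisc(s, 'eth0', val=0)   # drop it again when finished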
diff --git a/python/common/kvstore/__init__.py b/python/common/kvstore/__init__.py
deleted file mode 100644
index 4a82628..0000000
--- a/python/common/kvstore/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2018-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/python/common/kvstore/consul_client.py b/python/common/kvstore/consul_client.py
deleted file mode 100644
index bc14759..0000000
--- a/python/common/kvstore/consul_client.py
+++ /dev/null
@@ -1,304 +0,0 @@
-# Copyright 2018-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from common.kvstore.kv_client import DEFAULT_TIMEOUT, Event, KVClient, KVPair, RETRY_BACKOFF
-from common.utils.asleep import asleep
-from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
-from consul import ConsulException
-from consul.twisted import Consul
-from structlog import get_logger
-from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
-
-log = get_logger()
-
-class ConsulClient(KVClient):
-
- def __init__(self, kv_host, kv_port):
- KVClient.__init__(self, kv_host, kv_port)
- self.session_id = None
- self.client = Consul(kv_host, kv_port)
-
- def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
- self._retriggering_watch(key, key_change_callback, timeout)
-
- @inlineCallbacks
- def _retriggering_watch(self, key, key_change_callback, timeout):
- self.key_watches[key] = ConsulWatch(self.client, key, key_change_callback, timeout)
- yield self.key_watches[key].start()
-
- def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
- if key in self.key_watches:
- self.key_watches[key].stop()
-
- @inlineCallbacks
- def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
- log.debug('kv-op', operation=operation, key=key, timeout=timeout, args=args, kw=kw)
- err = None
- result = None
- while True:
- try:
- if operation == 'GET':
- result = yield self._get(key, **kw)
- elif operation == 'LIST':
- result, err = yield self._list(key)
- elif operation == 'PUT':
- # Put returns a boolean response
- result = yield self.client.kv.put(key, value)
- if not result:
- err = 'put-failed'
- elif operation == 'DELETE':
- # Delete returns a boolean response
- result = yield self.client.kv.delete(key)
- if not result:
- err = 'delete-failed'
- elif operation == 'RESERVE':
- result, err = yield self._reserve(key, value, **kw)
- elif operation == 'RENEW':
- result, err = yield self._renew_reservation(key)
- elif operation == 'RELEASE':
- result, err = yield self._release_reservation(key)
- elif operation == 'RELEASE-ALL':
- err = yield self._release_all_reservations()
- self._clear_backoff()
- break
- except ConsulException as ex:
- if 'ConnectionRefusedError' in ex.message:
- log.exception('comms-exception', ex=ex)
- yield self._backoff('consul-not-up')
- else:
- log.error('consul-specific-exception', ex=ex)
- err = ex
- except Exception as ex:
- log.error('consul-exception', ex=ex)
- err = ex
-
- if timeout > 0 and self.retry_time > timeout:
- err = 'operation-timed-out'
- if err is not None:
- self._clear_backoff()
- break
-
- returnValue((result,err))
-
- @inlineCallbacks
- def _get(self, key, **kw):
- kvp = None
- index, rec = yield self.client.kv.get(key, **kw)
- if rec is not None:
- kvp = KVPair(rec['Key'], rec['Value'], index)
- returnValue(kvp)
-
- @inlineCallbacks
- def _list(self, key):
- err = None
- list = []
- index, recs = yield self.client.kv.get(key, recurse=True)
- for rec in recs:
- list.append(KVPair(rec['Key'], rec['Value'], rec['ModifyIndex']))
- returnValue((list, err))
-
- @inlineCallbacks
- def _reserve(self, key, value, **kw):
- for name, val in kw.items():
- if name == 'ttl':
- ttl = val
- break
- reserved = False
- err = 'reservation-failed'
- owner = None
-
- # Create a session
- self.session_id = yield self.client.session.create(behavior='delete',
- ttl=ttl) # lock_delay=1)
- log.debug('create-session', id=self.session_id)
- # Try to acquire the key
- result = yield self.client.kv.put(key, value, acquire=self.session_id)
- log.debug('key-acquire', key=key, value=value, sess=self.session_id, result=result)
-
- # Check if reservation succeeded
- index, record = yield self.client.kv.get(key)
- if record is not None and 'Value' in record:
- owner = record['Value']
- log.debug('get-key', session=record['Session'], owner=owner)
- if record['Session'] == self.session_id and owner == value:
- reserved = True
- log.debug('key-reserved', key=key, value=value, ttl=ttl)
- # Add key to reservation list
- self.key_reservations[key] = self.session_id
- else:
- log.debug('reservation-held-by-another', owner=owner)
-
- if reserved:
- err = None
- returnValue((owner, err))
-
- @inlineCallbacks
- def _renew_reservation(self, key):
- result = None
- err = None
- if key not in self.key_reservations:
- err = 'key-not-reserved'
- else:
- session_id = self.key_reservations[key]
- # A successfully renewed session returns an object with fields:
- # Node, CreateIndex, Name, ModifyIndex, ID, Behavior, TTL,
- # LockDelay, and Checks
- result = yield self.client.session.renew(session_id=session_id)
- log.debug('session-renew', result=result)
- if result is None:
- err = 'session-renewal-failed'
- returnValue((result, err))
-
- @inlineCallbacks
- def _release_reservation(self, key):
- err = None
- if key not in self.key_reservations:
- err = 'key-not-reserved'
- else:
- session_id = self.key_reservations[key]
- # A successfully destroyed session returns a boolean result
- success = yield self.client.session.destroy(session_id)
- log.debug('session-destroy', result=success)
- if not success:
- err = 'session-destroy-failed'
- self.session_id = None
- self.key_reservations.pop(key)
- returnValue((success, err))
-
- @inlineCallbacks
- def _release_all_reservations(self):
- err = None
- keys_to_delete = []
- for key in self.key_reservations:
- session_id = self.key_reservations[key]
- # A successfully destroyed session returns a boolean result
- success = yield self.client.session.destroy(session_id)
- if not success:
- err = 'session-destroy-failed'
- log.debug('session-destroy', id=session_id, result=success)
- self.session_id = None
- keys_to_delete.append(key)
- for key in keys_to_delete:
- self.key_reservations.pop(key)
- returnValue(err)
-
-
-class ConsulWatch():
-
- def __init__(self, consul, key, callback, timeout):
- self.client = consul
- self.key = key
- self.index = None
- self.callback = callback
- self.timeout = timeout
- self.period = 60
- self.running = True
- self.retries = 0
- self.retry_time = 0
-
- @inlineCallbacks
- def start(self):
- self.running = True
- index, rec = yield self._get_with_retry(self.key, None,
- timeout=self.timeout)
- self.index = str(index)
-
- @inlineCallbacks
- def _get(key, deferred):
- try:
- index, rec = yield self._get_with_retry(key, None,
- timeout=self.timeout,
- index=self.index)
- self.index = str(index)
- if not deferred.called:
- log.debug('got-result-cancelling-deferred')
- deferred.callback((self.index, rec))
- except Exception as e:
- log.exception('got-exception', e=e)
-
- while self.running:
- try:
- rcvd = DeferredWithTimeout(timeout=self.period)
- _get(self.key, rcvd)
- try:
- # Update index for next watch iteration
- index, rec = yield rcvd
- log.debug('event-received', index=index, rec=rec)
- # Notify client of key change event
- if rec is None:
- # Key has been deleted
- self._send_event(Event(Event.DELETE, self.key, None))
- else:
- self._send_event(Event(Event.PUT, rec['Key'], rec['Value']))
- except TimeOutError as e:
- log.debug('no-events-over-watch-period', key=self.key)
- except Exception as e:
- log.exception('exception', e=e)
- except Exception as e:
- log.exception('exception', e=e)
-
- log.debug('close-watch', key=self.key)
-
- def stop(self):
- self.running = False
- self.callback = None
-
- @inlineCallbacks
- def _get_with_retry(self, key, value, timeout, *args, **kw):
- log.debug('watch-period', key=key, period=self.period, timeout=timeout, args=args, kw=kw)
- err = None
- result = None
- while True:
- try:
- result = yield self.client.kv.get(key, **kw)
- self._clear_backoff()
- break
- except ConsulException as ex:
- err = ex
- if 'ConnectionRefusedError' in ex.message:
- self._send_event(Event(Event.CONNECTION_DOWN, self.key, None))
- log.exception('comms-exception', ex=ex)
- yield self._backoff('consul-not-up')
- else:
- log.error('consul-specific-exception', ex=ex)
- except Exception as ex:
- err = ex
- log.error('consul-exception', ex=ex)
-
- if timeout > 0 and self.retry_time > timeout:
- err = 'operation-timed-out'
- if err is not None:
- self._clear_backoff()
- break
-
- returnValue(result)
-
- def _send_event(self, event):
- if self.callback is not None:
- self.callback(event)
-
- def _backoff(self, msg):
- wait_time = RETRY_BACKOFF[min(self.retries, len(RETRY_BACKOFF) - 1)]
- self.retry_time += wait_time
- self.retries += 1
- log.error(msg, next_retry_in_secs=wait_time,
- total_delay_in_secs = self.retry_time,
- retries=self.retries)
- return asleep(wait_time)
-
- def _clear_backoff(self):
- if self.retries:
- log.debug('reconnected-to-kv', after_retries=self.retries)
- self.retries = 0
- self.retry_time = 0
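
A minimal usage sketch of the client above, assuming a reachable Consul agent and a running Twisted reactor; the 'localhost:8500' address and the key names are illustrative:

    import structlog
    from twisted.internet.defer import inlineCallbacks
    from common.kvstore.consul_client import ConsulClient
    from common.kvstore.kv_client import Event

    log = structlog.get_logger()

    def on_change(event):
        if event.event_type == Event.CONNECTION_DOWN:
            log.warn('kv-connection-down', key=event.key)
        else:
            # Event.PUT carries the new value, Event.DELETE carries None
            log.info('kv-event', type=event.event_type, key=event.key, value=event.value)

    @inlineCallbacks
    def consul_demo():
        client = ConsulClient('localhost', 8500)
        err = yield client.put('service/voltha/demo', 'hello')
        kvp, err = yield client.get('service/voltha/demo')
        client.watch('service/voltha/demo', on_change)
        # ... later
        client.close_watch('service/voltha/demo')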
diff --git a/python/common/kvstore/etcd_client.py b/python/common/kvstore/etcd_client.py
deleted file mode 100644
index a958b71..0000000
--- a/python/common/kvstore/etcd_client.py
+++ /dev/null
@@ -1,240 +0,0 @@
-# Copyright 2018-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-################################################################################
-#
-# Most of the txaioetcd methods provide a timeout parameter. This parameter
-# is likely intended to limit the amount of time spent by any one method
-# waiting for a response from the etcd server. However, if the server is
-# down, the method immediately throws a ConnectionRefusedError exception;
-# it does not perform any retries. The timeout parameter provided by the
-# methods in EtcdClient cover this contingency.
-#
-################################################################################
-
-from common.kvstore.kv_client import DEFAULT_TIMEOUT, Event, KVClient, KVPair
-from structlog import get_logger
-from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
-from twisted.internet.error import ConnectionRefusedError
-from txaioetcd import Client, CompVersion, Failed, KeySet, OpGet, OpSet, Transaction
-
-log = get_logger()
-
-class EtcdClient(KVClient):
-
- def __init__(self, kv_host, kv_port):
- KVClient.__init__(self, kv_host, kv_port)
- self.url = u'http://' + kv_host + u':' + str(kv_port)
- self.client = Client(reactor, self.url)
-
- @inlineCallbacks
- def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
- self.key_watches[key] = key_change_callback
- result = yield self._op_with_retry('WATCH', key, None, timeout, callback=self.key_changed)
- returnValue(result)
-
- def key_changed(self, kv):
- key = kv.key
- value = kv.value
- log.debug('key-changed', key=key, value=value)
- # Notify client of key change event
- if value is not None:
- evt = Event(Event.PUT, key, value)
- else:
- evt = Event(Event.DELETE, key, None)
- if key in self.key_watches:
- self.key_watches[key](evt)
-
- def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
- log.debug('close-watch', key=key)
- if key in self.key_watches:
- self.key_watches.pop(key)
-
- @inlineCallbacks
- def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
- log.debug('kv-op', operation=operation, key=key, timeout=timeout, args=args, kw=kw)
- err = None
- result = None
- if type(key) == str:
- key = bytes(key)
- if value is not None:
- value = bytes(value)
- while True:
- try:
- if operation == 'GET':
- result = yield self._get(key)
- elif operation == 'LIST':
- result, err = yield self._list(key)
- elif operation == 'PUT':
- # Put returns an object of type Revision
- result = yield self.client.set(key, value, **kw)
- elif operation == 'DELETE':
- # Delete returns an object of type Deleted
- result = yield self.client.delete(key)
- elif operation == 'RESERVE':
- result, err = yield self._reserve(key, value, **kw)
- elif operation == 'RENEW':
- result, err = yield self._renew_reservation(key)
- elif operation == 'RELEASE':
- result, err = yield self._release_reservation(key)
- elif operation == 'RELEASE-ALL':
- err = yield self._release_all_reservations()
- elif operation == 'WATCH':
- for name, val in kw.items():
- if name == 'callback':
- callback = val
- break
- result = self.client.watch([KeySet(key, prefix=True)], callback)
- self._clear_backoff()
- break
- except ConnectionRefusedError as ex:
- log.error('comms-exception', ex=ex)
- yield self._backoff('etcd-not-up')
- except Exception as ex:
- log.error('etcd-exception', ex=ex)
- err = ex
-
- if timeout > 0 and self.retry_time > timeout:
- err = 'operation-timed-out'
- if err is not None:
- self._clear_backoff()
- break
-
- returnValue((result, err))
-
- @inlineCallbacks
- def _get(self, key):
- kvp = None
- resp = yield self.client.get(key)
- if resp.kvs is not None and len(resp.kvs) == 1:
- kv = resp.kvs[0]
- kvp = KVPair(kv.key, kv.value, kv.mod_revision)
- returnValue(kvp)
-
- @inlineCallbacks
- def _list(self, key):
- err = None
- list = []
- resp = yield self.client.get(KeySet(key, prefix=True))
- if resp.kvs is not None and len(resp.kvs) > 0:
- for kv in resp.kvs:
- list.append(KVPair(kv.key, kv.value, kv.mod_revision))
- returnValue((list, err))
-
- @inlineCallbacks
- def _reserve(self, key, value, **kw):
- for name, val in kw.items():
- if name == 'ttl':
- ttl = val
- break
- reserved = False
- err = 'reservation-failed'
- owner = None
-
- # Create a lease
- lease = yield self.client.lease(ttl)
-
- # Create a transaction
- txn = Transaction(
- compare=[ CompVersion(key, '==', 0) ],
- success=[ OpSet(key, bytes(value), lease=lease) ],
- failure=[ OpGet(key) ]
- )
- newly_acquired = False
- try:
- result = yield self.client.submit(txn)
- except Failed as failed:
- log.debug('key-already-present', key=key)
- if len(failed.responses) > 0:
- response = failed.responses[0]
- if response.kvs is not None and len(response.kvs) > 0:
- kv = response.kvs[0]
- log.debug('key-already-present', value=kv.value)
- if kv.value == value:
- reserved = True
- log.debug('key-already-reserved', key = kv.key, value=kv.value)
- else:
- newly_acquired = True
- log.debug('key-was-absent', key=key, result=result)
-
- # Check if reservation succeeded
- resp = yield self.client.get(key)
- if resp.kvs is not None and len(resp.kvs) == 1:
- owner = resp.kvs[0].value
- if owner == value:
- if newly_acquired:
- log.debug('key-reserved', key=key, value=value, ttl=ttl,
- lease_id=lease.lease_id)
- reserved = True
- # Add key to reservation list
- self.key_reservations[key] = lease
- else:
- log.debug("reservation-still-held")
- else:
- log.debug('reservation-held-by-another', value=owner)
-
- if reserved:
- err = None
- returnValue((owner, err))
-
- @inlineCallbacks
- def _renew_reservation(self, key):
- result = None
- err = None
- if key not in self.key_reservations:
- err = 'key-not-reserved'
- else:
- lease = self.key_reservations[key]
- # A successfully refreshed lease returns an object of type Header
- result = yield lease.refresh()
- if result is None:
- err = 'lease-refresh-failed'
- returnValue((result, err))
-
- @inlineCallbacks
- def _release_reservation(self, key):
- err = None
- if key not in self.key_reservations:
- err = 'key-not-reserved'
- else:
- lease = self.key_reservations[key]
- time_left = yield lease.remaining()
- # A successfully revoked lease returns an object of type Header
- log.debug('release-reservation', key=key, lease_id=lease.lease_id,
- time_left_in_secs=time_left)
- result = yield lease.revoke()
- if result is None:
- err = 'lease-revoke-failed'
- self.key_reservations.pop(key)
- returnValue((result, err))
-
- @inlineCallbacks
- def _release_all_reservations(self):
- err = None
- keys_to_delete = []
- for key in self.key_reservations:
- lease = self.key_reservations[key]
- time_left = yield lease.remaining()
- # A successfully revoked lease returns an object of type Header
- log.debug('release-reservation', key=key, lease_id=lease.lease_id,
- time_left_in_secs=time_left)
- result = yield lease.revoke()
- if result is None:
- err = 'lease-revoke-failed'
- log.debug('lease-revoke', result=result)
- keys_to_delete.append(key)
- for key in keys_to_delete:
- self.key_reservations.pop(key)
- returnValue(err)
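
A sketch of the reservation flow against the client above, assuming a reachable etcd server; the 'localhost:2379' address, the key, and the owner name are illustrative:

    from twisted.internet.defer import inlineCallbacks
    from common.kvstore.etcd_client import EtcdClient

    @inlineCallbacks
    def leadership_demo():
        client = EtcdClient('localhost', 2379)
        # Try to reserve the key for 60 seconds; the current owner is returned either way
        owner, err = yield client.reserve('service/voltha/leader', 'core-1', ttl=60)
        if err is None and owner == 'core-1':
            # do NOT put() to a reserved key in etcd (see kv_client.py), just renew it
            err = yield client.renew_reservation('service/voltha/leader')
            # ...
            err = yield client.release_reservation('service/voltha/leader')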
diff --git a/python/common/kvstore/kv_client.py b/python/common/kvstore/kv_client.py
deleted file mode 100644
index 69a6480..0000000
--- a/python/common/kvstore/kv_client.py
+++ /dev/null
@@ -1,206 +0,0 @@
-# Copyright 2018-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from common.utils.asleep import asleep
-from structlog import get_logger
-from twisted.internet.defer import inlineCallbacks, returnValue
-
-log = get_logger()
-
-class KVPair():
- def __init__(self, key, value, index):
- self.key = key
- self.value = value
- self.index = index
-
-class Event():
- PUT = 0
- DELETE = 1
- CONNECTION_DOWN = 2
-
- def __init__(self, event_type, key, value):
- self.event_type = event_type
- self.key = key
- self.value = value
-
-RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
-DEFAULT_TIMEOUT = 0.0
-for i in range(len(RETRY_BACKOFF)):
- DEFAULT_TIMEOUT += RETRY_BACKOFF[i]
-
-class KVClient():
-
- def __init__(self, kv_host, kv_port):
- self.host = kv_host
- self.port = kv_port
- self.key_reservations = {}
- self.key_watches = {}
- self.retries = 0
- self.retry_time = 0
-
- @inlineCallbacks
- def get(self, key, timeout=DEFAULT_TIMEOUT):
- '''
- This method returns the value of the given key in KV store.
-
- :param key: The key whose value is requested
- :param timeout: The length of time in seconds the method will wait for a response
- :return: (KVPair, error) where KVPair is None if an error occurred
- '''
- result = yield self._op_with_retry('GET', key, None, timeout)
- returnValue(result)
-
- @inlineCallbacks
- def list(self, key, timeout=DEFAULT_TIMEOUT):
- '''
- The list method returns an array of key-value pairs all of which
- share the same key prefix.
-
- :param key: The key prefix
- :param timeout: The length of time in seconds the method will wait for a response
- :return: ([]KVPair, error) where []KVPair is a list of KVPair objects
- '''
- result = yield self._op_with_retry('LIST', key, None, timeout)
- returnValue(result)
-
- @inlineCallbacks
- def put(self, key, value, timeout=DEFAULT_TIMEOUT):
- '''
- The put method writes a value to the given key in KV store.
- Do NOT modify a reserved key in an etcd store; doing so seems
- to nullify the TTL of the key. In other words, the key lasts
- forever.
-
- :param key: The key to be written to
- :param value: The value of the key
- :param timeout: The length of time in seconds the method will wait for a response
- :return: error, which is set to None for a successful write
- '''
- _, err = yield self._op_with_retry('PUT', key, value, timeout)
- returnValue(err)
-
- @inlineCallbacks
- def delete(self, key, timeout=DEFAULT_TIMEOUT):
- '''
- The delete method removes a key from the KV store.
-
- :param key: The key to be deleted
- :param timeout: The length of time in seconds the method will wait for a response
- :return: error, which is set to None for a successful deletion
- '''
- _, err = yield self._op_with_retry('DELETE', key, None, timeout)
- returnValue(err)
-
- @inlineCallbacks
- def reserve(self, key, value, ttl, timeout=DEFAULT_TIMEOUT):
- '''
- This method acts essentially like a semaphore. The underlying mechanism
- differs depending on the KV store: etcd uses a test-and-set transaction;
- consul uses an acquire lock. If using etcd, do NOT write to the key
- subsequent to the initial reservation; the TTL functionality may become
- impaired (i.e. the reservation never expires).
-
- :param key: The key under reservation
- :param value: The reservation owner
- :param ttl: The time-to-live (TTL) for the reservation. The key is unreserved
- by the KV store when the TTL expires.
- :param timeout: The length of time in seconds the method will wait for a response
- :return: (key_value, error) If the key is acquired, then the value returned will
- be the value passed in. If the key is already acquired, then the value assigned
- to that key will be returned.
- '''
- result = yield self._op_with_retry('RESERVE', key, value, timeout, ttl=ttl)
- returnValue(result)
-
- @inlineCallbacks
- def renew_reservation(self, key, timeout=DEFAULT_TIMEOUT):
- '''
- This method renews the reservation for a given key. A reservation expires
- after the TTL (Time To Live) period specified when reserving the key.
-
- :param key: The reserved key
- :param timeout: The length of time in seconds the method will wait for a response
- :return: error, which is set to None for a successful renewal
- '''
- result, err = yield self._op_with_retry('RENEW', key, None, timeout)
- returnValue(err)
-
- @inlineCallbacks
- def release_reservation(self, key, timeout=DEFAULT_TIMEOUT):
- '''
- The release_reservation method cancels the reservation for a given key.
-
- :param key: The reserved key
- :param timeout: The length of time in seconds the method will wait for a response
- :return: error, which is set to None for a successful cancellation
- '''
- result, err = yield self._op_with_retry('RELEASE', key, None, timeout)
- returnValue(err)
-
- @inlineCallbacks
- def release_all_reservations(self, timeout=DEFAULT_TIMEOUT):
- '''
- This method cancels all key reservations made previously
- using the reserve API.
-
- :param timeout: The length of time in seconds the method will wait for a response
- :return: error, which is set to None for a successful cancellation
- '''
- result, err = yield self._op_with_retry('RELEASE-ALL', None, None, timeout)
- returnValue(err)
-
- @inlineCallbacks
- def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
- '''
- This method provides a watch capability for the given key. If the value of the key
- changes or the key is deleted, then an event indicating the change is passed to
- the given callback function.
-
- :param key: The key to be watched
- :param key_change_callback: The function invoked whenever the key changes
- :param timeout: The length of time in seconds the method will wait for a response
- :return: There is no return; key change events are passed to the callback function
- '''
- raise NotImplementedError('Method not implemented')
-
- @inlineCallbacks
- def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
- '''
- This method closes the watch on the given key. Once the watch is closed, key
- change events are no longer passed to the key change callback function.
-
- :param key: The key under watch
- :param timeout: The length of time in seconds the method will wait for a response
- :return: There is no return
- '''
- raise NotImplementedError('Method not implemented')
-
- @inlineCallbacks
- def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
- raise NotImplementedError('Method not implemented')
-
- def _backoff(self, msg):
- wait_time = RETRY_BACKOFF[min(self.retries, len(RETRY_BACKOFF) - 1)]
- self.retry_time += wait_time
- self.retries += 1
- log.error(msg, next_retry_in_secs=wait_time,
- total_delay_in_secs = self.retry_time,
- retries=self.retries)
- return asleep(wait_time)
-
- def _clear_backoff(self):
- if self.retries:
- log.debug('reset-backoff', after_retries=self.retries)
- self.retries = 0
- self.retry_time = 0
\ No newline at end of file
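
The (result, error) convention documented in the docstrings above applies to every operation; a minimal sketch of list/delete over a key prefix, assuming the EtcdClient defined earlier and illustrative names:

    from twisted.internet.defer import inlineCallbacks
    from common.kvstore.etcd_client import EtcdClient

    @inlineCallbacks
    def purge_prefix(prefix='service/voltha/demo/'):
        client = EtcdClient('localhost', 2379)    # address is illustrative
        pairs, err = yield client.list(prefix)    # list of KVPair objects, or an error
        if err is None:
            for kvp in pairs:
                # each KVPair carries key, value and the store's modification index
                err = yield client.delete(kvp.key)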
diff --git a/python/common/kvstore/kvstore.py b/python/common/kvstore/kvstore.py
deleted file mode 100644
index 662b34d..0000000
--- a/python/common/kvstore/kvstore.py
+++ /dev/null
@@ -1,31 +0,0 @@
-# Copyright 2018-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from common.kvstore.consul_client import ConsulClient
-from common.kvstore.etcd_client import EtcdClient
-
-def create_kv_client(kv_store, host, port):
- '''
- Factory for creating a client interface to a KV store
-
- :param kv_store: Specify either 'etcd' or 'consul'
- :param host: Name or IP address of host serving the KV store
- :param port: Port number (integer) of the KV service
- :return: Reference to newly created client interface
- '''
- if kv_store == 'etcd':
- return EtcdClient(host, port)
- elif kv_store == 'consul':
- return ConsulClient(host, port)
- return None
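
A brief sketch of the factory above; the backend string selects the concrete client, and the host and port are illustrative:

    from common.kvstore.kvstore import create_kv_client

    kv_client = create_kv_client('etcd', 'localhost', 2379)   # or 'consul' with its own port
    if kv_client is None:
        raise ValueError('unsupported KV store type')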
diff --git a/python/common/frameio/__init__.py b/python/common/openflow/__init__.py
similarity index 100%
rename from python/common/frameio/__init__.py
rename to python/common/openflow/__init__.py
diff --git a/python/common/openflow/utils.py b/python/common/openflow/utils.py
new file mode 100644
index 0000000..456ae06
--- /dev/null
+++ b/python/common/openflow/utils.py
@@ -0,0 +1,558 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+
+from python.protos import openflow_13_pb2 as ofp
+from hashlib import md5
+
+log = structlog.get_logger()
+
+# aliases
+ofb_field = ofp.ofp_oxm_ofb_field
+action = ofp.ofp_action
+
+# OFPAT_* shortcuts
+OUTPUT = ofp.OFPAT_OUTPUT
+COPY_TTL_OUT = ofp.OFPAT_COPY_TTL_OUT
+COPY_TTL_IN = ofp.OFPAT_COPY_TTL_IN
+SET_MPLS_TTL = ofp.OFPAT_SET_MPLS_TTL
+DEC_MPLS_TTL = ofp.OFPAT_DEC_MPLS_TTL
+PUSH_VLAN = ofp.OFPAT_PUSH_VLAN
+POP_VLAN = ofp.OFPAT_POP_VLAN
+PUSH_MPLS = ofp.OFPAT_PUSH_MPLS
+POP_MPLS = ofp.OFPAT_POP_MPLS
+SET_QUEUE = ofp.OFPAT_SET_QUEUE
+GROUP = ofp.OFPAT_GROUP
+SET_NW_TTL = ofp.OFPAT_SET_NW_TTL
+NW_TTL = ofp.OFPAT_DEC_NW_TTL
+SET_FIELD = ofp.OFPAT_SET_FIELD
+PUSH_PBB = ofp.OFPAT_PUSH_PBB
+POP_PBB = ofp.OFPAT_POP_PBB
+EXPERIMENTER = ofp.OFPAT_EXPERIMENTER
+
+# OFPXMT_OFB_* shortcuts (incomplete)
+IN_PORT = ofp.OFPXMT_OFB_IN_PORT
+IN_PHY_PORT = ofp.OFPXMT_OFB_IN_PHY_PORT
+METADATA = ofp.OFPXMT_OFB_METADATA
+ETH_DST = ofp.OFPXMT_OFB_ETH_DST
+ETH_SRC = ofp.OFPXMT_OFB_ETH_SRC
+ETH_TYPE = ofp.OFPXMT_OFB_ETH_TYPE
+VLAN_VID = ofp.OFPXMT_OFB_VLAN_VID
+VLAN_PCP = ofp.OFPXMT_OFB_VLAN_PCP
+IP_DSCP = ofp.OFPXMT_OFB_IP_DSCP
+IP_ECN = ofp.OFPXMT_OFB_IP_ECN
+IP_PROTO = ofp.OFPXMT_OFB_IP_PROTO
+IPV4_SRC = ofp.OFPXMT_OFB_IPV4_SRC
+IPV4_DST = ofp.OFPXMT_OFB_IPV4_DST
+TCP_SRC = ofp.OFPXMT_OFB_TCP_SRC
+TCP_DST = ofp.OFPXMT_OFB_TCP_DST
+UDP_SRC = ofp.OFPXMT_OFB_UDP_SRC
+UDP_DST = ofp.OFPXMT_OFB_UDP_DST
+SCTP_SRC = ofp.OFPXMT_OFB_SCTP_SRC
+SCTP_DST = ofp.OFPXMT_OFB_SCTP_DST
+ICMPV4_TYPE = ofp.OFPXMT_OFB_ICMPV4_TYPE
+ICMPV4_CODE = ofp.OFPXMT_OFB_ICMPV4_CODE
+ARP_OP = ofp.OFPXMT_OFB_ARP_OP
+ARP_SPA = ofp.OFPXMT_OFB_ARP_SPA
+ARP_TPA = ofp.OFPXMT_OFB_ARP_TPA
+ARP_SHA = ofp.OFPXMT_OFB_ARP_SHA
+ARP_THA = ofp.OFPXMT_OFB_ARP_THA
+IPV6_SRC = ofp.OFPXMT_OFB_IPV6_SRC
+IPV6_DST = ofp.OFPXMT_OFB_IPV6_DST
+IPV6_FLABEL = ofp.OFPXMT_OFB_IPV6_FLABEL
+ICMPV6_TYPE = ofp.OFPXMT_OFB_ICMPV6_TYPE
+ICMPV6_CODE = ofp.OFPXMT_OFB_ICMPV6_CODE
+IPV6_ND_TARGET = ofp.OFPXMT_OFB_IPV6_ND_TARGET
+OFB_IPV6_ND_SLL = ofp.OFPXMT_OFB_IPV6_ND_SLL
+IPV6_ND_TLL = ofp.OFPXMT_OFB_IPV6_ND_TLL
+MPLS_LABEL = ofp.OFPXMT_OFB_MPLS_LABEL
+MPLS_TC = ofp.OFPXMT_OFB_MPLS_TC
+MPLS_BOS = ofp.OFPXMT_OFB_MPLS_BOS
+PBB_ISID = ofp.OFPXMT_OFB_PBB_ISID
+TUNNEL_ID = ofp.OFPXMT_OFB_TUNNEL_ID
+IPV6_EXTHDR = ofp.OFPXMT_OFB_IPV6_EXTHDR
+
+
+# ofp_action_* shortcuts
+
+def output(port, max_len=ofp.OFPCML_MAX):
+ return action(
+ type=OUTPUT,
+ output=ofp.ofp_action_output(port=port, max_len=max_len)
+ )
+
+
+def mpls_ttl(ttl):
+ return action(
+ type=SET_MPLS_TTL,
+ mpls_ttl=ofp.ofp_action_mpls_ttl(mpls_ttl=ttl)
+ )
+
+
+def push_vlan(eth_type):
+ return action(
+ type=PUSH_VLAN,
+ push=ofp.ofp_action_push(ethertype=eth_type)
+ )
+
+
+def pop_vlan():
+ return action(
+ type=POP_VLAN
+ )
+
+
+def pop_mpls(eth_type):
+ return action(
+ type=POP_MPLS,
+ pop_mpls=ofp.ofp_action_pop_mpls(ethertype=eth_type)
+ )
+
+
+def group(group_id):
+ return action(
+ type=GROUP,
+ group=ofp.ofp_action_group(group_id=group_id)
+ )
+
+
+def nw_ttl(nw_ttl):
+ return action(
+ type=NW_TTL,
+ nw_ttl=ofp.ofp_action_nw_ttl(nw_ttl=nw_ttl)
+ )
+
+
+def set_field(field):
+ return action(
+ type=SET_FIELD,
+ set_field=ofp.ofp_action_set_field(
+ field=ofp.ofp_oxm_field(
+ oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+ ofb_field=field))
+ )
+
+
+def experimenter(experimenter, data):
+ return action(
+ type=EXPERIMENTER,
+ experimenter=ofp.ofp_action_experimenter(
+ experimenter=experimenter, data=data)
+ )
+
+
+# ofb_field generators (incomplete set)
+
+def in_port(_in_port):
+ return ofb_field(type=IN_PORT, port=_in_port)
+
+
+def in_phy_port(_in_phy_port):
+ return ofb_field(type=IN_PHY_PORT, port=_in_phy_port)
+
+
+def metadata(_table_metadata):
+ return ofb_field(type=METADATA, table_metadata=_table_metadata)
+
+
+def eth_dst(_eth_dst):
+ return ofb_field(type=ETH_DST, table_metadata=_eth_dst)
+
+
+def eth_src(_eth_src):
+ return ofb_field(type=ETH_SRC, table_metadata=_eth_src)
+
+
+def eth_type(_eth_type):
+ return ofb_field(type=ETH_TYPE, eth_type=_eth_type)
+
+
+def vlan_vid(_vlan_vid):
+ return ofb_field(type=VLAN_VID, vlan_vid=_vlan_vid)
+
+
+def vlan_pcp(_vlan_pcp):
+ return ofb_field(type=VLAN_PCP, vlan_pcp=_vlan_pcp)
+
+
+def ip_dscp(_ip_dscp):
+ return ofb_field(type=IP_DSCP, ip_dscp=_ip_dscp)
+
+
+def ip_ecn(_ip_ecn):
+ return ofb_field(type=IP_ECN, ip_ecn=_ip_ecn)
+
+
+def ip_proto(_ip_proto):
+ return ofb_field(type=IP_PROTO, ip_proto=_ip_proto)
+
+
+def ipv4_src(_ipv4_src):
+ return ofb_field(type=IPV4_SRC, ipv4_src=_ipv4_src)
+
+
+def ipv4_dst(_ipv4_dst):
+ return ofb_field(type=IPV4_DST, ipv4_dst=_ipv4_dst)
+
+
+def tcp_src(_tcp_src):
+ return ofb_field(type=TCP_SRC, tcp_src=_tcp_src)
+
+
+def tcp_dst(_tcp_dst):
+ return ofb_field(type=TCP_DST, tcp_dst=_tcp_dst)
+
+
+def udp_src(_udp_src):
+ return ofb_field(type=UDP_SRC, udp_src=_udp_src)
+
+
+def udp_dst(_udp_dst):
+ return ofb_field(type=UDP_DST, udp_dst=_udp_dst)
+
+
+def sctp_src(_sctp_src):
+ return ofb_field(type=SCTP_SRC, sctp_src=_sctp_src)
+
+
+def sctp_dst(_sctp_dst):
+ return ofb_field(type=SCTP_DST, sctp_dst=_sctp_dst)
+
+
+def icmpv4_type(_icmpv4_type):
+ return ofb_field(type=ICMPV4_TYPE, icmpv4_type=_icmpv4_type)
+
+
+def icmpv4_code(_icmpv4_code):
+ return ofb_field(type=ICMPV4_CODE, icmpv4_code=_icmpv4_code)
+
+
+def arp_op(_arp_op):
+ return ofb_field(type=ARP_OP, arp_op=_arp_op)
+
+
+def arp_spa(_arp_spa):
+ return ofb_field(type=ARP_SPA, arp_spa=_arp_spa)
+
+
+def arp_tpa(_arp_tpa):
+ return ofb_field(type=ARP_TPA, arp_tpa=_arp_tpa)
+
+
+def arp_sha(_arp_sha):
+ return ofb_field(type=ARP_SHA, arp_sha=_arp_sha)
+
+
+def arp_tha(_arp_tha):
+ return ofb_field(type=ARP_THA, arp_tha=_arp_tha)
+
+
+def ipv6_src(_ipv6_src):
+ return ofb_field(type=IPV6_SRC, arp_tha=_ipv6_src)
+
+
+def ipv6_dst(_ipv6_dst):
+ return ofb_field(type=IPV6_DST, arp_tha=_ipv6_dst)
+
+
+def ipv6_flabel(_ipv6_flabel):
+ return ofb_field(type=IPV6_FLABEL, arp_tha=_ipv6_flabel)
+
+
+def ipmpv6_type(_icmpv6_type):
+ return ofb_field(type=ICMPV6_TYPE, arp_tha=_icmpv6_type)
+
+
+def icmpv6_code(_icmpv6_code):
+ return ofb_field(type=ICMPV6_CODE, arp_tha=_icmpv6_code)
+
+
+def ipv6_nd_target(_ipv6_nd_target):
+ return ofb_field(type=IPV6_ND_TARGET, arp_tha=_ipv6_nd_target)
+
+
+def ofb_ipv6_nd_sll(_ofb_ipv6_nd_sll):
+    return ofb_field(type=OFB_IPV6_ND_SLL, ipv6_nd_sll=_ofb_ipv6_nd_sll)
+
+
+def ipv6_nd_tll(_ipv6_nd_tll):
+    return ofb_field(type=IPV6_ND_TLL, ipv6_nd_tll=_ipv6_nd_tll)
+
+
+def mpls_label(_mpls_label):
+    return ofb_field(type=MPLS_LABEL, mpls_label=_mpls_label)
+
+
+def mpls_tc(_mpls_tc):
+    return ofb_field(type=MPLS_TC, mpls_tc=_mpls_tc)
+
+
+def mpls_bos(_mpls_bos):
+    return ofb_field(type=MPLS_BOS, mpls_bos=_mpls_bos)
+
+
+def pbb_isid(_pbb_isid):
+    return ofb_field(type=PBB_ISID, pbb_isid=_pbb_isid)
+
+
+def tunnel_id(_tunnel_id):
+    return ofb_field(type=TUNNEL_ID, tunnel_id=_tunnel_id)
+
+
+def ipv6_exthdr(_ipv6_exthdr):
+    return ofb_field(type=IPV6_EXTHDR, ipv6_exthdr=_ipv6_exthdr)
+
+
+# frequently used extractors:
+
+def get_actions(flow):
+ """Extract list of ofp_action objects from flow spec object"""
+ assert isinstance(flow, ofp.ofp_flow_stats)
+    # hard assumption for now: all actions are carried in a single
+    # APPLY_ACTIONS instruction
+ for instruction in flow.instructions:
+ if instruction.type == ofp.OFPIT_APPLY_ACTIONS:
+ return instruction.actions.actions
+
+
+def get_ofb_fields(flow):
+ assert isinstance(flow, ofp.ofp_flow_stats)
+ assert flow.match.type == ofp.OFPMT_OXM
+ ofb_fields = []
+ for field in flow.match.oxm_fields:
+ assert field.oxm_class == ofp.OFPXMC_OPENFLOW_BASIC
+ ofb_fields.append(field.ofb_field)
+ return ofb_fields
+
+
+def get_out_port(flow):
+ for action in get_actions(flow):
+ if action.type == OUTPUT:
+ return action.output.port
+ return None
+
+
+def get_in_port(flow):
+ for field in get_ofb_fields(flow):
+ if field.type == IN_PORT:
+ return field.port
+ return None
+
+
+def get_goto_table_id(flow):
+ for instruction in flow.instructions:
+ if instruction.type == ofp.OFPIT_GOTO_TABLE:
+ return instruction.goto_table.table_id
+ return None
+
+
+def get_metadata(flow):
+    '''Legacy get method (only want the lower 32 bits of metadata).'''
+ for field in get_ofb_fields(flow):
+ if field.type == METADATA:
+ return field.table_metadata & 0xffffffff
+ return None
+
+
+def get_metadata_64_bit(flow):
+ for field in get_ofb_fields(flow):
+ if field.type == METADATA:
+ return field.table_metadata
+ return None
+
+
+def get_port_number_from_metadata(flow):
+ """
+ The port number (UNI on ONU) is in the lower 32-bits of metadata and
+ the inner_tag is in the upper 32-bits
+
+ This is set in the ONOS OltPipeline as a metadata field
+ """
+ md = get_metadata_64_bit(flow)
+
+ if md is None:
+ return None
+
+ if md <= 0xffffffff:
+        log.warn('onos-upgrade-suggested',
+                 metadata=md,
+                 message='Legacy MetaData detected from OltPipeline')
+ return md
+
+ return md & 0xffffffff
+
+
+def get_inner_tag_from_metadata(flow):
+ """
+ The port number (UNI on ONU) is in the lower 32-bits of metadata and
+ the inner_tag is in the upper 32-bits
+
+ This is set in the ONOS OltPipeline as a metadata field
+ """
+ md = get_metadata_64_bit(flow)
+
+ if md is None:
+ return None
+
+ if md <= 0xffffffff:
+        log.warn('onos-upgrade-suggested',
+                 metadata=md,
+                 message='Legacy MetaData detected from OltPipeline')
+ return md
+
+ return (md >> 32) & 0xffffffff
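+
+# Worked example for the two helpers above (hypothetical metadata value):
+# if the OltPipeline writes metadata 0x0000012C00000010, then
+#   get_inner_tag_from_metadata(flow)   -> (md >> 32) & 0xffffffff = 0x12c (300)
+#   get_port_number_from_metadata(flow) -> md & 0xffffffff         = 0x10  (16)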
+
+
+# test and extract next table and group information
+def has_next_table(flow):
+ return get_goto_table_id(flow) is not None
+
+
+def get_group(flow):
+ for action in get_actions(flow):
+ if action.type == GROUP:
+ return action.group.group_id
+ return None
+
+
+def has_group(flow):
+ return get_group(flow) is not None
+
+
+def mk_oxm_fields(match_fields):
+ oxm_fields = [
+ ofp.ofp_oxm_field(
+ oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+ ofb_field=field
+ ) for field in match_fields
+ ]
+
+ return oxm_fields
+
+
+def mk_instructions_from_actions(actions):
+ instructions_action = ofp.ofp_instruction_actions()
+ instructions_action.actions.extend(actions)
+ instruction = ofp.ofp_instruction(type=ofp.OFPIT_APPLY_ACTIONS,
+ actions=instructions_action)
+ return [instruction]
+
+
+def mk_simple_flow_mod(match_fields, actions, command=ofp.OFPFC_ADD,
+ next_table_id=None, **kw):
+ """
+    Convenience function to generate an ofp_flow_mod message with an OXM BASIC
+    match composed from the match_fields and a single APPLY_ACTIONS instruction
+    with a list of ofp_action objects.
+    :param match_fields: list(ofp_oxm_ofb_field)
+    :param actions: list(ofp_action)
+    :param command: one of OFPFC_*
+    :param next_table_id: if not None, also add a GOTO_TABLE instruction to this table
+    :param kw: additional keyword-based params to ofp_flow_mod
+ :return: initialized ofp_flow_mod object
+ """
+ instructions = [
+ ofp.ofp_instruction(
+ type=ofp.OFPIT_APPLY_ACTIONS,
+ actions=ofp.ofp_instruction_actions(actions=actions)
+ )
+ ]
+ if next_table_id is not None:
+ instructions.append(ofp.ofp_instruction(
+ type=ofp.OFPIT_GOTO_TABLE,
+ goto_table=ofp.ofp_instruction_goto_table(table_id=next_table_id)
+ ))
+
+ return ofp.ofp_flow_mod(
+ command=command,
+ match=ofp.ofp_match(
+ type=ofp.OFPMT_OXM,
+ oxm_fields=[
+ ofp.ofp_oxm_field(
+ oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
+ ofb_field=field
+ ) for field in match_fields
+ ]
+ ),
+ instructions=instructions,
+ **kw
+ )
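+
+# Example usage (sketch; assumes the output() action generator defined earlier
+# in this module, and hypothetical port/priority values):
+#
+#     flow_mod = mk_simple_flow_mod(
+#         match_fields=[in_port(16), eth_type(0x0800)],
+#         actions=[output(ofp.OFPP_CONTROLLER)],
+#         priority=1000)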
+
+
+def mk_multicast_group_mod(group_id, buckets, command=ofp.OFPGC_ADD):
+ group = ofp.ofp_group_mod(
+ command=command,
+ type=ofp.OFPGT_ALL,
+ group_id=group_id,
+ buckets=buckets
+ )
+ return group
+
+
+def hash_flow_stats(flow):
+ """
+    Return a 64-bit integer hash of the flow (truncated MD5) covering the
+    following attributes: 'table_id', 'priority', 'flags', 'cookie', 'match'
+    and the serialized instructions
+ """
+ _instruction_string = ""
+ for _instruction in flow.instructions:
+ _instruction_string += _instruction.SerializeToString()
+
+ hex = md5('{},{},{},{},{},{}'.format(
+ flow.table_id,
+ flow.priority,
+ flow.flags,
+ flow.cookie,
+ flow.match.SerializeToString(),
+ _instruction_string
+ )).hexdigest()
+ return int(hex[:16], 16)
+
+
+def flow_stats_entry_from_flow_mod_message(mod):
+ flow = ofp.ofp_flow_stats(
+ table_id=mod.table_id,
+ priority=mod.priority,
+ idle_timeout=mod.idle_timeout,
+ hard_timeout=mod.hard_timeout,
+ flags=mod.flags,
+ cookie=mod.cookie,
+ match=mod.match,
+ instructions=mod.instructions
+ )
+ flow.id = hash_flow_stats(flow)
+ return flow
+
+
+def group_entry_from_group_mod(mod):
+ group = ofp.ofp_group_entry(
+ desc=ofp.ofp_group_desc(
+ type=mod.type,
+ group_id=mod.group_id,
+ buckets=mod.buckets
+ ),
+ stats=ofp.ofp_group_stats(
+ group_id=mod.group_id
+ # TODO do we need to instantiate bucket bins?
+ )
+ )
+ return group
+
+
+def mk_flow_stat(**kw):
+ return flow_stats_entry_from_flow_mod_message(mk_simple_flow_mod(**kw))
+
+
+def mk_group_stat(**kw):
+ return group_entry_from_group_mod(mk_multicast_group_mod(**kw))
diff --git a/python/common/pon_resource_manager/__init__.py b/python/common/pon_resource_manager/__init__.py
deleted file mode 100644
index 2d104e0..0000000
--- a/python/common/pon_resource_manager/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/python/common/pon_resource_manager/resource_kv_store.py b/python/common/pon_resource_manager/resource_kv_store.py
deleted file mode 100644
index a1a5c14..0000000
--- a/python/common/pon_resource_manager/resource_kv_store.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-# Copyright 2018 the original author or authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Resource KV store - interface between Resource Manager and backend store."""
-import structlog
-
-from voltha.core.config.config_backend import ConsulStore
-from voltha.core.config.config_backend import EtcdStore
-
-# KV store uses this prefix to store resource info
-PATH_PREFIX = 'resource_manager/{}'
-
-
-class ResourceKvStore(object):
- """Implements apis to store/get/remove resource in backend store."""
-
- def __init__(self, technology, device_id, backend, host, port):
- """
- Create ResourceKvStore object.
-
- Based on backend ('consul' and 'etcd' use the host and port
- to create the respective object.
-
- :param technology: PON technology
- :param device_id: OLT device id
- :param backend: Type of backend storage (etcd or consul)
- :param host: host ip info for backend storage
- :param port: port for the backend storage
- :raises exception when invalid backend store passed as an argument
- """
- # logger
- self._log = structlog.get_logger()
-
- path = PATH_PREFIX.format(technology)
- try:
- if backend == 'consul':
- self._kv_store = ConsulStore(host, port, path)
- elif backend == 'etcd':
- self._kv_store = EtcdStore(host, port, path)
- else:
- self._log.error('Invalid-backend')
- raise Exception("Invalid-backend-for-kv-store")
- except Exception as e:
- self._log.exception("exception-in-init")
- raise Exception(e)
-
- def update_to_kv_store(self, path, resource):
- """
- Update resource.
-
- :param path: path to update the resource
- :param resource: updated resource
- """
- try:
- self._kv_store[path] = str(resource)
- self._log.debug("Resource-updated-in-kv-store", path=path)
- return True
- except BaseException:
- self._log.exception("Resource-update-in-kv-store-failed",
- path=path, resource=resource)
- return False
-
- def get_from_kv_store(self, path):
- """
- Get resource.
-
- :param path: path to get the resource
- """
- resource = None
- try:
- resource = self._kv_store[path]
- self._log.debug("Got-resource-from-kv-store", path=path)
- except KeyError:
- self._log.info("Resource-not-found-updating-resource",
- path=path)
- except BaseException:
- self._log.exception("Getting-resource-from-kv-store-failed",
- path=path)
- return resource
-
- def remove_from_kv_store(self, path):
- """
- Remove resource.
-
- :param path: path to remove the resource
- """
- try:
- del self._kv_store[path]
- self._log.debug("Resource-deleted-in-kv-store", path=path)
- return True
- except BaseException:
- self._log.exception("Resource-delete-in-kv-store-failed",
- path=path)
- return False
diff --git a/python/common/pon_resource_manager/resource_manager.py b/python/common/pon_resource_manager/resource_manager.py
deleted file mode 100644
index 17b2871..0000000
--- a/python/common/pon_resource_manager/resource_manager.py
+++ /dev/null
@@ -1,677 +0,0 @@
-#
-# Copyright 2018 the original author or authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Resource Manager will be unique for each OLT device.
-
-It exposes APIs to create/free alloc_ids/onu_ids/gemport_ids. Resource Manager
-uses a KV store in backend to ensure resiliency of the data.
-"""
-import json
-import structlog
-from bitstring import BitArray
-from ast import literal_eval
-import shlex
-from argparse import ArgumentParser, ArgumentError
-
-from common.pon_resource_manager.resource_kv_store import ResourceKvStore
-
-
-# Used to parse extra arguments to OpenOlt adapter from the NBI
-class OltVendorArgumentParser(ArgumentParser):
- # Must override the exit command to prevent it from
- # calling sys.exit(). Return exception instead.
- def exit(self, status=0, message=None):
- raise Exception(message)
-
-
-class PONResourceManager(object):
- """Implements APIs to initialize/allocate/release alloc/gemport/onu IDs."""
-
- # Constants to identify resource pool
- ONU_ID = 'ONU_ID'
- ALLOC_ID = 'ALLOC_ID'
- GEMPORT_ID = 'GEMPORT_ID'
-
- # The resource ranges for a given device vendor_type should be placed
- # at 'resource_manager/<technology>/resource_ranges/<olt_vendor_type>'
- # path on the KV store.
- # If Resource Range parameters are to be read from the external KV store,
- # they are expected to be stored in the following format.
- # Note: All parameters are MANDATORY for now.
- '''
- {
- "onu_id_start": 1,
- "onu_id_end": 127,
- "alloc_id_start": 1024,
- "alloc_id_end": 2816,
- "gemport_id_start": 1024,
- "gemport_id_end": 8960,
- "pon_ports": 16
- }
-
- '''
- # constants used as keys to reference the resource range parameters from
- # and external KV store.
- ONU_START_IDX = "onu_id_start"
- ONU_END_IDX = "onu_id_end"
- ALLOC_ID_START_IDX = "alloc_id_start"
- ALLOC_ID_END_IDX = "alloc_id_end"
- GEM_PORT_ID_START_IDX = "gemport_id_start"
- GEM_PORT_ID_END_IDX = "gemport_id_end"
- NUM_OF_PON_PORT = "pon_ports"
-
- # PON Resource range configuration on the KV store.
- # Format: 'resource_manager/<technology>/resource_ranges/<olt_vendor_type>'
- # The KV store backend is initialized with a path prefix and we need to
- # provide only the suffix.
- PON_RESOURCE_RANGE_CONFIG_PATH = 'resource_ranges/{}'
-
- # resource path suffix
- ALLOC_ID_POOL_PATH = '{}/alloc_id_pool/{}'
- GEMPORT_ID_POOL_PATH = '{}/gemport_id_pool/{}'
- ONU_ID_POOL_PATH = '{}/onu_id_pool/{}'
-
- # Path on the KV store for storing list of alloc IDs for a given ONU
- # Format: <device_id>/<(pon_intf_id, onu_id)>/alloc_ids
- ALLOC_ID_RESOURCE_MAP_PATH = '{}/{}/alloc_ids'
-
- # Path on the KV store for storing list of gemport IDs for a given ONU
- # Format: <device_id>/<(pon_intf_id, onu_id)>/gemport_ids
- GEMPORT_ID_RESOURCE_MAP_PATH = '{}/{}/gemport_ids'
-
- # Constants for internal usage.
- PON_INTF_ID = 'pon_intf_id'
- START_IDX = 'start_idx'
- END_IDX = 'end_idx'
- POOL = 'pool'
-
- def __init__(self, technology, extra_args, device_id,
- backend, host, port):
- """
- Create PONResourceManager object.
-
- :param technology: PON technology
- :param: extra_args: This string contains extra arguments passed during
- pre-provisioning of OLT and specifies the OLT Vendor type
- :param device_id: OLT device id
- :param backend: backend store
- :param host: ip of backend store
- :param port: port on which backend store listens
- :raises exception when invalid backend store passed as an argument
- """
- # logger
- self._log = structlog.get_logger()
-
- try:
- self.technology = technology
- self.extra_args = extra_args
- self.device_id = device_id
- self.backend = backend
- self.host = host
- self.port = port
- self.olt_vendor = None
- self._kv_store = ResourceKvStore(technology, device_id, backend,
- host, port)
- # Below attribute, pon_resource_ranges, should be initialized
- # by reading from KV store.
- self.pon_resource_ranges = dict()
- except Exception as e:
- self._log.exception("exception-in-init")
- raise Exception(e)
-
- def init_resource_ranges_from_kv_store(self):
- """
- Initialize PON resource ranges with config fetched from kv store.
-
- :return boolean: True if PON resource ranges initialized else false
- """
- self.olt_vendor = self._get_olt_vendor()
- # Try to initialize the PON Resource Ranges from KV store based on the
- # OLT vendor key, if available
- if self.olt_vendor is None:
- self._log.info("olt-vendor-unavailable--not-reading-from-kv-store")
- return False
-
- path = self.PON_RESOURCE_RANGE_CONFIG_PATH.format(self.olt_vendor)
- try:
- # get resource from kv store
- result = self._kv_store.get_from_kv_store(path)
-
- if result is None:
- self._log.debug("resource-range-config-unavailable-on-kvstore")
- return False
-
- resource_range_config = result
-
- if resource_range_config is not None:
- self.pon_resource_ranges = json.loads(resource_range_config)
- self._log.debug("Init-resource-ranges-from-kvstore-success",
- pon_resource_ranges=self.pon_resource_ranges,
- path=path)
- return True
-
- except Exception as e:
- self._log.exception("error-initializing-resource-range-from-kv-store",
- e=e)
- return False
-
- def init_default_pon_resource_ranges(self, onu_start_idx=1,
- onu_end_idx=127,
- alloc_id_start_idx=1024,
- alloc_id_end_idx=2816,
- gem_port_id_start_idx=1024,
- gem_port_id_end_idx=8960,
- num_of_pon_ports=16):
- """
- Initialize default PON resource ranges
-
- :param onu_start_idx: onu id start index
- :param onu_end_idx: onu id end index
- :param alloc_id_start_idx: alloc id start index
- :param alloc_id_end_idx: alloc id end index
- :param gem_port_id_start_idx: gemport id start index
- :param gem_port_id_end_idx: gemport id end index
- :param num_of_pon_ports: number of PON ports
- """
- self._log.info("initialize-default-resource-range-values")
- self.pon_resource_ranges[
- PONResourceManager.ONU_START_IDX] = onu_start_idx
- self.pon_resource_ranges[PONResourceManager.ONU_END_IDX] = onu_end_idx
- self.pon_resource_ranges[
- PONResourceManager.ALLOC_ID_START_IDX] = alloc_id_start_idx
- self.pon_resource_ranges[
- PONResourceManager.ALLOC_ID_END_IDX] = alloc_id_end_idx
- self.pon_resource_ranges[
- PONResourceManager.GEM_PORT_ID_START_IDX] = gem_port_id_start_idx
- self.pon_resource_ranges[
- PONResourceManager.GEM_PORT_ID_END_IDX] = gem_port_id_end_idx
- self.pon_resource_ranges[
- PONResourceManager.NUM_OF_PON_PORT] = num_of_pon_ports
-
- def init_device_resource_pool(self):
- """
- Initialize resource pool for all PON ports.
- """
- i = 0
- while i < self.pon_resource_ranges[PONResourceManager.NUM_OF_PON_PORT]:
- self.init_resource_id_pool(
- pon_intf_id=i,
- resource_type=PONResourceManager.ONU_ID,
- start_idx=self.pon_resource_ranges[
- PONResourceManager.ONU_START_IDX],
- end_idx=self.pon_resource_ranges[
- PONResourceManager.ONU_END_IDX])
-
- i += 1
-
- # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
- # across OLT. To keep it simple, a single pool (POOL 0) is maintained
- # for both the resource types. This may need to change later.
- self.init_resource_id_pool(
- pon_intf_id=0,
- resource_type=PONResourceManager.ALLOC_ID,
- start_idx=self.pon_resource_ranges[
- PONResourceManager.ALLOC_ID_START_IDX],
- end_idx=self.pon_resource_ranges[
- PONResourceManager.ALLOC_ID_END_IDX])
-
- self.init_resource_id_pool(
- pon_intf_id=0,
- resource_type=PONResourceManager.GEMPORT_ID,
- start_idx=self.pon_resource_ranges[
- PONResourceManager.GEM_PORT_ID_START_IDX],
- end_idx=self.pon_resource_ranges[
- PONResourceManager.GEM_PORT_ID_END_IDX])
-
- def clear_device_resource_pool(self):
- """
- Clear resource pool of all PON ports.
- """
- i = 0
- while i < self.pon_resource_ranges[PONResourceManager.NUM_OF_PON_PORT]:
- self.clear_resource_id_pool(
- pon_intf_id=i,
- resource_type=PONResourceManager.ONU_ID,
- )
- i += 1
-
- self.clear_resource_id_pool(
- pon_intf_id=0,
- resource_type=PONResourceManager.ALLOC_ID,
- )
-
- self.clear_resource_id_pool(
- pon_intf_id=0,
- resource_type=PONResourceManager.GEMPORT_ID,
- )
-
- def init_resource_id_pool(self, pon_intf_id, resource_type, start_idx,
- end_idx):
- """
- Initialize Resource ID pool for a given Resource Type on a given PON Port
-
- :param pon_intf_id: OLT PON interface id
- :param resource_type: String to identify type of resource
- :param start_idx: start index for onu id pool
- :param end_idx: end index for onu id pool
- :return boolean: True if resource id pool initialized else false
- """
- status = False
- path = self._get_path(pon_intf_id, resource_type)
- if path is None:
- return status
-
- try:
- # In case of adapter reboot and reconciliation resource in kv store
- # checked for its presence if not kv store update happens
- resource = self._get_resource(path)
-
- if resource is not None:
- self._log.info("Resource-already-present-in-store", path=path)
- status = True
- else:
- resource = self._format_resource(pon_intf_id, start_idx,
- end_idx)
- self._log.info("Resource-initialized", path=path)
-
- # Add resource as json in kv store.
- result = self._kv_store.update_to_kv_store(path, resource)
- if result is True:
- status = True
-
- except Exception as e:
- self._log.exception("error-initializing-resource-pool", e=e)
-
- return status
-
- def get_resource_id(self, pon_intf_id, resource_type, num_of_id=1):
- """
- Create alloc/gemport/onu id for given OLT PON interface.
-
- :param pon_intf_id: OLT PON interface id
- :param resource_type: String to identify type of resource
- :param num_of_id: required number of ids
- :return list/int/None: list, int or None if resource type is
- alloc_id/gemport_id, onu_id or invalid type
- respectively
- """
- result = None
-
- # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
- # across OLT. To keep it simple, a single pool (POOL 0) is maintained
- # for both the resource types. This may need to change later.
- # Override the incoming pon_intf_id to PON0
- if resource_type == PONResourceManager.GEMPORT_ID or \
- resource_type == PONResourceManager.ALLOC_ID:
- pon_intf_id = 0
-
- path = self._get_path(pon_intf_id, resource_type)
- if path is None:
- return result
-
- try:
- resource = self._get_resource(path)
- if resource is not None and resource_type == \
- PONResourceManager.ONU_ID:
- result = self._generate_next_id(resource)
- elif resource is not None and (
- resource_type == PONResourceManager.GEMPORT_ID or
- resource_type == PONResourceManager.ALLOC_ID):
- result = list()
- while num_of_id > 0:
- result.append(self._generate_next_id(resource))
- num_of_id -= 1
- else:
- raise Exception("get-resource-failed")
-
- self._log.debug("Get-" + resource_type + "-success", result=result,
- path=path)
- # Update resource in kv store
- self._update_resource(path, resource)
-
- except Exception as e:
- self._log.exception("Get-" + resource_type + "-id-failed",
- path=path, e=e)
- return result
-
- def free_resource_id(self, pon_intf_id, resource_type, release_content):
- """
- Release alloc/gemport/onu id for given OLT PON interface.
-
- :param pon_intf_id: OLT PON interface id
- :param resource_type: String to identify type of resource
- :param release_content: required number of ids
- :return boolean: True if all IDs in given release_content released
- else False
- """
- status = False
-
- # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
- # across OLT. To keep it simple, a single pool (POOL 0) is maintained
- # for both the resource types. This may need to change later.
- # Override the incoming pon_intf_id to PON0
- if resource_type == PONResourceManager.GEMPORT_ID or \
- resource_type == PONResourceManager.ALLOC_ID:
- pon_intf_id = 0
-
- path = self._get_path(pon_intf_id, resource_type)
- if path is None:
- return status
-
- try:
- resource = self._get_resource(path)
- if resource is not None and resource_type == \
- PONResourceManager.ONU_ID:
- self._release_id(resource, release_content)
- elif resource is not None and (
- resource_type == PONResourceManager.ALLOC_ID or
- resource_type == PONResourceManager.GEMPORT_ID):
- for content in release_content:
- self._release_id(resource, content)
- else:
- raise Exception("get-resource-failed")
-
- self._log.debug("Free-" + resource_type + "-success", path=path)
-
- # Update resource in kv store
- status = self._update_resource(path, resource)
-
- except Exception as e:
- self._log.exception("Free-" + resource_type + "-failed",
- path=path, e=e)
- return status
-
- def clear_resource_id_pool(self, pon_intf_id, resource_type):
- """
- Clear Resource Pool for a given Resource Type on a given PON Port.
-
- :return boolean: True if removed else False
- """
- path = self._get_path(pon_intf_id, resource_type)
- if path is None:
- return False
-
- try:
- result = self._kv_store.remove_from_kv_store(path)
- if result is True:
- self._log.debug("Resource-pool-cleared",
- device_id=self.device_id,
- path=path)
- return True
- except Exception as e:
- self._log.exception("error-clearing-resource-pool", e=e)
-
- self._log.error("Clear-resource-pool-failed", device_id=self.device_id,
- path=path)
- return False
-
- def init_resource_map(self, pon_intf_onu_id):
- """
- Initialize resource map
-
- :param pon_intf_onu_id: reference of PON interface id and onu id
- """
- # initialize pon_intf_onu_id tuple to alloc_ids map
- alloc_id_path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
- self.device_id, str(pon_intf_onu_id)
- )
- alloc_ids = list()
- self._kv_store.update_to_kv_store(
- alloc_id_path, json.dumps(alloc_ids)
- )
-
- # initialize pon_intf_onu_id tuple to gemport_ids map
- gemport_id_path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
- self.device_id, str(pon_intf_onu_id)
- )
- gemport_ids = list()
- self._kv_store.update_to_kv_store(
- gemport_id_path, json.dumps(gemport_ids)
- )
-
- def remove_resource_map(self, pon_intf_onu_id):
- """
- Remove resource map
-
- :param pon_intf_onu_id: reference of PON interface id and onu id
- """
- # remove pon_intf_onu_id tuple to alloc_ids map
- alloc_id_path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
- self.device_id, str(pon_intf_onu_id)
- )
- self._kv_store.remove_from_kv_store(alloc_id_path)
-
- # remove pon_intf_onu_id tuple to gemport_ids map
- gemport_id_path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
- self.device_id, str(pon_intf_onu_id)
- )
- self._kv_store.remove_from_kv_store(gemport_id_path)
-
- def get_current_alloc_ids_for_onu(self, pon_intf_onu_id):
- """
- Get currently configured alloc ids for given pon_intf_onu_id
-
- :param pon_intf_onu_id: reference of PON interface id and onu id
- """
- path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
- self.device_id,
- str(pon_intf_onu_id))
- value = self._kv_store.get_from_kv_store(path)
- if value is not None:
- alloc_id_list = json.loads(value)
- if len(alloc_id_list) > 0:
- return alloc_id_list
-
- return None
-
- def get_current_gemport_ids_for_onu(self, pon_intf_onu_id):
- """
- Get currently configured gemport ids for given pon_intf_onu_id
-
- :param pon_intf_onu_id: reference of PON interface id and onu id
- """
-
- path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
- self.device_id,
- str(pon_intf_onu_id))
- value = self._kv_store.get_from_kv_store(path)
- if value is not None:
- gemport_id_list = json.loads(value)
- if len(gemport_id_list) > 0:
- return gemport_id_list
-
- return None
-
- def update_alloc_ids_for_onu(self, pon_intf_onu_id, alloc_ids):
- """
- Update currently configured alloc ids for given pon_intf_onu_id
-
- :param pon_intf_onu_id: reference of PON interface id and onu id
- """
- path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
- self.device_id, str(pon_intf_onu_id)
- )
- self._kv_store.update_to_kv_store(
- path, json.dumps(alloc_ids)
- )
-
- def update_gemport_ids_for_onu(self, pon_intf_onu_id, gemport_ids):
- """
- Update currently configured gemport ids for given pon_intf_onu_id
-
- :param pon_intf_onu_id: reference of PON interface id and onu id
- """
- path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
- self.device_id, str(pon_intf_onu_id)
- )
- self._kv_store.update_to_kv_store(
- path, json.dumps(gemport_ids)
- )
-
- def _get_olt_vendor(self):
- """
- Get olt vendor variant
-
- :return: type of olt vendor
- """
- olt_vendor = None
- if self.extra_args and len(self.extra_args) > 0:
- parser = OltVendorArgumentParser(add_help=False)
- parser.add_argument('--olt_vendor', '-o', action='store',
- choices=['default', 'asfvolt16', 'cigolt24'],
- default='default')
- try:
- args = parser.parse_args(shlex.split(self.extra_args))
- self._log.debug('parsing-extra-arguments', args=args)
- olt_vendor = args.olt_vendor
- except ArgumentError as e:
- self._log.exception('invalid-arguments: {}', e=e)
- except Exception as e:
- self._log.exception('option-parsing-error: {}', e=e)
-
- return olt_vendor
-
- def _generate_next_id(self, resource):
- """
- Generate unique id having OFFSET as start index.
-
- :param resource: resource used to generate ID
- :return int: generated id
- """
- pos = resource[PONResourceManager.POOL].find('0b0')
- resource[PONResourceManager.POOL].set(1, pos)
- return pos[0] + resource[PONResourceManager.START_IDX]
-
- def _release_id(self, resource, unique_id):
- """
- Release unique id having OFFSET as start index.
-
- :param resource: resource used to release ID
- :param unique_id: id need to be released
- """
- pos = ((int(unique_id)) - resource[PONResourceManager.START_IDX])
- resource[PONResourceManager.POOL].set(0, pos)
-
- def _get_path(self, pon_intf_id, resource_type):
- """
- Get path for given resource type.
-
- :param pon_intf_id: OLT PON interface id
- :param resource_type: String to identify type of resource
- :return: path for given resource type
- """
- path = None
- if resource_type == PONResourceManager.ONU_ID:
- path = self._get_onu_id_resource_path(pon_intf_id)
- elif resource_type == PONResourceManager.ALLOC_ID:
- path = self._get_alloc_id_resource_path(pon_intf_id)
- elif resource_type == PONResourceManager.GEMPORT_ID:
- path = self._get_gemport_id_resource_path(pon_intf_id)
- else:
- self._log.error("invalid-resource-pool-identifier")
- return path
-
- def _get_alloc_id_resource_path(self, pon_intf_id):
- """
- Get alloc id resource path.
-
- :param pon_intf_id: OLT PON interface id
- :return: alloc id resource path
- """
- return PONResourceManager.ALLOC_ID_POOL_PATH.format(
- self.device_id, pon_intf_id)
-
- def _get_gemport_id_resource_path(self, pon_intf_id):
- """
- Get gemport id resource path.
-
- :param pon_intf_id: OLT PON interface id
- :return: gemport id resource path
- """
- return PONResourceManager.GEMPORT_ID_POOL_PATH.format(
- self.device_id, pon_intf_id)
-
- def _get_onu_id_resource_path(self, pon_intf_id):
- """
- Get onu id resource path.
-
- :param pon_intf_id: OLT PON interface id
- :return: onu id resource path
- """
- return PONResourceManager.ONU_ID_POOL_PATH.format(
- self.device_id, pon_intf_id)
-
- def _update_resource(self, path, resource):
- """
- Update resource in resource kv store.
-
- :param path: path to update resource
- :param resource: resource need to be updated
- :return boolean: True if resource updated in kv store else False
- """
- resource[PONResourceManager.POOL] = \
- resource[PONResourceManager.POOL].bin
- result = self._kv_store.update_to_kv_store(path, json.dumps(resource))
- if result is True:
- return True
- return False
-
- def _get_resource(self, path):
- """
- Get resource from kv store.
-
- :param path: path to get resource
- :return: resource if resource present in kv store else None
- """
- # get resource from kv store
- result = self._kv_store.get_from_kv_store(path)
- if result is None:
- return result
- self._log.info("dumping resource", result=result)
- resource = result
-
- if resource is not None:
- # decode resource fetched from backend store to dictionary
- resource = json.loads(resource)
-
- # resource pool in backend store stored as binary string whereas to
- # access the pool to generate/release IDs it need to be converted
- # as BitArray
- resource[PONResourceManager.POOL] = \
- BitArray('0b' + resource[PONResourceManager.POOL])
-
- return resource
-
- def _format_resource(self, pon_intf_id, start_idx, end_idx):
- """
- Format resource as json.
-
- :param pon_intf_id: OLT PON interface id
- :param start_idx: start index for id pool
- :param end_idx: end index for id pool
- :return dictionary: resource formatted as dictionary
- """
- # Format resource as json to be stored in backend store
- resource = dict()
- resource[PONResourceManager.PON_INTF_ID] = pon_intf_id
- resource[PONResourceManager.START_IDX] = start_idx
- resource[PONResourceManager.END_IDX] = end_idx
-
- # resource pool stored in backend store as binary string
- resource[PONResourceManager.POOL] = BitArray(end_idx).bin
-
- return json.dumps(resource)
diff --git a/python/common/utils/consulhelpers.py b/python/common/utils/consulhelpers.py
index df4dd58..853143b 100644
--- a/python/common/utils/consulhelpers.py
+++ b/python/common/utils/consulhelpers.py
@@ -21,7 +21,7 @@
from structlog import get_logger
from consul import Consul
from random import randint
-from common.utils.nethelpers import get_my_primary_local_ipv4
+from nethelpers import get_my_primary_local_ipv4
log = get_logger()
diff --git a/python/common/utils/registry.py b/python/common/utils/registry.py
new file mode 100644
index 0000000..270bd71
--- /dev/null
+++ b/python/common/utils/registry.py
@@ -0,0 +1,69 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Simple component registry to provide centralized access to any registered
+components.
+"""
+from collections import OrderedDict
+from zope.interface import Interface
+
+
+class IComponent(Interface):
+ """
+ A Voltha Component
+ """
+
+ def start():
+ """
+        Called once the component is instantiated. Can be used for async
+ initialization.
+ :return: (None or Deferred)
+ """
+
+ def stop():
+ """
+ Called once before the component is unloaded. Can be used for async
+ cleanup operations.
+ :return: (None or Deferred)
+ """
+
+
+class Registry(object):
+
+ def __init__(self):
+ self.components = OrderedDict()
+
+ def register(self, name, component):
+ assert IComponent.providedBy(component)
+ assert name not in self.components
+ self.components[name] = component
+ return component
+
+ def unregister(self, name):
+ if name in self.components:
+ del self.components[name]
+
+ def __call__(self, name):
+ return self.components[name]
+
+ def iterate(self):
+ return self.components.values()
+
+
+# public shared registry
+registry = Registry()
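+
+# Example usage (sketch; the component name is hypothetical, and the component
+# must provide IComponent):
+#
+#     registry.register('my_component', MyComponent()).start()
+#     registry('my_component').stop()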