Initial commit moving openolt adapter from voltha-go to the new repo.
This version works with ponsim rather than openolt; this is temporary.
It is currently being fixed to work with openolt.
Change-Id: I34a800c98f050140b367e2d474b7aa8b79f34b9a
Signed-off-by: William Kurkian <wkurkian@cisco.com>
diff --git a/python/adapters/common/__init__.py b/python/adapters/common/__init__.py
new file mode 100644
index 0000000..58aca1e
--- /dev/null
+++ b/python/adapters/common/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/adapters/common/frameio/__init__.py b/python/adapters/common/frameio/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/adapters/common/frameio/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/adapters/common/frameio/frameio.py b/python/adapters/common/frameio/frameio.py
new file mode 100644
index 0000000..0657257
--- /dev/null
+++ b/python/adapters/common/frameio/frameio.py
@@ -0,0 +1,437 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A module that can send and receive raw ethernet frames on a set of interfaces
+and it can manage a set of vlan interfaces on top of existing
+interfaces. Due to reliance on raw sockets, this module requires
+root access. Also, raw sockets are hard to deal with in Twisted (not
+directly supported) we need to run the receiver select loop on a dedicated
+thread.
+"""
+
+import os
+import socket
+import struct
+import uuid
+from pcapy import BPFProgram
+from threading import Thread, Condition
+
+import fcntl
+
+import select
+import structlog
+import sys
+
+from scapy.data import ETH_P_ALL
+from twisted.internet import reactor
+from zope.interface import implementer
+
+from python.common.utils.registry import IComponent
+
+if sys.platform.startswith('linux'):
+ from third_party.oftest import afpacket, netutils
+elif sys.platform == 'darwin':
+ from scapy.arch import pcapdnet, BIOCIMMEDIATE, dnet
+
+log = structlog.get_logger()
+
+
+def hexify(buffer):
+ """
+ Return a hexadecimal string encoding of input buffer
+ """
+ return ''.join('%02x' % ord(c) for c in buffer)
+
+
+class _SelectWakerDescriptor(object):
+ """
+ A descriptor that can be mixed into a select loop to wake it up.
+ """
+ def __init__(self):
+ self.pipe_read, self.pipe_write = os.pipe()
+ fcntl.fcntl(self.pipe_write, fcntl.F_SETFL, os.O_NONBLOCK)
+
+ def __del__(self):
+ os.close(self.pipe_read)
+ os.close(self.pipe_write)
+
+ def fileno(self):
+ return self.pipe_read
+
+ def wait(self):
+ os.read(self.pipe_read, 1)
+
+ def notify(self):
+ """Trigger a select loop"""
+ os.write(self.pipe_write, '\x00')
+
+
+class BpfProgramFilter(object):
+ """
+ Convenience packet filter based on the well-tried Berkeley Packet Filter,
+ used by many well known open source tools such as pcap and tcpdump.
+ """
+ def __init__(self, program_string):
+ """
+ Create a filter using the BPF command syntax. To learn more,
+ consult 'man pcap-filter'.
+ :param program_string: The textual definition of the filter. Examples:
+ 'vlan 1000'
+ 'vlan 1000 and ip src host 10.10.10.10'
+ """
+ self.bpf = BPFProgram(program_string)
+
+ def __call__(self, frame):
+ """
+ Return 1 if frame passes filter.
+ :param frame: Raw frame provided as Python string
+ :return: 1 if frame satisfies filter, 0 otherwise.
+ """
+ return self.bpf.filter(frame)
+
+
+class FrameIOPort(object):
+ """
+ Represents a network interface which we can send/receive raw
+ Ethernet frames.
+ """
+
+ RCV_SIZE_DEFAULT = 4096
+ ETH_P_ALL = 0x03
+ RCV_TIMEOUT = 10000
+ MIN_PKT_SIZE = 60
+
+ def __init__(self, iface_name):
+ self.iface_name = iface_name
+ self.proxies = []
+ self.socket = self.open_socket(self.iface_name)
+ log.debug('socket-opened', fn=self.fileno(), iface=iface_name)
+ self.received = 0
+ self.discarded = 0
+
+ def add_proxy(self, proxy):
+ self.proxies.append(proxy)
+
+ def del_proxy(self, proxy):
+ self.proxies = [p for p in self.proxies if p.name != proxy.name]
+
+ def open_socket(self, iface_name):
+ raise NotImplementedError('to be implemented by derived class')
+
+ def rcv_frame(self):
+ raise NotImplementedError('to be implemented by derived class')
+
+ def __del__(self):
+ if self.socket:
+ self.socket.close()
+ self.socket = None
+ log.debug('socket-closed', iface=self.iface_name)
+
+ def fileno(self):
+ return self.socket.fileno()
+
+ def _dispatch(self, proxy, frame):
+ log.debug('calling-publisher', proxy=proxy.name, frame=hexify(frame))
+ try:
+ proxy.callback(proxy, frame)
+ except Exception as e:
+ log.exception('callback-error',
+ explanation='Callback failed while processing frame',
+ e=e)
+
+ def recv(self):
+ """Called on the select thread when a packet arrives"""
+ try:
+ frame = self.rcv_frame()
+ except RuntimeError as e:
+ # we observed this happens sometimes right after the socket was
+ # attached to a newly created veth interface. So we log it, but
+ # allow to continue.
+ log.warn('afpacket-recv-error', code=-1)
+ return
+
+ log.debug('frame-received', iface=self.iface_name, len=len(frame),
+ hex=hexify(frame))
+ self.received +=1
+ dispatched = False
+ for proxy in self.proxies:
+ if proxy.filter is None or proxy.filter(frame):
+ log.debug('frame-dispatched')
+ dispatched = True
+ reactor.callFromThread(self._dispatch, proxy, frame)
+
+ if not dispatched:
+ self.discarded += 1
+ log.debug('frame-discarded')
+
+ def send(self, frame):
+ log.debug('sending', len=len(frame), iface=self.iface_name)
+ sent_bytes = self.send_frame(frame)
+ if sent_bytes != len(frame):
+ log.error('send-error', iface=self.iface_name,
+ wanted_to_send=len(frame), actually_sent=sent_bytes)
+ return sent_bytes
+
+ def send_frame(self, frame):
+     """Send frame on the raw socket; return the number of bytes sent."""
+     try:
+         return self.socket.send(frame)
+     except socket.error as err:
+         # Only EINVAL on a frame below MIN_PKT_SIZE is recoverable (by
+         # zero-padding); anything else must surface to the caller.
+         if err.errno != os.errno.EINVAL or len(frame) >= self.MIN_PKT_SIZE:
+             raise
+         padding = '\x00' * (self.MIN_PKT_SIZE - len(frame))
+         return self.socket.send(frame + padding)
+
+ def up(self):
+ if sys.platform.startswith('darwin'):
+ pass
+ else:
+ os.system('ip link set {} up'.format(self.iface_name))
+ return self
+
+ def down(self):
+ if sys.platform.startswith('darwin'):
+ pass
+ else:
+ os.system('ip link set {} down'.format(self.iface_name))
+ return self
+
+ def statistics(self):
+ return self.received, self.discarded
+
+
+class LinuxFrameIOPort(FrameIOPort):
+
+ def open_socket(self, iface_name):
+ s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
+ afpacket.enable_auxdata(s)
+ s.bind((self.iface_name, self.ETH_P_ALL))
+ netutils.set_promisc(s, iface_name)
+ s.settimeout(self.RCV_TIMEOUT)
+ return s
+
+ def rcv_frame(self):
+ return afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
+
+
+class DarwinFrameIOPort(FrameIOPort):
+
+ def open_socket(self, iface_name):
+     """Open a pcap handle for receiving and a dnet handle for sending."""
+     sin = pcapdnet.open_pcap(iface_name, 1600, 1, 100)
+     try:
+         fcntl.ioctl(sin.fileno(), BIOCIMMEDIATE, struct.pack("I", 1))
+     except Exception as e:
+         # best effort: keep going, but leave a trace of the failure
+         log.warn('set-bpf-immediate-failed', iface=iface_name, e=e)
+     # need a different kind of socket for sending out
+     self.sout = dnet.eth(iface_name)
+     return sin
+
+ def send_frame(self, frame):
+ return self.sout.send(frame)
+
+ def rcv_frame(self):
+ pkt = self.socket.next()
+ if pkt is not None:
+ ts, pkt = pkt
+ return pkt
+
+
+if sys.platform == 'darwin':
+    _FrameIOPort = DarwinFrameIOPort
+elif sys.platform.startswith('linux'):
+    _FrameIOPort = LinuxFrameIOPort
+else:
+    # No port implementation exists for this platform, so fail the import.
+    raise Exception('Unsupported platform {}'.format(sys.platform))
+
+
+class FrameIOPortProxy(object):
+ """Makes FrameIOPort sharable between multiple users"""
+
+ def __init__(self, frame_io_port, callback, filter=None, name=None):
+ self.frame_io_port = frame_io_port
+ self.callback = callback
+ self.filter = filter
+ self.name = uuid.uuid4().hex[:12] if name is None else name
+
+ @property
+ def iface_name(self):
+ return self.frame_io_port.iface_name
+
+ def get_iface_name(self):
+ return self.frame_io_port.iface_name
+
+ def send(self, frame):
+ return self.frame_io_port.send(frame)
+
+ def up(self):
+ self.frame_io_port.up()
+ return self
+
+ def down(self):
+ self.frame_io_port.down()
+ return self
+
+
+@implementer(IComponent)
+class FrameIOManager(Thread):
+ """
+ Packet/Frame IO manager that can be used to send/receive raw frames
+ on a set of network interfaces.
+ """
+ def __init__(self):
+     """Set up the (not yet started) select-loop thread and its state."""
+     super(FrameIOManager, self).__init__()
+     self.ports = {}  # iface_name -> FrameIOPort (as built by open_port)
+     self.queue = {}  # iface_name -> TODO
+
+     self.cvar = Condition()
+     self.waker = _SelectWakerDescriptor()  # used to wake select() in run()
+     self.stopped = False  # set by stop() to end the loops in run()
+     self.ports_changed = False  # tells run() to rebuild its socket list
+
+ # ~~~~~~~~~~~ exposed methods callable from main thread ~~~~~~~~~~~~~~~~~~~
+
+ def start(self):
+ """
+ Start the IO manager and its select loop thread
+ """
+ log.debug('starting')
+ super(FrameIOManager, self).start()
+ log.info('started')
+ return self
+
+ def stop(self):
+ """
+ Stop the IO manager and its thread with the select loop
+ """
+ log.debug('stopping')
+ self.stopped = True
+ self.waker.notify()
+ self.join()
+ del self.ports
+ log.info('stopped')
+
+ def list_interfaces(self):
+     """
+     Return the interfaces currently listened on
+     :return: dict mapping interface name to its FrameIOPort object
+     """
+     return self.ports
+
+ def open_port(self, iface_name, callback, filter=None, name=None):
+ """
+ Add a new interface and start receiving on it.
+ :param iface_name: Name of the interface. Must be an existing Unix
+ interface (eth0, en0, etc.)
+ :param callback: Called on each received frame;
+ signature: def callback(port, frame) where port is the FrameIOPort
+ instance at which the frame was received, frame is the actual frame
+ received (as binary string)
+ :param filter: An optional filter (predicate), with signature:
+ def filter(frame). If provided, only frames for which filter evaluates
+ to True will be forwarded to callback.
+ :return: FrameIOPortProxy instance.
+ """
+
+ port = self.ports.get(iface_name)
+ if port is None:
+ port = _FrameIOPort(iface_name)
+ self.ports[iface_name] = port
+ self.ports_changed = True
+ self.waker.notify()
+
+ proxy = FrameIOPortProxy(port, callback, filter, name)
+ port.add_proxy(proxy)
+
+ return proxy
+
+ def close_port(self, proxy):
+ """
+ Remove the proxy. If this is the last proxy on an interface, stop and
+ remove the named interface as well
+ :param proxy: FrameIOPortProxy reference
+ :return: None
+ """
+ assert isinstance(proxy, FrameIOPortProxy)
+ iface_name = proxy.get_iface_name()
+ assert iface_name in self.ports, "iface_name {} unknown".format(iface_name)
+ port = self.ports[iface_name]
+ port.del_proxy(proxy)
+
+ if not port.proxies:
+ del self.ports[iface_name]
+ # need to exit select loop to reconstruct select fd lists
+ self.ports_changed = True
+ self.waker.notify()
+
+ def send(self, iface_name, frame):
+ """
+ Send frame on given interface
+ :param iface_name: Name of previously registered interface
+ :param frame: frame as string
+ :return: number of bytes sent
+ """
+ return self.ports[iface_name].send(frame)
+
+ # ~~~~~~~~~~~~~ Thread methods (running on non-main thread) ~~~~~~~~~~~~~~~
+
+ def run(self):
+ """
+ Called on the alien thread, this is the core multi-port receive loop
+ """
+
+ log.debug('select-loop-started')
+
+ # outer loop constructs sockets list for select
+ while not self.stopped:
+ sockets = [self.waker] + self.ports.values()
+ self.ports_changed = False
+ empty = []
+ # inner select loop
+
+ while not self.stopped:
+ try:
+ _in, _out, _err = select.select(sockets, empty, empty, 1)
+ except Exception as e:
+ log.exception('frame-io-select-error', e=e)
+ break
+ with self.cvar:
+ for port in _in:
+ if port is self.waker:
+ self.waker.wait()
+ continue
+ else:
+ port.recv()
+ self.cvar.notify_all()
+ if self.ports_changed:
+ break # break inner loop so we reconstruct sockets list
+
+ log.debug('select-loop-exited')
+
+ def del_interface(self, iface_name):
+     """Drop iface_name from the port table and wake the select loop."""
+     log.info('Delete interface')
+     del self.ports[iface_name]
+     # as in close_port: make the select loop rebuild its socket list
+     self.ports_changed = True
+     self.waker.notify()
+     log.info('Interface(port) is deleted')
diff --git a/python/adapters/common/frameio/third_party/__init__.py b/python/adapters/common/frameio/third_party/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/adapters/common/frameio/third_party/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/adapters/common/frameio/third_party/oftest/LICENSE b/python/adapters/common/frameio/third_party/oftest/LICENSE
new file mode 100644
index 0000000..3216042
--- /dev/null
+++ b/python/adapters/common/frameio/third_party/oftest/LICENSE
@@ -0,0 +1,36 @@
+OpenFlow Test Framework
+
+Copyright (c) 2010 The Board of Trustees of The Leland Stanford
+Junior University
+
+Except where otherwise noted, this software is distributed under
+the OpenFlow Software License. See
+http://www.openflowswitch.org/wp/legal/ for current details.
+
+We are making the OpenFlow specification and associated documentation
+(Software) available for public use and benefit with the expectation
+that others will use, modify and enhance the Software and contribute
+those enhancements back to the community. However, since we would like
+to make the Software available for broadest use, with as few
+restrictions as possible permission is hereby granted, free of charge,
+to any person obtaining a copy of this Software to deal in the
+Software under the copyrights without restriction, including without
+limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+The name and trademarks of copyright holder(s) may NOT be used in
+advertising or publicity pertaining to the Software or any derivatives
+without specific, written prior permission.
diff --git a/python/adapters/common/frameio/third_party/oftest/README.md b/python/adapters/common/frameio/third_party/oftest/README.md
new file mode 100644
index 0000000..f0cb649
--- /dev/null
+++ b/python/adapters/common/frameio/third_party/oftest/README.md
@@ -0,0 +1,6 @@
+Files in this directory are derived from the respective files
+in oftest (http://github.com/floodlight/oftest).
+
+For the licensing terms of these files, see LICENSE in this dir.
+
+
diff --git a/python/adapters/common/frameio/third_party/oftest/__init__.py b/python/adapters/common/frameio/third_party/oftest/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/python/adapters/common/frameio/third_party/oftest/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/adapters/common/frameio/third_party/oftest/afpacket.py b/python/adapters/common/frameio/third_party/oftest/afpacket.py
new file mode 100644
index 0000000..9ae8075
--- /dev/null
+++ b/python/adapters/common/frameio/third_party/oftest/afpacket.py
@@ -0,0 +1,124 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+AF_PACKET receive support
+
+When VLAN offload is enabled on the NIC Linux will not deliver the VLAN tag
+in the data returned by recv. Instead, it delivers the VLAN TCI in a control
+message. Python 2.x doesn't have built-in support for recvmsg, so we have to
+use ctypes to call it. The recv function exported by this module reconstructs
+the VLAN tag if it was offloaded.
+"""
+
+import struct
+from ctypes import *
+
+ETH_P_8021Q = 0x8100
+SOL_PACKET = 263
+PACKET_AUXDATA = 8
+TP_STATUS_VLAN_VALID = 1 << 4
+
+class struct_iovec(Structure):
+ _fields_ = [
+ ("iov_base", c_void_p),
+ ("iov_len", c_size_t),
+ ]
+
+class struct_msghdr(Structure):
+ _fields_ = [
+ ("msg_name", c_void_p),
+ ("msg_namelen", c_uint32),
+ ("msg_iov", POINTER(struct_iovec)),
+ ("msg_iovlen", c_size_t),
+ ("msg_control", c_void_p),
+ ("msg_controllen", c_size_t),
+ ("msg_flags", c_int),
+ ]
+
+class struct_cmsghdr(Structure):
+ _fields_ = [
+ ("cmsg_len", c_size_t),
+ ("cmsg_level", c_int),
+ ("cmsg_type", c_int),
+ ]
+
+class struct_tpacket_auxdata(Structure):
+ _fields_ = [
+ ("tp_status", c_uint),
+ ("tp_len", c_uint),
+ ("tp_snaplen", c_uint),
+ ("tp_mac", c_ushort),
+ ("tp_net", c_ushort),
+ ("tp_vlan_tci", c_ushort),
+ ("tp_padding", c_ushort),
+ ]
+
+libc = CDLL("libc.so.6")
+recvmsg = libc.recvmsg
+recvmsg.argtypes = [c_int, POINTER(struct_msghdr), c_int]
+recvmsg.restype = c_int  # ctypes reads 'restype'; the old 'retype' spelling is not a ctypes attribute
+
+def enable_auxdata(sk):
+ """
+ Ask the kernel to return the VLAN tag in a control message
+
+ Must be called on the socket before afpacket.recv.
+ """
+ sk.setsockopt(SOL_PACKET, PACKET_AUXDATA, 1)
+
+def recv(sk, bufsize):
+    """
+    Receive a packet from an AF_PACKET socket, putting back the VLAN tag
+    when the auxdata control message reports one.
+    @sk Socket
+    @bufsize Maximum packet size
+    Returns the frame bytes; raises RuntimeError if recvmsg fails.
+    """
+    buf = create_string_buffer(bufsize)
+    ctrl_bufsize = sizeof(struct_cmsghdr) + sizeof(struct_tpacket_auxdata) + sizeof(c_size_t)
+    ctrl_buf = create_string_buffer(ctrl_bufsize)
+
+    iov = struct_iovec()
+    iov.iov_base = cast(buf, c_void_p)
+    iov.iov_len = bufsize
+
+    msghdr = struct_msghdr()
+    msghdr.msg_name = None
+    msghdr.msg_namelen = 0
+    msghdr.msg_iov = pointer(iov)
+    msghdr.msg_iovlen = 1
+    msghdr.msg_control = cast(ctrl_buf, c_void_p)
+    msghdr.msg_controllen = ctrl_bufsize
+    msghdr.msg_flags = 0
+
+    rv = recvmsg(sk.fileno(), byref(msghdr), 0)
+    if rv < 0:
+        raise RuntimeError("recvmsg failed: rv=%d" % rv)
+
+    # The kernel only delivers control messages we ask for. We
+    # only enabled PACKET_AUXDATA, so we can assume it's the
+    # only control message.
+    assert msghdr.msg_controllen >= sizeof(struct_cmsghdr)
+
+    cmsghdr = struct_cmsghdr.from_buffer(ctrl_buf) # pylint: disable=E1101
+    assert cmsghdr.cmsg_level == SOL_PACKET
+    assert cmsghdr.cmsg_type == PACKET_AUXDATA
+
+    auxdata = struct_tpacket_auxdata.from_buffer(ctrl_buf, sizeof(struct_cmsghdr)) # pylint: disable=E1101
+    if auxdata.tp_vlan_tci != 0 or auxdata.tp_status & TP_STATUS_VLAN_VALID:
+        # Insert VLAN tag
+        tag = struct.pack("!HH", ETH_P_8021Q, auxdata.tp_vlan_tci)
+        return buf.raw[:12] + tag + buf.raw[12:rv]
+    else:
+        return buf.raw[:rv]
diff --git a/python/adapters/common/frameio/third_party/oftest/netutils.py b/python/adapters/common/frameio/third_party/oftest/netutils.py
new file mode 100644
index 0000000..092d490
--- /dev/null
+++ b/python/adapters/common/frameio/third_party/oftest/netutils.py
@@ -0,0 +1,73 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Network utilities for the OpenFlow test framework
+"""
+
+###########################################################################
+## ##
+## Promiscuous mode enable/disable ##
+## ##
+## Based on code from Scapy by Philippe Biondi ##
+## ##
+## ##
+## This program is free software; you can redistribute it and/or modify it ##
+## under the terms of the GNU General Public License as published by the ##
+## Free Software Foundation; either version 2, or (at your option) any ##
+## later version. ##
+## ##
+## This program is distributed in the hope that it will be useful, but ##
+## WITHOUT ANY WARRANTY; without even the implied warranty of ##
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ##
+## General Public License for more details. ##
+## ##
+#############################################################################
+
+import socket
+from fcntl import ioctl
+import struct
+
+# From net/if_arp.h
+ARPHDR_ETHER = 1
+ARPHDR_LOOPBACK = 772
+
+# From bits/ioctls.h
+SIOCGIFHWADDR = 0x8927 # Get hardware address
+SIOCGIFINDEX = 0x8933 # name -> if_index mapping
+
+# From netpacket/packet.h
+PACKET_ADD_MEMBERSHIP = 1
+PACKET_DROP_MEMBERSHIP = 2
+PACKET_MR_PROMISC = 1
+
+# From bits/socket.h
+SOL_PACKET = 263
+
+def get_if(iff,cmd):
+ s=socket.socket()
+ ifreq = ioctl(s, cmd, struct.pack("16s16x",iff))
+ s.close()
+ return ifreq
+
+def get_if_index(iff):
+ return int(struct.unpack("I",get_if(iff, SIOCGIFINDEX)[16:20])[0])
+
+def set_promisc(s,iff,val=1):
+ mreq = struct.pack("IHH8s", get_if_index(iff), PACKET_MR_PROMISC, 0, "")
+ if val:
+ cmd = PACKET_ADD_MEMBERSHIP
+ else:
+ cmd = PACKET_DROP_MEMBERSHIP
+ s.setsockopt(SOL_PACKET, cmd, mreq)
+
diff --git a/python/adapters/common/kvstore/__init__.py b/python/adapters/common/kvstore/__init__.py
new file mode 100644
index 0000000..4a82628
--- /dev/null
+++ b/python/adapters/common/kvstore/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/adapters/common/kvstore/consul_client.py b/python/adapters/common/kvstore/consul_client.py
new file mode 100644
index 0000000..789e797
--- /dev/null
+++ b/python/adapters/common/kvstore/consul_client.py
@@ -0,0 +1,304 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kv_client import DEFAULT_TIMEOUT, Event, KVClient, KVPair, RETRY_BACKOFF
+from python.common.utils.asleep import asleep
+from python.common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+from consul import ConsulException
+from consul.twisted import Consul
+from structlog import get_logger
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+
+log = get_logger()
+
+class ConsulClient(KVClient):
+
+ def __init__(self, kv_host, kv_port):
+ KVClient.__init__(self, kv_host, kv_port)
+ self.session_id = None
+ self.client = Consul(kv_host, kv_port)
+
+ def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
+ self._retriggering_watch(key, key_change_callback, timeout)
+
+ @inlineCallbacks
+ def _retriggering_watch(self, key, key_change_callback, timeout):
+ self.key_watches[key] = ConsulWatch(self.client, key, key_change_callback, timeout)
+ yield self.key_watches[key].start()
+
+ def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
+ if key in self.key_watches:
+ self.key_watches[key].stop()
+
+ @inlineCallbacks
+ def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
+ log.debug('kv-op', operation=operation, key=key, timeout=timeout, args=args, kw=kw)
+ err = None
+ result = None
+ while True:
+ try:
+ if operation == 'GET':
+ result = yield self._get(key, **kw)
+ elif operation == 'LIST':
+ result, err = yield self._list(key)
+ elif operation == 'PUT':
+ # Put returns a boolean response
+ result = yield self.client.kv.put(key, value)
+ if not result:
+ err = 'put-failed'
+ elif operation == 'DELETE':
+ # Delete returns a boolean response
+ result = yield self.client.kv.delete(key)
+ if not result:
+ err = 'delete-failed'
+ elif operation == 'RESERVE':
+ result, err = yield self._reserve(key, value, **kw)
+ elif operation == 'RENEW':
+ result, err = yield self._renew_reservation(key)
+ elif operation == 'RELEASE':
+ result, err = yield self._release_reservation(key)
+ elif operation == 'RELEASE-ALL':
+ err = yield self._release_all_reservations()
+ self._clear_backoff()
+ break
+ except ConsulException as ex:
+ if 'ConnectionRefusedError' in ex.message:
+ log.exception('comms-exception', ex=ex)
+ yield self._backoff('consul-not-up')
+ else:
+ log.error('consul-specific-exception', ex=ex)
+ err = ex
+ except Exception as ex:
+ log.error('consul-exception', ex=ex)
+ err = ex
+
+ if timeout > 0 and self.retry_time > timeout:
+ err = 'operation-timed-out'
+ if err is not None:
+ self._clear_backoff()
+ break
+
+ returnValue((result,err))
+
+ @inlineCallbacks
+ def _get(self, key, **kw):
+ kvp = None
+ index, rec = yield self.client.kv.get(key, **kw)
+ if rec is not None:
+ kvp = KVPair(rec['Key'], rec['Value'], index)
+ returnValue(kvp)
+
+ @inlineCallbacks
+ def _list(self, key):
+     """Fetch every KV pair under key; fires with (pairs, err)."""
+     index, recs = yield self.client.kv.get(key, recurse=True)
+     # python-consul hands back None (not []) when nothing matches key
+     pairs = [KVPair(rec['Key'], rec['Value'], rec['ModifyIndex'])
+              for rec in recs or []]
+     returnValue((pairs, None))
+
+ @inlineCallbacks
+ def _reserve(self, key, value, **kw):
+     """
+     Try to reserve key for value under a newly created consul session.
+     Fires with (owner, err): owner is the value found at key after the
+     attempt; err is None only when this client's session holds the key.
+     """
+     # 'ttl' is optional: when omitted the session is created with ttl=None
+     ttl = kw.get('ttl')
+     err = 'reservation-failed'
+     owner = None
+     # Create a session
+     self.session_id = yield self.client.session.create(behavior='delete',
+                                                        ttl=ttl)
+     log.debug('create-session', id=self.session_id)
+     # Try to acquire the key
+     result = yield self.client.kv.put(key, value, acquire=self.session_id)
+     log.debug('key-acquire', key=key, value=value, sess=self.session_id, result=result)
+
+     # Check if reservation succeeded
+     index, record = yield self.client.kv.get(key)
+     if record is not None and 'Value' in record:
+         owner = record['Value']
+         # NOTE(review): 'Session' is presumably absent when no session
+         # holds the key, hence .get() - confirm against the consul KV API
+         holder = record.get('Session')
+         log.debug('get-key', session=holder, owner=owner)
+         if holder == self.session_id and owner == value:
+             err = None
+             log.debug('key-reserved', key=key, value=value, ttl=ttl)
+             # Add key to reservation list
+             self.key_reservations[key] = self.session_id
+         else:
+             log.debug('reservation-held-by-another', owner=owner)
+     returnValue((owner, err))
+
+ @inlineCallbacks
+ def _renew_reservation(self, key):
+ result = None
+ err = None
+ if key not in self.key_reservations:
+ err = 'key-not-reserved'
+ else:
+ session_id = self.key_reservations[key]
+ # A successfully renewed session returns an object with fields:
+ # Node, CreateIndex, Name, ModifyIndex, ID, Behavior, TTL,
+ # LockDelay, and Checks
+ result = yield self.client.session.renew(session_id=session_id)
+ log.debug('session-renew', result=result)
+ if result is None:
+ err = 'session-renewal-failed'
+ returnValue((result, err))
+
+ @inlineCallbacks
+ def _release_reservation(self, key):
+     """Destroy the session reserving key; fires with (success, err)."""
+     if key not in self.key_reservations:
+         # nothing was destroyed, so report an explicit failure
+         returnValue((False, 'key-not-reserved'))
+     session_id = self.key_reservations[key]
+     # A successfully destroyed session returns a boolean result
+     success = yield self.client.session.destroy(session_id)
+     log.debug('session-destroy', result=success)
+     err = None if success else 'session-destroy-failed'
+     # the reservation is forgotten locally even if the destroy failed
+     self.session_id = None
+     self.key_reservations.pop(key)
+     returnValue((success, err))
+
+ @inlineCallbacks
+ def _release_all_reservations(self):
+ err = None
+ keys_to_delete = []
+ for key in self.key_reservations:
+ session_id = self.key_reservations[key]
+ # A successfully destroyed session returns a boolean result
+ success = yield self.client.session.destroy(session_id)
+ if not success:
+ err = 'session-destroy-failed'
+ log.debug('session-destroy', id=session_id, result=success)
+ self.session_id = None
+ keys_to_delete.append(key)
+ for key in keys_to_delete:
+ self.key_reservations.pop(key)
+ returnValue(err)
+
+
+class ConsulWatch():
+
+ def __init__(self, consul, key, callback, timeout):
+ self.client = consul
+ self.key = key
+ self.index = None
+ self.callback = callback
+ self.timeout = timeout
+ self.period = 60
+ self.running = True
+ self.retries = 0
+ self.retry_time = 0
+
+ @inlineCallbacks
+ def start(self):
+ self.running = True
+ index, rec = yield self._get_with_retry(self.key, None,
+ timeout=self.timeout)
+ self.index = str(index)
+
+ @inlineCallbacks
+ def _get(key, deferred):
+ try:
+ index, rec = yield self._get_with_retry(key, None,
+ timeout=self.timeout,
+ index=self.index)
+ self.index = str(index)
+ if not deferred.called:
+ log.debug('got-result-cancelling-deferred')
+ deferred.callback((self.index, rec))
+ except Exception as e:
+ log.exception('got-exception', e=e)
+
+ while self.running:
+ try:
+ rcvd = DeferredWithTimeout(timeout=self.period)
+ _get(self.key, rcvd)
+ try:
+ # Update index for next watch iteration
+ index, rec = yield rcvd
+ log.debug('event-received', index=index, rec=rec)
+ # Notify client of key change event
+ if rec is None:
+ # Key has been deleted
+ self._send_event(Event(Event.DELETE, self.key, None))
+ else:
+ self._send_event(Event(Event.PUT, rec['Key'], rec['Value']))
+ except TimeOutError as e:
+ log.debug('no-events-over-watch-period', key=self.key)
+ except Exception as e:
+ log.exception('exception', e=e)
+ except Exception as e:
+ log.exception('exception', e=e)
+
+ log.debug('close-watch', key=self.key)
+
+ def stop(self):
+ self.running = False
+ self.callback = None
+
+ @inlineCallbacks
+ def _get_with_retry(self, key, value, timeout, *args, **kw):
+ log.debug('watch-period', key=key, period=self.period, timeout=timeout, args=args, kw=kw)
+ err = None
+ result = None
+ while True:
+ try:
+ result = yield self.client.kv.get(key, **kw)
+ self._clear_backoff()
+ break
+ except ConsulException as ex:
+ err = ex
+ if 'ConnectionRefusedError' in ex.message:
+ self._send_event(Event(Event.CONNECTION_DOWN, self.key, None))
+ log.exception('comms-exception', ex=ex)
+ yield self._backoff('consul-not-up')
+ else:
+ log.error('consul-specific-exception', ex=ex)
+ except Exception as ex:
+ err = ex
+ log.error('consul-exception', ex=ex)
+
+ if timeout > 0 and self.retry_time > timeout:
+ err = 'operation-timed-out'
+ if err is not None:
+ self._clear_backoff()
+ break
+
+ returnValue(result)
+
+ def _send_event(self, event):
+ if self.callback is not None:
+ self.callback(event)
+
+ def _backoff(self, msg):
+ wait_time = RETRY_BACKOFF[min(self.retries, len(RETRY_BACKOFF) - 1)]
+ self.retry_time += wait_time
+ self.retries += 1
+ log.error(msg, next_retry_in_secs=wait_time,
+ total_delay_in_secs = self.retry_time,
+ retries=self.retries)
+ return asleep(wait_time)
+
+ def _clear_backoff(self):
+ if self.retries:
+ log.debug('reconnected-to-kv', after_retries=self.retries)
+ self.retries = 0
+ self.retry_time = 0
diff --git a/python/adapters/common/kvstore/etcd_client.py b/python/adapters/common/kvstore/etcd_client.py
new file mode 100644
index 0000000..e1850e7
--- /dev/null
+++ b/python/adapters/common/kvstore/etcd_client.py
@@ -0,0 +1,240 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+################################################################################
+#
+# Most of the txaioetcd methods provide a timeout parameter. This parameter
+# is likely intended to limit the amount of time spent by any one method
+# waiting for a response from the etcd server. However, if the server is
+# down, the method immediately throws a ConnectionRefusedError exception;
+# it does not perform any retries. The timeout parameter provided by the
+# methods in EtcdClient covers this contingency.
+#
+################################################################################
+
+from kv_client import DEFAULT_TIMEOUT, Event, KVClient, KVPair
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from twisted.internet.error import ConnectionRefusedError
+from txaioetcd import Client, CompVersion, Failed, KeySet, OpGet, OpSet, Transaction
+
+log = get_logger()
+
+class EtcdClient(KVClient):
+    """KV client backed by etcd, built on the txaioetcd ``Client``.
+
+    Every operation is routed through ``_op_with_retry``, which backs off
+    and retries while the etcd server refuses connections.
+    """
+
+    def __init__(self, kv_host, kv_port):
+        """
+        :param kv_host: Name or IP address of the etcd server
+        :param kv_port: Port of the etcd server
+        """
+        KVClient.__init__(self, kv_host, kv_port)
+        self.url = u'http://' + kv_host + u':' + str(kv_port)
+        self.client = Client(reactor, self.url)
+
+    @inlineCallbacks
+    def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
+        """
+        Register ``key_change_callback`` for ``key`` and start a watch.
+
+        :param key: The key to be watched
+        :param key_change_callback: Function invoked with an Event on change
+        :param timeout: Limit in seconds on the accumulated retry back-off
+        :return: The (result, error) tuple produced by ``_op_with_retry``
+        """
+        self.key_watches[key] = key_change_callback
+        result = yield self._op_with_retry('WATCH', key, None, timeout,
+                                           callback=self.key_changed)
+        returnValue(result)
+
+    def key_changed(self, kv):
+        """Turn a watch notification into an Event and dispatch it.
+
+        A value of None is reported as Event.DELETE, anything else as
+        Event.PUT. Keys without a registered callback are ignored.
+        """
+        key = kv.key
+        value = kv.value
+        log.debug('key-changed', key=key, value=value)
+        # Notify client of key change event
+        if value is not None:
+            evt = Event(Event.PUT, key, value)
+        else:
+            evt = Event(Event.DELETE, key, None)
+        if key in self.key_watches:
+            self.key_watches[key](evt)
+
+    def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
+        """Forget the callback registered for ``key``; ``timeout`` is unused."""
+        log.debug('close-watch', key=key)
+        self.key_watches.pop(key, None)
+
+    @inlineCallbacks
+    def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
+        """
+        Run one KV operation, retrying while etcd refuses the connection.
+
+        :param operation: One of 'GET', 'LIST', 'PUT', 'DELETE', 'RESERVE',
+            'RENEW', 'RELEASE', 'RELEASE-ALL' or 'WATCH'
+        :param key: The key (or key prefix) the operation applies to
+        :param value: The value for 'PUT' and 'RESERVE', otherwise None
+        :param timeout: If greater than zero, the accumulated back-off time
+            after which the operation fails with 'operation-timed-out'
+        :param kw: Operation-specific options ('ttl' for 'RESERVE',
+            'callback' for 'WATCH'; forwarded to ``client.set`` for 'PUT')
+        :return: (result, error) where error is None on success
+        """
+        log.debug('kv-op', operation=operation, key=key, timeout=timeout, args=args, kw=kw)
+        err = None
+        result = None
+        if isinstance(key, str):
+            key = bytes(key)
+        if value is not None:
+            value = bytes(value)
+        while True:
+            try:
+                if operation == 'GET':
+                    result = yield self._get(key)
+                elif operation == 'LIST':
+                    result, err = yield self._list(key)
+                elif operation == 'PUT':
+                    # Put returns an object of type Revision
+                    result = yield self.client.set(key, value, **kw)
+                elif operation == 'DELETE':
+                    # Delete returns an object of type Deleted
+                    result = yield self.client.delete(key)
+                elif operation == 'RESERVE':
+                    result, err = yield self._reserve(key, value, **kw)
+                elif operation == 'RENEW':
+                    result, err = yield self._renew_reservation(key)
+                elif operation == 'RELEASE':
+                    result, err = yield self._release_reservation(key)
+                elif operation == 'RELEASE-ALL':
+                    err = yield self._release_all_reservations()
+                elif operation == 'WATCH':
+                    # A missing callback yields None here instead of leaving
+                    # the local name unbound.
+                    callback = kw.get('callback')
+                    result = self.client.watch([KeySet(key, prefix=True)], callback)
+                self._clear_backoff()
+                break
+            except ConnectionRefusedError as ex:
+                # Server is down: err stays None so the loop tries again
+                log.error('comms-exception', ex=ex)
+                yield self._backoff('etcd-not-up')
+            except Exception as ex:
+                log.error('etcd-exception', ex=ex)
+                err = ex
+
+            if timeout > 0 and self.retry_time > timeout:
+                err = 'operation-timed-out'
+            if err is not None:
+                self._clear_backoff()
+                break
+
+        returnValue((result, err))
+
+    @inlineCallbacks
+    def _get(self, key):
+        """Return a KVPair for ``key``, or None unless exactly one match exists."""
+        kvp = None
+        resp = yield self.client.get(key)
+        if resp.kvs is not None and len(resp.kvs) == 1:
+            kv = resp.kvs[0]
+            kvp = KVPair(kv.key, kv.value, kv.mod_revision)
+        returnValue(kvp)
+
+    @inlineCallbacks
+    def _list(self, key):
+        """Return ([KVPair, ...], None) for every key with prefix ``key``."""
+        err = None
+        pairs = []
+        resp = yield self.client.get(KeySet(key, prefix=True))
+        if resp.kvs is not None and len(resp.kvs) > 0:
+            for kv in resp.kvs:
+                pairs.append(KVPair(kv.key, kv.value, kv.mod_revision))
+        returnValue((pairs, err))
+
+    @inlineCallbacks
+    def _reserve(self, key, value, **kw):
+        """
+        Try to reserve ``key`` for owner ``value`` under a lease.
+
+        :param key: The key under reservation
+        :param value: The reservation owner
+        :param kw: Must contain 'ttl', the lease time-to-live
+        :return: (owner, error) where owner is the value currently stored
+            under the key and error is None if ``value`` holds the key
+        """
+        # Raises KeyError (reported by _op_with_retry) when 'ttl' is absent
+        ttl = kw['ttl']
+        reserved = False
+        err = 'reservation-failed'
+        owner = None
+
+        # Create a lease
+        lease = yield self.client.lease(ttl)
+
+        # Create a transaction: set the key only if it has never been written
+        txn = Transaction(
+            compare=[CompVersion(key, '==', 0)],
+            success=[OpSet(key, bytes(value), lease=lease)],
+            failure=[OpGet(key)]
+        )
+        newly_acquired = False
+        try:
+            result = yield self.client.submit(txn)
+        except Failed as failed:
+            log.debug('key-already-present', key=key)
+            if len(failed.responses) > 0:
+                response = failed.responses[0]
+                if response.kvs is not None and len(response.kvs) > 0:
+                    kv = response.kvs[0]
+                    log.debug('key-already-present', value=kv.value)
+                    if kv.value == value:
+                        reserved = True
+                        log.debug('key-already-reserved', key=kv.key, value=kv.value)
+        else:
+            newly_acquired = True
+            log.debug('key-was-absent', key=key, result=result)
+
+        # Check if reservation succeeded
+        resp = yield self.client.get(key)
+        if resp.kvs is not None and len(resp.kvs) == 1:
+            owner = resp.kvs[0].value
+            if owner == value:
+                if newly_acquired:
+                    log.debug('key-reserved', key=key, value=value, ttl=ttl,
+                              lease_id=lease.lease_id)
+                    reserved = True
+                    # Add key to reservation list
+                    self.key_reservations[key] = lease
+                else:
+                    log.debug("reservation-still-held")
+            else:
+                log.debug('reservation-held-by-another', value=owner)
+
+        if reserved:
+            err = None
+        returnValue((owner, err))
+
+    @inlineCallbacks
+    def _renew_reservation(self, key):
+        """Refresh the lease held for ``key``; return (result, error)."""
+        result = None
+        err = None
+        if key not in self.key_reservations:
+            err = 'key-not-reserved'
+        else:
+            lease = self.key_reservations[key]
+            # A successfully refreshed lease returns an object of type Header
+            result = yield lease.refresh()
+            if result is None:
+                err = 'lease-refresh-failed'
+        returnValue((result, err))
+
+    @inlineCallbacks
+    def _release_reservation(self, key):
+        """Revoke the lease held for ``key``; return (result, error)."""
+        # result must exist on the 'key-not-reserved' path as well
+        result = None
+        err = None
+        if key not in self.key_reservations:
+            err = 'key-not-reserved'
+        else:
+            lease = self.key_reservations[key]
+            time_left = yield lease.remaining()
+            # A successfully revoked lease returns an object of type Header
+            log.debug('release-reservation', key=key, lease_id=lease.lease_id,
+                      time_left_in_secs=time_left)
+            result = yield lease.revoke()
+            if result is None:
+                err = 'lease-revoke-failed'
+            self.key_reservations.pop(key, None)
+        returnValue((result, err))
+
+    @inlineCallbacks
+    def _release_all_reservations(self):
+        """Revoke every held lease; return None or 'lease-revoke-failed'."""
+        err = None
+        # Work on a snapshot: the dict may change while this generator is
+        # suspended at a yield.
+        for key, lease in list(self.key_reservations.items()):
+            time_left = yield lease.remaining()
+            # A successfully revoked lease returns an object of type Header
+            log.debug('release-reservation', key=key, lease_id=lease.lease_id,
+                      time_left_in_secs=time_left)
+            result = yield lease.revoke()
+            if result is None:
+                err = 'lease-revoke-failed'
+            log.debug('lease-revoke', result=result)
+            self.key_reservations.pop(key, None)
+        returnValue(err)
diff --git a/python/adapters/common/kvstore/kv_client.py b/python/adapters/common/kvstore/kv_client.py
new file mode 100644
index 0000000..f6486f3
--- /dev/null
+++ b/python/adapters/common/kvstore/kv_client.py
@@ -0,0 +1,206 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from python.common.utils.asleep import asleep
+from structlog import get_logger
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+log = get_logger()
+
+class KVPair():
+    """A key, its value, and the store index under which it was read."""
+
+    def __init__(self, key, value, index):
+        self.key, self.value, self.index = key, value, index
+
+class Event():
+    """A change notification for a watched key."""
+
+    # Event types
+    PUT, DELETE, CONNECTION_DOWN = 0, 1, 2
+
+    def __init__(self, event_type, key, value):
+        self.event_type, self.key, self.value = event_type, key, value
+
+# Successive delays (in seconds) between retries; the last entry is reused
+# once the list is exhausted.
+RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+# The default timeout is the total of one full pass through the schedule.
+DEFAULT_TIMEOUT = 0.0
+for _delay in RETRY_BACKOFF:
+    DEFAULT_TIMEOUT += _delay
+
+class KVClient():
+    """Store-independent KV client interface.
+
+    The public operations delegate to ``_op_with_retry``, which concrete
+    clients must implement together with ``watch`` and ``close_watch``.
+    """
+
+    def __init__(self, kv_host, kv_port):
+        self.host, self.port = kv_host, kv_port
+        self.key_reservations = {}
+        self.key_watches = {}
+        # Back-off bookkeeping shared by _backoff and _clear_backoff
+        self.retries = 0
+        self.retry_time = 0
+
+    @inlineCallbacks
+    def get(self, key, timeout=DEFAULT_TIMEOUT):
+        """
+        Read the value stored under a key.
+
+        :param key: The key whose value is requested
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: (KVPair, error) where KVPair is None if an error occurred
+        """
+        outcome = yield self._op_with_retry('GET', key, None, timeout)
+        returnValue(outcome)
+
+    @inlineCallbacks
+    def list(self, key, timeout=DEFAULT_TIMEOUT):
+        """
+        Read every key-value pair whose key starts with the given prefix.
+
+        :param key: The key prefix
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: ([]KVPair, error) where []KVPair is a list of KVPair objects
+        """
+        outcome = yield self._op_with_retry('LIST', key, None, timeout)
+        returnValue(outcome)
+
+    @inlineCallbacks
+    def put(self, key, value, timeout=DEFAULT_TIMEOUT):
+        """
+        Write a value under a key.
+
+        Do NOT modify a reserved key in an etcd store; doing so seems to
+        nullify the TTL of the key, i.e. the key lasts forever.
+
+        :param key: The key to be written to
+        :param value: The value of the key
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: error, which is set to None for a successful write
+        """
+        _ignored, failure = yield self._op_with_retry('PUT', key, value, timeout)
+        returnValue(failure)
+
+    @inlineCallbacks
+    def delete(self, key, timeout=DEFAULT_TIMEOUT):
+        """
+        Remove a key from the KV store.
+
+        :param key: The key to be deleted
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: error, which is set to None for a successful deletion
+        """
+        _ignored, failure = yield self._op_with_retry('DELETE', key, None, timeout)
+        returnValue(failure)
+
+    @inlineCallbacks
+    def reserve(self, key, value, ttl, timeout=DEFAULT_TIMEOUT):
+        """
+        Reserve a key on behalf of an owner, semaphore-style.
+
+        The mechanism depends on the KV store: etcd uses a test-and-set
+        transaction, consul uses an acquire lock. With etcd, do NOT write to
+        the key after the initial reservation; the TTL functionality may
+        become impaired (i.e. the reservation never expires).
+
+        :param key: The key under reservation
+        :param value: The reservation owner
+        :param ttl: The time-to-live (TTL) for the reservation. The key is
+            unreserved by the KV store when the TTL expires.
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: (key_value, error) If the key is acquired, the value
+            returned is the value passed in. If the key is already acquired,
+            the value assigned to that key is returned.
+        """
+        outcome = yield self._op_with_retry('RESERVE', key, value, timeout, ttl=ttl)
+        returnValue(outcome)
+
+    @inlineCallbacks
+    def renew_reservation(self, key, timeout=DEFAULT_TIMEOUT):
+        """
+        Renew the reservation on a key before its TTL runs out.
+
+        :param key: The reserved key
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: error, which is set to None for a successful renewal
+        """
+        _ignored, failure = yield self._op_with_retry('RENEW', key, None, timeout)
+        returnValue(failure)
+
+    @inlineCallbacks
+    def release_reservation(self, key, timeout=DEFAULT_TIMEOUT):
+        """
+        Cancel the reservation on a key.
+
+        :param key: The reserved key
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: error, which is set to None for a successful cancellation
+        """
+        _ignored, failure = yield self._op_with_retry('RELEASE', key, None, timeout)
+        returnValue(failure)
+
+    @inlineCallbacks
+    def release_all_reservations(self, timeout=DEFAULT_TIMEOUT):
+        """
+        Cancel every reservation previously made through the reserve API.
+
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: error, which is set to None for a successful cancellation
+        """
+        _ignored, failure = yield self._op_with_retry('RELEASE-ALL', None, None, timeout)
+        returnValue(failure)
+
+    @inlineCallbacks
+    def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
+        """
+        Watch a key and report changes to a callback.
+
+        When the value of the key changes or the key is deleted, an event
+        describing the change is passed to the callback function.
+
+        :param key: The key to be watched
+        :param key_change_callback: The function invoked whenever the key changes
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: There is no return; key change events are passed to the callback function
+        """
+        raise NotImplementedError('Method not implemented')
+
+    @inlineCallbacks
+    def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
+        """
+        Close the watch on a key.
+
+        After the watch is closed, key change events are no longer passed
+        to the key change callback function.
+
+        :param key: The key under watch
+        :param timeout: The length of time in seconds the method will wait for a response
+        :return: There is no return
+        """
+        raise NotImplementedError('Method not implemented')
+
+    @inlineCallbacks
+    def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
+        """Perform ``operation`` against the store; provided by subclasses."""
+        raise NotImplementedError('Method not implemented')
+
+    def _backoff(self, msg):
+        """Log ``msg`` and return the ``asleep`` result for the next delay.
+
+        The delay is taken from RETRY_BACKOFF, indexed by the number of
+        retries so far and clamped to the last entry.
+        """
+        step = min(self.retries, len(RETRY_BACKOFF) - 1)
+        delay = RETRY_BACKOFF[step]
+        self.retries += 1
+        self.retry_time += delay
+        log.error(msg, next_retry_in_secs=delay,
+                  total_delay_in_secs=self.retry_time,
+                  retries=self.retries)
+        return asleep(delay)
+
+    def _clear_backoff(self):
+        """Reset the retry counters if any retries have been recorded."""
+        attempts = self.retries
+        if attempts:
+            log.debug('reset-backoff', after_retries=attempts)
+            self.retries = self.retry_time = 0
\ No newline at end of file
diff --git a/python/adapters/common/kvstore/kvstore.py b/python/adapters/common/kvstore/kvstore.py
new file mode 100644
index 0000000..ed7f246
--- /dev/null
+++ b/python/adapters/common/kvstore/kvstore.py
@@ -0,0 +1,31 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from consul_client import ConsulClient
+from etcd_client import EtcdClient
+
+def create_kv_client(kv_store, host, port):
+    """
+    Build the client object for the requested KV store.
+
+    :param kv_store: Specify either 'etcd' or 'consul'
+    :param host: Name or IP address of host serving the KV store
+    :param port: Port number (integer) of the KV service
+    :return: An EtcdClient or ConsulClient, or None for any other kv_store
+    """
+    if kv_store == 'etcd':
+        client = EtcdClient(host, port)
+    elif kv_store == 'consul':
+        client = ConsulClient(host, port)
+    else:
+        client = None
+    return client
diff --git a/python/adapters/common/pon_resource_manager/__init__.py b/python/adapters/common/pon_resource_manager/__init__.py
new file mode 100644
index 0000000..2d104e0
--- /dev/null
+++ b/python/adapters/common/pon_resource_manager/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/adapters/common/pon_resource_manager/resource_kv_store.py b/python/adapters/common/pon_resource_manager/resource_kv_store.py
new file mode 100644
index 0000000..a1a5c14
--- /dev/null
+++ b/python/adapters/common/pon_resource_manager/resource_kv_store.py
@@ -0,0 +1,107 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Resource KV store - interface between Resource Manager and backend store."""
+import structlog
+
+from voltha.core.config.config_backend import ConsulStore
+from voltha.core.config.config_backend import EtcdStore
+
+# KV store uses this prefix to store resource info
+PATH_PREFIX = 'resource_manager/{}'
+
+
+class ResourceKvStore(object):
+    """Implements apis to store/get/remove resource in backend store."""
+
+    def __init__(self, technology, device_id, backend, host, port):
+        """
+        Create ResourceKvStore object.
+
+        Based on backend ('consul' or 'etcd'), use the host and port
+        to create the respective store object. The store is rooted at
+        PATH_PREFIX formatted with the technology.
+
+        :param technology: PON technology
+        :param device_id: OLT device id (not used by this constructor)
+        :param backend: Type of backend storage (etcd or consul)
+        :param host: host ip info for backend storage
+        :param port: port for the backend storage
+        :raises exception when invalid backend store passed as an argument
+        """
+        # logger
+        self._log = structlog.get_logger()
+
+        path = PATH_PREFIX.format(technology)
+        try:
+            if backend == 'consul':
+                self._kv_store = ConsulStore(host, port, path)
+            elif backend == 'etcd':
+                self._kv_store = EtcdStore(host, port, path)
+            else:
+                self._log.error('Invalid-backend')
+                raise Exception("Invalid-backend-for-kv-store")
+        except Exception as e:
+            self._log.exception("exception-in-init")
+            raise Exception(e)
+
+    def update_to_kv_store(self, path, resource):
+        """
+        Update resource.
+
+        :param path: path to update the resource
+        :param resource: updated resource; stored as ``str(resource)``
+        :return boolean: True if the write succeeded else False
+        """
+        try:
+            self._kv_store[path] = str(resource)
+            self._log.debug("Resource-updated-in-kv-store", path=path)
+            return True
+        except Exception:
+            # Exception, not BaseException: KeyboardInterrupt and SystemExit
+            # must propagate instead of being reported as a failed write.
+            self._log.exception("Resource-update-in-kv-store-failed",
+                                path=path, resource=resource)
+        return False
+
+    def get_from_kv_store(self, path):
+        """
+        Get resource.
+
+        :param path: path to get the resource
+        :return: the stored resource, or None if it is absent or the read
+                 failed
+        """
+        resource = None
+        try:
+            resource = self._kv_store[path]
+            self._log.debug("Got-resource-from-kv-store", path=path)
+        except KeyError:
+            self._log.info("Resource-not-found-updating-resource",
+                           path=path)
+        except Exception:
+            # Exception, not BaseException: let interpreter-exit signals
+            # through.
+            self._log.exception("Getting-resource-from-kv-store-failed",
+                                path=path)
+        return resource
+
+    def remove_from_kv_store(self, path):
+        """
+        Remove resource.
+
+        :param path: path to remove the resource
+        :return boolean: True if the delete succeeded else False
+        """
+        try:
+            del self._kv_store[path]
+            self._log.debug("Resource-deleted-in-kv-store", path=path)
+            return True
+        except Exception:
+            # Exception, not BaseException: let interpreter-exit signals
+            # through.
+            self._log.exception("Resource-delete-in-kv-store-failed",
+                                path=path)
+        return False
diff --git a/python/adapters/common/pon_resource_manager/resource_manager.py b/python/adapters/common/pon_resource_manager/resource_manager.py
new file mode 100644
index 0000000..17b2871
--- /dev/null
+++ b/python/adapters/common/pon_resource_manager/resource_manager.py
@@ -0,0 +1,677 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Resource Manager will be unique for each OLT device.
+
+It exposes APIs to create/free alloc_ids/onu_ids/gemport_ids. Resource Manager
+uses a KV store in backend to ensure resiliency of the data.
+"""
+import json
+import structlog
+from bitstring import BitArray
+from ast import literal_eval
+import shlex
+from argparse import ArgumentParser, ArgumentError
+
+from common.pon_resource_manager.resource_kv_store import ResourceKvStore
+
+
+# Used to parse extra arguments to OpenOlt adapter from the NBI
+class OltVendorArgumentParser(ArgumentParser):
+    """ArgumentParser whose exit() raises instead of calling sys.exit()."""
+
+    def exit(self, status=0, message=None):
+        """Raise an Exception carrying ``message``; ``status`` is ignored."""
+        failure = Exception(message)
+        raise failure
+
+
+class PONResourceManager(object):
+ """Implements APIs to initialize/allocate/release alloc/gemport/onu IDs."""
+
+ # Constants to identify resource pool
+ ONU_ID = 'ONU_ID'
+ ALLOC_ID = 'ALLOC_ID'
+ GEMPORT_ID = 'GEMPORT_ID'
+
+ # The resource ranges for a given device vendor_type should be placed
+ # at 'resource_manager/<technology>/resource_ranges/<olt_vendor_type>'
+ # path on the KV store.
+ # If Resource Range parameters are to be read from the external KV store,
+ # they are expected to be stored in the following format.
+ # Note: All parameters are MANDATORY for now.
+ '''
+ {
+ "onu_id_start": 1,
+ "onu_id_end": 127,
+ "alloc_id_start": 1024,
+ "alloc_id_end": 2816,
+ "gemport_id_start": 1024,
+ "gemport_id_end": 8960,
+ "pon_ports": 16
+ }
+
+ '''
+ # Constants used as keys to reference the resource range parameters from
+ # an external KV store.
+ ONU_START_IDX = "onu_id_start"
+ ONU_END_IDX = "onu_id_end"
+ ALLOC_ID_START_IDX = "alloc_id_start"
+ ALLOC_ID_END_IDX = "alloc_id_end"
+ GEM_PORT_ID_START_IDX = "gemport_id_start"
+ GEM_PORT_ID_END_IDX = "gemport_id_end"
+ NUM_OF_PON_PORT = "pon_ports"
+
+ # PON Resource range configuration on the KV store.
+ # Format: 'resource_manager/<technology>/resource_ranges/<olt_vendor_type>'
+ # The KV store backend is initialized with a path prefix and we need to
+ # provide only the suffix.
+ PON_RESOURCE_RANGE_CONFIG_PATH = 'resource_ranges/{}'
+
+ # resource path suffix
+ ALLOC_ID_POOL_PATH = '{}/alloc_id_pool/{}'
+ GEMPORT_ID_POOL_PATH = '{}/gemport_id_pool/{}'
+ ONU_ID_POOL_PATH = '{}/onu_id_pool/{}'
+
+ # Path on the KV store for storing list of alloc IDs for a given ONU
+ # Format: <device_id>/<(pon_intf_id, onu_id)>/alloc_ids
+ ALLOC_ID_RESOURCE_MAP_PATH = '{}/{}/alloc_ids'
+
+ # Path on the KV store for storing list of gemport IDs for a given ONU
+ # Format: <device_id>/<(pon_intf_id, onu_id)>/gemport_ids
+ GEMPORT_ID_RESOURCE_MAP_PATH = '{}/{}/gemport_ids'
+
+ # Constants for internal usage.
+ PON_INTF_ID = 'pon_intf_id'
+ START_IDX = 'start_idx'
+ END_IDX = 'end_idx'
+ POOL = 'pool'
+
+    def __init__(self, technology, extra_args, device_id,
+                 backend, host, port):
+        """
+        Create PONResourceManager object.
+
+        :param technology: PON technology
+        :param: extra_args: This string contains extra arguments passed during
+        pre-provisioning of OLT and specifies the OLT Vendor type
+        :param device_id: OLT device id
+        :param backend: backend store
+        :param host: ip of backend store
+        :param port: port on which backend store listens
+        :raises exception when invalid backend store passed as an argument
+        """
+        # logger
+        self._log = structlog.get_logger()
+
+        try:
+            self.technology, self.extra_args = technology, extra_args
+            self.device_id = device_id
+            self.backend, self.host, self.port = backend, host, port
+            self.olt_vendor = None
+            self._kv_store = ResourceKvStore(technology, device_id, backend,
+                                             host, port)
+            # Starts empty; meant to be filled from the KV store (or with
+            # defaults) afterwards.
+            self.pon_resource_ranges = {}
+        except Exception as init_error:
+            self._log.exception("exception-in-init")
+            raise Exception(init_error)
+
+    def init_resource_ranges_from_kv_store(self):
+        """
+        Load the PON resource ranges for this OLT vendor from the KV store.
+
+        The vendor comes from ``_get_olt_vendor()``; the stored value is
+        parsed as JSON into ``pon_resource_ranges``.
+
+        :return boolean: True if PON resource ranges initialized else false
+        """
+        self.olt_vendor = self._get_olt_vendor()
+        # Without a vendor key there is nothing to look up
+        if self.olt_vendor is None:
+            self._log.info("olt-vendor-unavailable--not-reading-from-kv-store")
+            return False
+
+        path = self.PON_RESOURCE_RANGE_CONFIG_PATH.format(self.olt_vendor)
+        try:
+            raw_config = self._kv_store.get_from_kv_store(path)
+            if raw_config is None:
+                self._log.debug("resource-range-config-unavailable-on-kvstore")
+                return False
+
+            self.pon_resource_ranges = json.loads(raw_config)
+            self._log.debug("Init-resource-ranges-from-kvstore-success",
+                            pon_resource_ranges=self.pon_resource_ranges,
+                            path=path)
+            return True
+        except Exception as load_error:
+            self._log.exception("error-initializing-resource-range-from-kv-store",
+                                e=load_error)
+            return False
+
+    def init_default_pon_resource_ranges(self, onu_start_idx=1,
+                                         onu_end_idx=127,
+                                         alloc_id_start_idx=1024,
+                                         alloc_id_end_idx=2816,
+                                         gem_port_id_start_idx=1024,
+                                         gem_port_id_end_idx=8960,
+                                         num_of_pon_ports=16):
+        """
+        Fill ``pon_resource_ranges`` with the given (default) range values.
+
+        :param onu_start_idx: onu id start index
+        :param onu_end_idx: onu id end index
+        :param alloc_id_start_idx: alloc id start index
+        :param alloc_id_end_idx: alloc id end index
+        :param gem_port_id_start_idx: gemport id start index
+        :param gem_port_id_end_idx: gemport id end index
+        :param num_of_pon_ports: number of PON ports
+        """
+        self._log.info("initialize-default-resource-range-values")
+        settings = (
+            (PONResourceManager.ONU_START_IDX, onu_start_idx),
+            (PONResourceManager.ONU_END_IDX, onu_end_idx),
+            (PONResourceManager.ALLOC_ID_START_IDX, alloc_id_start_idx),
+            (PONResourceManager.ALLOC_ID_END_IDX, alloc_id_end_idx),
+            (PONResourceManager.GEM_PORT_ID_START_IDX, gem_port_id_start_idx),
+            (PONResourceManager.GEM_PORT_ID_END_IDX, gem_port_id_end_idx),
+            (PONResourceManager.NUM_OF_PON_PORT, num_of_pon_ports),
+        )
+        for range_key, range_value in settings:
+            self.pon_resource_ranges[range_key] = range_value
+
+    def init_device_resource_pool(self):
+        """
+        Create the ONU ID pool of every PON port plus the shared pools.
+        """
+        ranges = self.pon_resource_ranges
+        pon_port = 0
+        while pon_port < ranges[PONResourceManager.NUM_OF_PON_PORT]:
+            self.init_resource_id_pool(
+                pon_intf_id=pon_port,
+                resource_type=PONResourceManager.ONU_ID,
+                start_idx=ranges[PONResourceManager.ONU_START_IDX],
+                end_idx=ranges[PONResourceManager.ONU_END_IDX])
+            pon_port += 1
+
+        # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
+        # across OLT. To keep it simple, a single pool (POOL 0) is maintained
+        # for both the resource types. This may need to change later.
+        shared_pools = (
+            (PONResourceManager.ALLOC_ID,
+             PONResourceManager.ALLOC_ID_START_IDX,
+             PONResourceManager.ALLOC_ID_END_IDX),
+            (PONResourceManager.GEMPORT_ID,
+             PONResourceManager.GEM_PORT_ID_START_IDX,
+             PONResourceManager.GEM_PORT_ID_END_IDX),
+        )
+        for pool_type, start_key, end_key in shared_pools:
+            self.init_resource_id_pool(
+                pon_intf_id=0,
+                resource_type=pool_type,
+                start_idx=ranges[start_key],
+                end_idx=ranges[end_key])
+
+    def clear_device_resource_pool(self):
+        """
+        Clear the ONU ID pool of every PON port plus the shared pools.
+        """
+        pon_port = 0
+        while pon_port < self.pon_resource_ranges[
+                PONResourceManager.NUM_OF_PON_PORT]:
+            self.clear_resource_id_pool(
+                pon_intf_id=pon_port,
+                resource_type=PONResourceManager.ONU_ID)
+            pon_port += 1
+
+        # Alloc and gemport IDs are cleared from pool 0 only
+        for pool_type in (PONResourceManager.ALLOC_ID,
+                          PONResourceManager.GEMPORT_ID):
+            self.clear_resource_id_pool(
+                pon_intf_id=0,
+                resource_type=pool_type)
+
+    def init_resource_id_pool(self, pon_intf_id, resource_type, start_idx,
+                              end_idx):
+        """
+        Create the ID pool of one resource type on one PON port.
+
+        A pool already present in the KV store is left untouched.
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_type: String to identify type of resource
+        :param start_idx: start index for onu id pool
+        :param end_idx: end index for onu id pool
+        :return boolean: True if resource id pool initialized else false
+        """
+        path = self._get_path(pon_intf_id, resource_type)
+        if path is None:
+            return False
+
+        try:
+            # After an adapter reboot/reconciliation the pool may already be
+            # in the KV store; the store is only written when it is not.
+            if self._get_resource(path) is not None:
+                self._log.info("Resource-already-present-in-store", path=path)
+                return True
+
+            fresh_pool = self._format_resource(pon_intf_id, start_idx,
+                                               end_idx)
+            self._log.info("Resource-initialized", path=path)
+
+            # Add resource as json in kv store.
+            stored = self._kv_store.update_to_kv_store(path, fresh_pool)
+            return stored is True
+        except Exception as init_error:
+            self._log.exception("error-initializing-resource-pool",
+                                e=init_error)
+            return False
+
+    def get_resource_id(self, pon_intf_id, resource_type, num_of_id=1):
+        """
+        Allocate onu/alloc/gemport id(s) for the given OLT PON interface.
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_type: String to identify type of resource
+        :param num_of_id: required number of ids
+        :return list/int/None: list, int or None if resource type is
+                               alloc_id/gemport_id, onu_id or invalid type
+                               respectively
+        """
+        # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
+        # across OLT. To keep it simple, a single pool (POOL 0) is maintained
+        # for both the resource types. This may need to change later.
+        # Override the incoming pon_intf_id to PON0
+        shared_pool_types = (PONResourceManager.GEMPORT_ID,
+                             PONResourceManager.ALLOC_ID)
+        if resource_type in shared_pool_types:
+            pon_intf_id = 0
+
+        path = self._get_path(pon_intf_id, resource_type)
+        if path is None:
+            return None
+
+        result = None
+        try:
+            resource = self._get_resource(path)
+            if resource is None:
+                raise Exception("get-resource-failed")
+
+            if resource_type == PONResourceManager.ONU_ID:
+                result = self._generate_next_id(resource)
+            elif resource_type in shared_pool_types:
+                result = []
+                remaining = num_of_id
+                while remaining > 0:
+                    result.append(self._generate_next_id(resource))
+                    remaining -= 1
+            else:
+                raise Exception("get-resource-failed")
+
+            self._log.debug("Get-" + resource_type + "-success", result=result,
+                            path=path)
+            # Update resource in kv store
+            self._update_resource(path, resource)
+
+        except Exception as alloc_error:
+            self._log.exception("Get-" + resource_type + "-id-failed",
+                                path=path, e=alloc_error)
+        return result
+
+    def free_resource_id(self, pon_intf_id, resource_type, release_content):
+        """
+        Return alloc/gemport/onu id(s) to the pool of an OLT PON interface.
+
+        :param pon_intf_id: OLT PON interface id (overridden to 0 for
+                            alloc_id and gemport_id, see TODO below)
+        :param resource_type: String to identify type of resource
+        :param release_content: a single id for onu_id, an iterable of ids
+                                for alloc_id/gemport_id
+        :return boolean: True if the ids were released and the pool was
+                         updated in the kv store else False
+        """
+        # TODO: ASFvOLT16 platform requires alloc and gemport ID to be unique
+        # across OLT. To keep it simple, a single pool (POOL 0) is maintained
+        # for both the resource types. This may need to change later.
+        # Override the incoming pon_intf_id to PON0
+        shared_pool_types = (PONResourceManager.GEMPORT_ID,
+                             PONResourceManager.ALLOC_ID)
+        if resource_type in shared_pool_types:
+            pon_intf_id = 0
+
+        path = self._get_path(pon_intf_id, resource_type)
+        if path is None:
+            return False
+
+        released = False
+        try:
+            resource = self._get_resource(path)
+            if resource is None:
+                raise Exception("get-resource-failed")
+
+            if resource_type == PONResourceManager.ONU_ID:
+                # onu ids are released one at a time
+                self._release_id(resource, release_content)
+            elif resource_type in shared_pool_types:
+                for unique_id in release_content:
+                    self._release_id(resource, unique_id)
+            else:
+                raise Exception("get-resource-failed")
+
+            self._log.debug("Free-" + resource_type + "-success", path=path)
+
+            # Write the modified pool back; its outcome is the return value
+            released = self._update_resource(path, resource)
+
+        except Exception as err:
+            self._log.exception("Free-" + resource_type + "-failed",
+                                path=path, e=err)
+        return released
+
+    def clear_resource_id_pool(self, pon_intf_id, resource_type):
+        """
+        Remove the id pool of a resource type on a PON port from the kv store.
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_type: String to identify type of resource
+        :return boolean: True if removed else False
+        """
+        path = self._get_path(pon_intf_id, resource_type)
+        if path is None:
+            return False
+
+        try:
+            if self._kv_store.remove_from_kv_store(path) is True:
+                self._log.debug("Resource-pool-cleared",
+                                device_id=self.device_id,
+                                path=path)
+                return True
+        except Exception as err:
+            self._log.exception("error-clearing-resource-pool", e=err)
+
+        # Reached when the store reported failure or raised
+        self._log.error("Clear-resource-pool-failed", device_id=self.device_id,
+                        path=path)
+        return False
+
+    def init_resource_map(self, pon_intf_onu_id):
+        """
+        Create empty alloc id and gemport id maps for a PON interface/onu.
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        """
+        onu_key = str(pon_intf_onu_id)
+
+        # alloc_ids map first, then gemport_ids map; both start out as an
+        # empty json list
+        for path_template in (
+                PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH,
+                PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH):
+            map_path = path_template.format(self.device_id, onu_key)
+            self._kv_store.update_to_kv_store(map_path, json.dumps([]))
+
+    def remove_resource_map(self, pon_intf_onu_id):
+        """
+        Delete the alloc id and gemport id maps of a PON interface/onu.
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        """
+        onu_key = str(pon_intf_onu_id)
+
+        # alloc_ids map first, then gemport_ids map
+        for path_template in (
+                PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH,
+                PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH):
+            self._kv_store.remove_from_kv_store(
+                path_template.format(self.device_id, onu_key))
+
+    def get_current_alloc_ids_for_onu(self, pon_intf_onu_id):
+        """
+        Get currently configured alloc ids for given pon_intf_onu_id
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        :return: list of alloc ids, or None when nothing is stored or the
+                 stored list is empty
+        """
+        map_path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id))
+        stored = self._kv_store.get_from_kv_store(map_path)
+        if stored is None:
+            return None
+
+        alloc_ids = json.loads(stored)
+        return alloc_ids if len(alloc_ids) > 0 else None
+
+    def get_current_gemport_ids_for_onu(self, pon_intf_onu_id):
+        """
+        Get currently configured gemport ids for given pon_intf_onu_id
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        :return: list of gemport ids, or None when nothing is stored or the
+                 stored list is empty
+        """
+        map_path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id))
+        stored = self._kv_store.get_from_kv_store(map_path)
+        if stored is None:
+            return None
+
+        gemport_ids = json.loads(stored)
+        return gemport_ids if len(gemport_ids) > 0 else None
+
+    def update_alloc_ids_for_onu(self, pon_intf_onu_id, alloc_ids):
+        """
+        Store the alloc ids configured for given pon_intf_onu_id
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        :param alloc_ids: alloc ids to store, serialized as json
+        """
+        map_path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id))
+        serialized = json.dumps(alloc_ids)
+        self._kv_store.update_to_kv_store(map_path, serialized)
+
+    def update_gemport_ids_for_onu(self, pon_intf_onu_id, gemport_ids):
+        """
+        Store the gemport ids configured for given pon_intf_onu_id
+
+        :param pon_intf_onu_id: reference of PON interface id and onu id
+        :param gemport_ids: gemport ids to store, serialized as json
+        """
+        map_path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+            self.device_id, str(pon_intf_onu_id))
+        serialized = json.dumps(gemport_ids)
+        self._kv_store.update_to_kv_store(map_path, serialized)
+
+    def _get_olt_vendor(self):
+        """
+        Read the olt vendor variant from the adapter's extra arguments.
+
+        :return: value of the --olt_vendor/-o option ('default' when the
+                 option is absent), or None when there are no extra
+                 arguments or they could not be parsed
+        """
+        if not self.extra_args or len(self.extra_args) == 0:
+            return None
+
+        vendor = None
+        parser = OltVendorArgumentParser(add_help=False)
+        parser.add_argument('--olt_vendor', '-o', action='store',
+                            choices=['default', 'asfvolt16', 'cigolt24'],
+                            default='default')
+        try:
+            # extra_args is a shell-style string, hence shlex
+            parsed = parser.parse_args(shlex.split(self.extra_args))
+            self._log.debug('parsing-extra-arguments', args=parsed)
+            vendor = parsed.olt_vendor
+        except ArgumentError as err:
+            self._log.exception('invalid-arguments: {}', e=err)
+        except Exception as err:
+            self._log.exception('option-parsing-error: {}', e=err)
+
+        return vendor
+
+    def _generate_next_id(self, resource):
+        """
+        Generate unique id having OFFSET as start index.
+
+        Marks the first free (0) bit of the pool as used and returns its
+        position offset by the pool's start index.
+
+        :param resource: resource used to generate ID
+        :return int: generated id
+        :raises IndexError: if the pool has no free id left
+        """
+        pool = resource[PONResourceManager.POOL]
+
+        # find() yields a one-element tuple with the bit position, or an
+        # empty tuple when every bit is already set
+        free_pos = pool.find('0b0')
+        if not free_pos:
+            raise IndexError("resource-pool-exhausted")
+
+        pool.set(1, free_pos[0])
+        return free_pos[0] + resource[PONResourceManager.START_IDX]
+
+    def _release_id(self, resource, unique_id):
+        """
+        Release unique id having OFFSET as start index.
+
+        :param resource: resource used to release ID
+        :param unique_id: id need to be released
+        :raises IndexError: if unique_id does not belong to the pool
+        """
+        pool = resource[PONResourceManager.POOL]
+        pos = int(unique_id) - resource[PONResourceManager.START_IDX]
+
+        # A negative position would be interpreted relative to the end of
+        # the bit array and silently free some other id, so reject anything
+        # outside the pool explicitly.
+        if not 0 <= pos < len(pool):
+            raise IndexError(
+                "id-out-of-pool-range: {}".format(unique_id))
+
+        pool.set(0, pos)
+
+    def _get_path(self, pon_intf_id, resource_type):
+        """
+        Resolve the kv store path of the id pool for a resource type.
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_type: String to identify type of resource
+        :return: path for given resource type, None (after logging an
+                 error) when the resource type is not recognised
+        """
+        if resource_type == PONResourceManager.ONU_ID:
+            return self._get_onu_id_resource_path(pon_intf_id)
+        if resource_type == PONResourceManager.ALLOC_ID:
+            return self._get_alloc_id_resource_path(pon_intf_id)
+        if resource_type == PONResourceManager.GEMPORT_ID:
+            return self._get_gemport_id_resource_path(pon_intf_id)
+
+        self._log.error("invalid-resource-pool-identifier")
+        return None
+
+    def _get_alloc_id_resource_path(self, pon_intf_id):
+        """
+        Build the kv store path of the alloc id pool.
+
+        :param pon_intf_id: OLT PON interface id
+        :return: alloc id resource path
+        """
+        path_template = PONResourceManager.ALLOC_ID_POOL_PATH
+        return path_template.format(self.device_id, pon_intf_id)
+
+    def _get_gemport_id_resource_path(self, pon_intf_id):
+        """
+        Build the kv store path of the gemport id pool.
+
+        :param pon_intf_id: OLT PON interface id
+        :return: gemport id resource path
+        """
+        path_template = PONResourceManager.GEMPORT_ID_POOL_PATH
+        return path_template.format(self.device_id, pon_intf_id)
+
+    def _get_onu_id_resource_path(self, pon_intf_id):
+        """
+        Build the kv store path of the onu id pool.
+
+        :param pon_intf_id: OLT PON interface id
+        :return: onu id resource path
+        """
+        path_template = PONResourceManager.ONU_ID_POOL_PATH
+        return path_template.format(self.device_id, pon_intf_id)
+
+    def _update_resource(self, path, resource):
+        """
+        Update resource in resource kv store.
+
+        The pool is stored as a binary string. The conversion is done on a
+        shallow copy so that the caller's resource keeps its BitArray pool
+        and stays usable for further id generation/release.
+
+        :param path: path to update resource
+        :param resource: resource need to be updated
+        :return boolean: True if resource updated in kv store else False
+        """
+        serializable = dict(resource)
+        serializable[PONResourceManager.POOL] = \
+            resource[PONResourceManager.POOL].bin
+
+        result = self._kv_store.update_to_kv_store(
+            path, json.dumps(serializable))
+        return result is True
+
+    def _get_resource(self, path):
+        """
+        Fetch a resource from the kv store and decode it.
+
+        :param path: path to get resource
+        :return: resource dictionary with its pool as a BitArray if the
+                 resource is present in kv store else None
+        """
+        raw = self._kv_store.get_from_kv_store(path)
+        if raw is None:
+            return raw
+        self._log.info("dumping resource", result=raw)
+
+        # decode resource fetched from backend store to dictionary
+        decoded = json.loads(raw)
+
+        # the backend keeps the pool as a binary string; generating and
+        # releasing ids needs it as a BitArray
+        pool_bits = decoded[PONResourceManager.POOL]
+        decoded[PONResourceManager.POOL] = BitArray('0b' + pool_bits)
+
+        return decoded
+
+    def _format_resource(self, pon_intf_id, start_idx, end_idx):
+        """
+        Build the initial json representation of an id pool.
+
+        :param pon_intf_id: OLT PON interface id
+        :param start_idx: start index for id pool
+        :param end_idx: end index for id pool
+        :return string: resource serialized as a json string
+        """
+        # NOTE(review): the pool is sized from end_idx alone, independent of
+        # start_idx - confirm this is the intended pool length.
+        resource = {
+            PONResourceManager.PON_INTF_ID: pon_intf_id,
+            PONResourceManager.START_IDX: start_idx,
+            PONResourceManager.END_IDX: end_idx,
+            # resource pool stored in backend store as binary string
+            PONResourceManager.POOL: BitArray(end_idx).bin,
+        }
+
+        return json.dumps(resource)