
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# Copyright 2015, Big Switch Networks, Inc.

"""
OpenFlow connection class

This class creates a thread which continually parses OpenFlow messages off the
supplied socket and places them in a queue. The class has methods for reading messages
from the RX queue, sending messages, and higher level operations like request-response
and multipart transactions.
"""

import loxi
import loxi.of14
import logging
import time
import socket
import errno
import os
import select
from threading import Condition, Lock, Thread

# Default time, in seconds, that the recv*/transact* helpers below wait for a
# matching message before giving up.
DEFAULT_TIMEOUT = 1

class TransactionError(Exception):
    """
    Error raised when a request/reply transaction fails

    Constructed as TransactionError(description, reply): args[0] is the
    human-readable description and args[1] is the reply message that caused
    the failure, or None when no reply was received.
    """
    def __str__(self):
        # Fall back to the default rendering when raised without a description
        if not self.args:
            return Exception.__str__(self)
        return str(self.args[0])

    @property
    def msg(self):
        """
        The reply message attached to this error, or None if there is none
        """
        # Tolerate being raised with only a description
        return self.args[1] if len(self.args) > 1 else None

class Connection(Thread):
    """
    OpenFlow connection with its own reader thread

    Once started, the thread parses OpenFlow messages off the supplied socket
    and appends them to the RX queue (self.rx). Other threads consume the
    queue with the recv* methods and write to the socket with send/send_raw.
    """
    def __init__(self, sock):
        """
        Wrap the socket 'sock'. The reader thread does not run until start()
        is called.
        """
        Thread.__init__(self)
        self.sock = sock
        self.logger = logging.getLogger("connection")
        self.rx = []                # Parsed messages not yet consumed by recv()
        self.rx_cv = Condition()    # Guards self.rx and signals new arrivals
        self.tx_lock = Lock()       # Serializes writes to the socket
        self.next_xid = 1
        # Pipe written by stop() to wake the reader thread out of select()
        self.wakeup_rd, self.wakeup_wr = os.pipe()
        self.finished = False
        self.read_buffer = None     # Trailing bytes of an incomplete message
        self.peer_closed = False    # Set once the socket has reported EOF

    def run(self):
        """
        Reader thread body: read from the socket until stop() is called
        """
        while not self.finished:
            # After EOF the socket stays readable forever, so stop polling it
            # and just wait for stop() to write to the wakeup pipe.
            readers = [self.wakeup_rd]
            if not self.peer_closed:
                readers.append(self.sock)
            rd, wr, err = select.select(readers, [], [])
            if self.sock in rd:
                self.process_read()
            if self.wakeup_rd in rd:
                os.read(self.wakeup_rd, 1)
        self.logger.debug("Exited event loop")

    def process_read(self):
        """
        Read available bytes from the socket and queue every complete message

        Bytes belonging to an incomplete trailing message are kept in
        self.read_buffer and prepended to the data from the next read.
        """
        recvd = self.sock.recv(4096)

        self.logger.debug("Received %d bytes", len(recvd))

        if not recvd:
            # A zero-length read means the peer closed the connection
            self.logger.debug("Connection closed by peer")
            self.peer_closed = True
            return

        buf = self.read_buffer
        if buf:
            buf += recvd
        else:
            buf = recvd

        offset = 0
        # Stop as soon as fewer than 8 bytes (the OpenFlow header) remain
        while offset + 8 <= len(buf):
            # Parse the header to get type
            hdr_version, hdr_type, hdr_msglen, hdr_xid = loxi.of14.message.parse_header(buf[offset:])

            # Use loxi to resolve ofp of matching version
            ofp = loxi.protocol(hdr_version)

            if hdr_msglen < 8:
                # The length field covers the header itself, so a smaller
                # value is invalid and would never advance 'offset'. The
                # framing can't be trusted anymore; drop what is buffered.
                self.logger.warning("Invalid message length %d, discarding %d bytes",
                                    hdr_msglen, len(buf) - offset)
                offset = len(buf)
                break

            # Extract the raw message bytes
            if (offset + hdr_msglen) > len(buf):
                # Not enough data for the body
                break
            rawmsg = buf[offset : offset + hdr_msglen]
            offset += hdr_msglen

            msg = ofp.message.parse_message(rawmsg)
            if not msg:
                self.logger.warning("Could not parse message")
                continue

            self.logger.debug("Received message %s.%s xid %d length %d",
                              type(msg).__module__, type(msg).__name__, hdr_xid, hdr_msglen)

            with self.rx_cv:
                self.rx.append(msg)
                self.rx_cv.notify_all()

        if offset == len(buf):
            self.read_buffer = None
        else:
            self.read_buffer = buf[offset:]
            self.logger.debug("%d bytes remaining", len(self.read_buffer))

    def recv(self, predicate, timeout=DEFAULT_TIMEOUT):
        """
        Remove and return the first message in the RX queue for
        which 'predicate' returns true

        Waits up to 'timeout' seconds for such a message to arrive and
        returns None if none does.
        """
        assert self.is_alive()

        deadline = time.time() + timeout
        while True:
            with self.rx_cv:
                for i, msg in enumerate(self.rx):
                    if predicate(msg):
                        return self.rx.pop(i)

                now = time.time()
                if now > deadline:
                    return None
                else:
                    # Woken by process_read() on arrival, or by the timeout
                    self.rx_cv.wait(deadline - now)

    def recv_any(self, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue
        """
        return self.recv(lambda msg: True, timeout)

    def recv_xid(self, xid, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue with XID 'xid'
        """
        return self.recv(lambda msg: msg.xid == xid, timeout)

    def recv_class(self, klass, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue which is an instance of 'klass'
        """
        return self.recv(lambda msg: isinstance(msg, klass), timeout)

    def _sendall(self, buf):
        """
        Write 'buf' to the socket while holding the TX lock
        """
        with self.tx_lock:
            if self.sock.sendall(buf) is not None:
                raise RuntimeError("failed to send message to switch")

    def send_raw(self, buf):
        """
        Send raw bytes on the socket
        """
        assert self.is_alive()
        self.logger.debug("Sending raw message length %d", len(buf))
        self._sendall(buf)

    def send(self, msg):
        """
        Send a message

        A message without an XID is assigned the next one from this
        connection before it is packed.
        """
        assert self.is_alive()

        if msg.xid is None:
            msg.xid = self._gen_xid()
        buf = msg.pack()
        self.logger.debug("Sending message %s.%s xid %d length %d",
                          type(msg).__module__, type(msg).__name__, msg.xid, len(buf))
        self._sendall(buf)

    def transact(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a message and return the reply

        Raises TransactionError if no reply with the same XID arrives within
        'timeout' seconds, or if the reply is an error message.
        """
        self.send(msg)
        reply = self.recv_xid(msg.xid, timeout)
        if reply is None:
            raise TransactionError("no reply for %s" % type(msg).__name__, None)
        elif isinstance(reply, loxi.protocol(reply.version).message.error_msg):
            raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
        return reply

    def transact_multipart_generator(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a multipart request and yield each entry from the replies

        Raises TransactionError if a reply does not arrive within 'timeout'
        seconds or is not a stats reply. Being a generator, nothing is sent
        until iteration starts.
        """
        self.send(msg)
        finished = False
        while not finished:
            reply = self.recv_xid(msg.xid, timeout)
            if reply is None:
                raise TransactionError("no reply for %s" % type(msg).__name__, None)
            elif not isinstance(reply, loxi.protocol(reply.version).message.stats_reply):
                raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
            for entry in reply.entries:
                yield entry
            # Keep reading until a reply arrives without the MORE flag set
            finished = reply.flags & loxi.protocol(reply.version).OFPSF_REPLY_MORE == 0

    def transact_multipart(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a multipart request and return all entries from the replies
        """
        return list(self.transact_multipart_generator(msg, timeout))

    def stop(self):
        """
        Signal the thread to exit and wait for it, then close the socket
        and the wakeup pipe
        """
        assert not self.finished
        self.logger.debug("Stopping connection")
        self.finished = True
        # Must be a bytes literal: os.write() rejects text strings on Python 3
        os.write(self.wakeup_wr, b"x")
        self.join()
        self.sock.close()
        os.close(self.wakeup_rd)
        os.close(self.wakeup_wr)
        self.logger.debug("Stopped connection")

    def _gen_xid(self):
        """
        Return the next unused XID for this connection
        """
        xid = self.next_xid
        self.next_xid += 1
        return xid

def connect(ip, port=6653, daemon=True, ofp=loxi.of14):
    """
    Actively connect to a switch

    Opens a TCP connection to (ip, port), starts a Connection reader thread
    on it (with its 'daemon' attribute set to 'daemon'), sends a HELLO built
    from the 'ofp' protocol module and waits for the switch's HELLO.

    Returns the started Connection. Raises Exception if no HELLO is received
    within the default receive timeout; socket errors propagate unchanged.
    On any failure the socket and the reader thread are cleaned up first.
    """
    soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        soc.connect((ip, port))
        soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
    except Exception:
        # Don't leak the descriptor when the switch is unreachable
        soc.close()
        raise

    cxn = Connection(soc)
    cxn.daemon = daemon
    cxn.logger.debug("Connected to %s:%d", ip, port)
    cxn.start()

    try:
        cxn.send(ofp.message.hello())
        if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
            raise Exception("Did not receive HELLO")
    except Exception:
        # The caller never gets a handle to this connection, so shut down the
        # reader thread and close the socket here. Otherwise a non-daemon
        # thread would keep the process alive.
        cxn.stop()
        raise

    return cxn
