#!/usr/bin/env python

# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
import time
from uuid import uuid4

import structlog
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
    DeferredQueue, gatherResults
from zope.interface import implementer

from pyvoltha.common.utils import asleep
from pyvoltha.common.utils.registry import IComponent
from .kafka_proxy import KafkaProxy, get_kafka_proxy
from voltha_protos.inter_container_pb2 import MessageType, Argument, \
    InterContainerRequestBody, InterContainerMessage, Header, \
    InterContainerResponseBody, StrType
import six
import codecs

log = structlog.get_logger()

KAFKA_OFFSET_LATEST = 'latest'
KAFKA_OFFSET_EARLIEST = 'earliest'
ARG_FROM_TOPIC = 'fromTopic'


class KafkaMessagingError(Exception):
    def __init__(self, error):
        self.error = error


@implementer(IComponent)
class IKafkaMessagingProxy(object):
    _kafka_messaging_instance = None

    def __init__(self,
                 kafka_host_port,
                 kv_store,
                 default_topic,
                 group_id_prefix,
                 target_cls):
        """
        Initialize the kafka proxy.  This is a singleton (may change to
        non-singleton if performance is better)
        :param kafka_host_port: Kafka host and port
        :param kv_store: Key-Value store
        :param default_topic: Default topic to subscribe to
        :param target_cls: target class - method of that class is invoked
        when a message is received on the default_topic
        """
        # return an exception if the object already exist
        if IKafkaMessagingProxy._kafka_messaging_instance:
            raise Exception(
                'Singleton-exist', cls=IKafkaMessagingProxy)

        log.debug("Initializing-KafkaProxy")
        self.kafka_host_port = kafka_host_port
        self.kv_store = kv_store
        self.default_topic = default_topic
        self.default_group_id = "_".join((group_id_prefix, default_topic))
        self.target_cls = target_cls
        self.topic_target_cls_map = {}
        self.topic_callback_map = {}
        self.subscribers = {}
        self.kafka_proxy = None
        self.transaction_id_deferred_map = {}
        self.received_msg_queue = DeferredQueue()
        self.stopped = False

        self.init_time = 0
        self.init_received_time = 0

        self.init_resp_time = 0
        self.init_received_resp_time = 0

        self.num_messages = 0
        self.total_time = 0
        self.num_responses = 0
        self.total_time_responses = 0
        log.debug("KafkaProxy-initialized")

    def start(self):
        try:
            log.debug("KafkaProxy-starting")

            # Get the kafka proxy instance.  If it does not exist then
            # create it
            self.kafka_proxy = get_kafka_proxy()
            if self.kafka_proxy == None:
                KafkaProxy(kafka_endpoint=self.kafka_host_port).start()
                self.kafka_proxy = get_kafka_proxy()

            # Subscribe the default topic and target_cls
            self.topic_target_cls_map[self.default_topic] = self.target_cls

            # Start the queue to handle incoming messages
            reactor.callLater(0, self._received_message_processing_loop)

            # Subscribe using the default topic and default group id.  Whenever
            # a message is received on that topic then teh target_cls will be
            # invoked.
            reactor.callLater(0, self.subscribe,
                              topic=self.default_topic,
                              target_cls=self.target_cls,
                              group_id=self.default_group_id)

            # Setup the singleton instance
            IKafkaMessagingProxy._kafka_messaging_instance = self
            log.debug("KafkaProxy-started")
        except Exception as e:
            log.exception("Failed-to-start-proxy", e=e)

    def stop(self):
        """
        Invoked to stop the kafka proxy
        :return: None on success, Exception on failure
        """
        log.debug("Stopping-messaging-proxy ...")
        try:
            # Stop the kafka proxy.  This will stop all the consumers
            # and producers
            self.stopped = True
            self.kafka_proxy.stop()
            log.debug("Messaging-proxy-stopped.")
        except Exception as e:
            log.exception("Exception-when-stopping-messaging-proxy:", e=e)

    def get_target_cls(self):
        return self.target_cls

    def get_default_topic(self):
        return self.default_topic

    @inlineCallbacks
    def _subscribe_group_consumer(self, group_id, topic, offset, callback=None,
                                  target_cls=None):
        try:
            log.debug("subscribing-to-topic-start", topic=topic)
            yield self.kafka_proxy.subscribe(topic,
                                             self._enqueue_received_group_message,
                                             group_id, offset)

            if target_cls is not None and callback is None:
                # Scenario #1
                if topic not in self.topic_target_cls_map:
                    self.topic_target_cls_map[topic] = target_cls
            elif target_cls is None and callback is not None:
                # Scenario #2
                log.debug("custom-callback", topic=topic,
                          callback_map=self.topic_callback_map)
                if topic not in self.topic_callback_map:
                    self.topic_callback_map[topic] = [callback]
                else:
                    self.topic_callback_map[topic].extend([callback])
            else:
                log.warn("invalid-parameters")

            returnValue(True)
        except Exception as e:
            log.exception("Exception-during-subscription", e=e)
            returnValue(False)

    @inlineCallbacks
    def subscribe(self, topic, callback=None, target_cls=None,
                  max_retry=3, group_id=None, offset=KAFKA_OFFSET_LATEST):
        """
        Scenario 1:  invoked to subscribe to a specific topic with a
        target_cls to invoke when a message is received on that topic.  This
        handles the case of request/response where this library performs the
        heavy lifting. In this case the m_callback must to be None

        Scenario 2:  invoked to subscribe to a specific topic with a
        specific callback to invoke when a message is received on that topic.
        This handles the case where the caller wants to process the message
        received itself. In this case the target_cls must to be None

        :param topic: topic to subscribe to
        :param callback: Callback to invoke when a message is received on
        the topic. Either one of callback or target_cls needs can be none
        :param target_cls:  Target class to use when a message is
        received on the topic. There can only be 1 target_cls per topic.
        Either one of callback or target_cls needs can be none
        :param max_retry:  the number of retries before reporting failure
        to subscribe.  This caters for scenario where the kafka topic is not
        ready.
        :param group_id:  The ID of the group the consumer is subscribing to
        :param offset: The topic offset on the kafka bus from where message consumption will start
        :return: True on success, False on failure
        """
        RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

        def _backoff(msg, retries):
            wait_time = RETRY_BACKOFF[min(retries,
                                          len(RETRY_BACKOFF) - 1)]
            log.info(msg, retry_in=wait_time)
            return asleep.asleep(wait_time)

        log.debug("subscribing", topic=topic, group_id=group_id,
                  callback=callback, target=target_cls)

        retry = 0
        subscribed = False
        if group_id is None:
            group_id = self.default_group_id
        while not subscribed:
            subscribed = yield self._subscribe_group_consumer(group_id, topic,
                                                              callback=callback,
                                                              target_cls=target_cls,
                                                              offset=offset)
            if subscribed:
                returnValue(True)
            elif retry > max_retry:
                returnValue(False)
            else:
                _backoff("subscription-not-complete", retry)
                retry += 1

    def unsubscribe(self, topic, callback=None, target_cls=None):
        """
        Invoked when unsubscribing to a topic
        :param topic: topic to unsubscribe from
        :param callback:  the callback used when subscribing to the topic, if any
        :param target_cls: the targert class used when subscribing to the topic, if any
        :return: None on success or Exception on failure
        """
        log.debug("Unsubscribing-to-topic", topic=topic)

        try:
            self.kafka_proxy.unsubscribe(topic,
                                         self._enqueue_received_group_message)

            if callback is None and target_cls is None:
                log.error("both-call-and-target-cls-cannot-be-none",
                          topic=topic)
                raise KafkaMessagingError(
                    error="both-call-and-target-cls-cannot-be-none")

            if target_cls is not None and topic in self.topic_target_cls_map:
                del self.topic_target_cls_map[topic]

            if callback is not None and topic in self.topic_callback_map:
                index = 0
                for cb in self.topic_callback_map[topic]:
                    if cb == callback:
                        break
                    index += 1
                if index < len(self.topic_callback_map[topic]):
                    self.topic_callback_map[topic].pop(index)

                if len(self.topic_callback_map[topic]) == 0:
                    del self.topic_callback_map[topic]
        except Exception as e:
            log.exception("Exception-when-unsubscribing-to-topic", topic=topic,
                          e=e)
            return e

    @inlineCallbacks
    def _enqueue_received_group_message(self, msg):
        """
        Internal method to continuously queue all received messaged
        irrespective of topic
        :param msg: Received message
        :return: None on success, Exception on failure
        """
        try:
            log.debug("received-msg", msg=msg)
            yield self.received_msg_queue.put(msg)
        except Exception as e:
            log.exception("Failed-enqueueing-received-message", e=e)

    @inlineCallbacks
    def _received_message_processing_loop(self):
        """
        Internal method to continuously process all received messages one
        at a time
        :return: None on success, Exception on failure
        """
        while True:
            try:
                message = yield self.received_msg_queue.get()
                reactor.callLater(0, self._process_message, message)
                if self.stopped:
                    break
            except Exception as e:
                log.exception("Failed-dequeueing-received-message", e=e)

    def _to_string(self, unicode_str):
        if unicode_str is not None:
            if isinstance(unicode_str, six.string_types):
                return unicode_str
            else:
                return codecs.encode(unicode_str, 'ascii')
        else:
            return None

    def _format_request(self,
                        rpc,
                        to_topic,
                        reply_topic,
                        **kwargs):
        """
        Format a request to send over kafka
        :param rpc: Requested remote API
        :param to_topic: Topic to send the request
        :param reply_topic: Topic to receive the resulting response, if any
        :param kwargs: Dictionary of key-value pairs to pass as arguments to
        the remote rpc API.
        :return: A InterContainerMessage message type on success or None on
        failure
        """
        try:
            transaction_id = uuid4().hex
            request = InterContainerMessage()
            request_body = InterContainerRequestBody()
            request.header.id = transaction_id
            request.header.type = MessageType.Value("REQUEST")
            request.header.from_topic = reply_topic
            request.header.to_topic = to_topic

            response_required = False
            if reply_topic:
                request_body.reply_to_topic = reply_topic
                request_body.response_required = True
                response_required = True

            request.header.timestamp.GetCurrentTime()
            request_body.rpc = rpc
            for a, b in six.iteritems(kwargs):
                arg = Argument()
                arg.key = a
                try:
                    arg.value.Pack(b)
                    request_body.args.extend([arg])
                except Exception as e:
                    log.exception("Failed-parsing-value", e=e)
            request.body.Pack(request_body)
            return request, transaction_id, response_required
        except Exception as e:
            log.exception("formatting-request-failed",
                          rpc=rpc,
                          to_topic=to_topic,
                          reply_topic=reply_topic,
                          args=kwargs)
            return None, None, None

    def _format_response(self, msg_header, msg_body, status):
        """
        Format a response
        :param msg_header: The header portion of a received request
        :param msg_body: The response body
        :param status: True is this represents a successful response
        :return: a InterContainerMessage message type
        """
        try:
            assert isinstance(msg_header, Header)
            response = InterContainerMessage()
            response_body = InterContainerResponseBody()
            response.header.id = msg_header.id
            response.header.timestamp.GetCurrentTime()
            response.header.type = MessageType.Value("RESPONSE")
            response.header.from_topic = msg_header.to_topic
            response.header.to_topic = msg_header.from_topic
            if msg_body is not None:
                response_body.result.Pack(msg_body)
            response_body.success = status
            response.body.Pack(response_body)
            return response
        except Exception as e:
            log.exception("formatting-response-failed", header=msg_header,
                          body=msg_body, status=status, e=e)
            return None

    def _parse_response(self, msg):
        try:
            message = InterContainerMessage()
            message.ParseFromString(msg)
            resp = InterContainerResponseBody()
            if message.body.Is(InterContainerResponseBody.DESCRIPTOR):
                message.body.Unpack(resp)
            else:
                log.debug("unsupported-msg", msg_type=type(message.body))
                return None
            log.debug("parsed-response", type=message.header.type, from_topic=message.header.from_topic,
                  to_topic=message.header.to_topic, transaction_id=message.header.id)
            return resp
        except Exception as e:
            log.exception("parsing-response-failed", msg=msg, e=e)
            return None

    @inlineCallbacks
    def _process_message(self, m):
        """
        Default internal method invoked for every batch of messages received
        from Kafka.
        """

        def _augment_args_with_FromTopic(args, from_topic):
            arg = Argument(key=ARG_FROM_TOPIC)
            t = StrType(val=from_topic)
            arg.value.Pack(t)
            args.extend([arg])
            return args

        def _toDict(args):
            """
            Convert a repeatable Argument type into a python dictionary
            :param args: Repeatable core_adapter.Argument type
            :return: a python dictionary
            """
            if args is None:
                return None
            result = {}
            for arg in args:
                assert isinstance(arg, Argument)
                result[arg.key] = arg.value
            return result

        current_time = int(time.time() * 1000)
        # log.debug("Got Message", message=m)
        try:
            val = m.value()
            # val = m.message.value
            # print m.topic

            # Go over customized callbacks first
            m_topic = m.topic()
            if m_topic in self.topic_callback_map:
                for c in self.topic_callback_map[m_topic]:
                    yield c(val)

            #  Check whether we need to process request/response scenario
            if m_topic not in self.topic_target_cls_map:
                return

            # Process request/response scenario
            message = InterContainerMessage()
            message.ParseFromString(val)

            if message.header.type == MessageType.Value("REQUEST"):
                # Get the target class for that specific topic
                targetted_topic = self._to_string(message.header.to_topic)
                msg_body = InterContainerRequestBody()
                if message.body.Is(InterContainerRequestBody.DESCRIPTOR):
                    message.body.Unpack(msg_body)
                else:
                    log.debug("unsupported-msg", msg_type=type(message.body))
                    return
                if targetted_topic in self.topic_target_cls_map:
                    # Augment the request arguments with the from_topic
                    augmented_args = _augment_args_with_FromTopic(msg_body.args,
                                                        msg_body.reply_to_topic)
                    if augmented_args:
                        log.debug("message-body-args-present", rpc=msg_body.rpc,
                                  response_required=msg_body.response_required, reply_to_topic=msg_body.reply_to_topic)
                        (status, res) = yield getattr(
                            self.topic_target_cls_map[targetted_topic],
                            self._to_string(msg_body.rpc))(
                            **_toDict(augmented_args))
                    else:
                        log.debug("message-body-args-absent", rpc=msg_body.rpc,
                                  response_required=msg_body.response_required, reply_to_topic=msg_body.reply_to_topic,)
                        (status, res) = yield getattr(
                            self.topic_target_cls_map[targetted_topic],
                            self._to_string(msg_body.rpc))()
                    if msg_body.response_required:
                        response = self._format_response(
                            msg_header=message.header,
                            msg_body=res,
                            status=status,
                        )
                        if response is not None:
                            res_topic = self._to_string(
                                response.header.to_topic)
                            self._send_kafka_message(res_topic, response)

                        log.debug("Response-sent",
                                  to_topic=res_topic)
            elif message.header.type == MessageType.Value("RESPONSE"):
                trns_id = self._to_string(message.header.id)
                log.debug('received-response', transaction_id=trns_id)
                if trns_id in self.transaction_id_deferred_map:
                    resp = self._parse_response(val)

                    self.transaction_id_deferred_map[trns_id].callback(resp)
            else:
                log.error("!!INVALID-TRANSACTION-TYPE!!")

        except Exception as e:
            log.exception("Failed-to-process-message", message=m, e=e)

    @inlineCallbacks
    def _send_kafka_message(self, topic, msg):
        try:
            yield self.kafka_proxy.send_message(topic, msg.SerializeToString())
        except Exception as e:
            log.exception("Failed-sending-message", message=msg, e=e)

    @inlineCallbacks
    def send_request(self,
                     rpc,
                     to_topic,
                     reply_topic=None,
                     callback=None,
                     **kwargs):
        """
        Invoked to send a message to a remote container and receive a
        response if required.
        :param rpc: The remote API to invoke
        :param to_topic: Send the message to this kafka topic
        :param reply_topic: If not None then a response is expected on this
        topic.  If set to None then no response is required.
        :param callback: Callback to invoke when a response is received.
        :param kwargs: Key-value pairs representing arguments to pass to the
        rpc remote API.
        :return: Either no response is required, or a response is returned
        via the callback or the response is a tuple of (status, return_cls)
        """
        try:
            # Ensure all strings are not unicode encoded
            rpc = self._to_string(rpc)
            to_topic = self._to_string(to_topic)
            reply_topic = self._to_string(reply_topic)

            request, transaction_id, response_required = \
                self._format_request(
                    rpc=rpc,
                    to_topic=to_topic,
                    reply_topic=reply_topic,
                    **kwargs)

            if request is None:
                return

            # Add the transaction to the transaction map before sending the
            # request.  This will guarantee the eventual response will be
            # processed.
            wait_for_result = None
            if response_required:
                wait_for_result = Deferred()
                self.transaction_id_deferred_map[
                    self._to_string(request.header.id)] = wait_for_result
            log.debug("message-send", transaction_id=transaction_id, to_topic=to_topic,
                      from_topic=reply_topic, rpc=rpc)
            yield self._send_kafka_message(to_topic, request)
            log.debug("message-sent", transaction_id=transaction_id, to_topic=to_topic,
                      from_topic=reply_topic, rpc=rpc)

            if response_required:
                res = yield wait_for_result

                # Remove the transaction from the transaction map
                del self.transaction_id_deferred_map[transaction_id]

                if res is not None:
                    if res.success:
                        log.debug("send-message-response", transaction_id=transaction_id, rpc=rpc)
                        if callback:
                            callback((res.success, res.result))
                        else:
                            returnValue((res.success, res.result))
                    else:
                        # this is the case where the core API returns a grpc code.NotFound.  Return or callback
                        # so the caller can act appropriately (i.e add whatever was not found)
                        log.warn("send-message-response-error-result", transaction_id=transaction_id, rpc=rpc, kafka_request=request, kafka_result=res)
                        if callback:
                            callback((res.success, None))
                        else:
                            returnValue((res.success, None))
                else:
                    raise KafkaMessagingError(error="failed-response-for-request:{}".format(request))

        except Exception as e:
            log.exception("Exception-sending-request", e=e)
            raise KafkaMessagingError(error=e)


# Common method to get the singleton instance of the kafka proxy class
def get_messaging_proxy():
    return IKafkaMessagingProxy._kafka_messaging_instance
