# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import confluent_kafka
import imp
import inspect
import os
import threading
import time
from xosconfig import Config


class XOSKafkaMessage(object):
    """ XOSKafkaMessage

        Plain-attribute copy of a consumed Kafka message. The topic, key and value are read once from the
        consumer message's accessor methods. timestamp holds the message's timestamp value, or None when the
        consumer reports the timestamp type as TIMESTAMP_NOT_AVAILABLE.
    """

    def __init__(self, consumer_msg):
        """ Copy the fields of consumer_msg into attributes.

            consumer_msg: an object exposing topic(), key(), value() and timestamp(); timestamp() must return
                          a (type, value) pair.
        """
        self.topic = consumer_msg.topic()
        self.key = consumer_msg.key()
        self.value = consumer_msg.value()

        self.timestamp = None
        (ts_type, ts_val) = consumer_msg.timestamp()

        # Compare the timestamp type by value: an identity test ("is not") against a constant only gives the
        # right answer when the interpreter happens to share the object.
        if ts_type != confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
            self.timestamp = ts_val


class XOSKafkaThread(threading.Thread):
    """ XOSKafkaThread

        A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
        Consumer is launched to listen on the topics (or the pattern) specified by the step. For each event a
        new instance of the step is created and its process_event() method is called with the event.
    """

    def __init__(self, step, bootstrap_servers, model_accessor, log, *args, **kwargs):
        """ Store the collaborators; the consumer itself is created lazily by run().

            step: the event step class (not an instance). run() reads its topics, pattern and __name__
                  attributes and instantiates it once per event.
            bootstrap_servers: iterable of broker addresses, joined with "," for the consumer configuration.
            model_accessor: passed to every step instance.
            log: logger used by the thread and passed to every step instance.
            *args, **kwargs: forwarded unchanged to threading.Thread.
        """
        super(XOSKafkaThread, self).__init__(*args, **kwargs)
        self.consumer = None
        self.step = step
        self.bootstrap_servers = bootstrap_servers
        self.model_accessor = model_accessor
        self.log = log
        self.daemon = True

    def create_kafka_consumer(self):
        """ Build and return a new, not yet subscribed, confluent_kafka.Consumer. """
        # use the service name as the group id
        consumer_config = {
            "group.id": Config().get("name"),
            "bootstrap.servers": ",".join(self.bootstrap_servers),
            "default.topic.config": {"auto.offset.reset": "smallest"},
        }

        return confluent_kafka.Consumer(**consumer_config)

    def _subscription_topics(self):
        """ Return the list handed to Consumer.subscribe() for this step. """
        if self.step.topics:
            return self.step.topics

        pattern = self.step.pattern
        if isinstance(pattern, list):
            return pattern
        if isinstance(pattern, tuple):
            return list(pattern)

        # subscribe() takes a list of topic strings, so a single pattern string has to be wrapped.
        return [pattern]

    def _create_subscribed_consumer(self):
        """ Create a consumer and subscribe it; the consumer is only returned once subscribe() succeeded. """
        consumer = self.create_kafka_consumer()
        consumer.subscribe(self._subscription_topics())
        return consumer

    @staticmethod
    def _kafka_error_code(exc):
        """ Return the error code of the KafkaError wrapped by a KafkaException, or None if there is none. """
        wrapped = exc.args[0] if exc.args else None
        code = getattr(wrapped, "code", None)
        return code() if callable(code) else None

    def run(self):
        """ Validate the step, then consume events forever.

            Raises Exception before consuming anything when the step defines neither topics nor pattern, or
            when it defines both. Otherwise this method does not return: consumer setup failures are logged
            and retried, message-level errors are logged, and exceptions raised while processing an event are
            logged without stopping the loop.
        """
        if (not self.step.topics) and (not self.step.pattern):
            raise Exception(
                "Neither topics nor pattern is defined for step %s" % self.step.__name__
            )

        if self.step.topics and self.step.pattern:
            raise Exception(
                "Both topics and pattern are defined for step %s. Choose one."
                % self.step.__name__
            )

        self.log.info(
            "Waiting for events",
            topic=self.step.topics,
            pattern=self.step.pattern,
            step=self.step.__name__,
        )

        while True:
            # setup consumer or loop on failure
            if self.consumer is None:
                try:
                    # self.consumer stays None until the consumer is subscribed, so a failed subscribe is
                    # retried instead of leaving an unsubscribed consumer to be polled forever.
                    self.consumer = self._create_subscribed_consumer()

                except confluent_kafka.KafkaException as e:
                    # The error code is a constant, not an exception class, so it cannot be an except target;
                    # it is read from the KafkaError carried by the exception instead.
                    if self._kafka_error_code(e) == confluent_kafka.KafkaError._ALL_BROKERS_DOWN:
                        self.log.warning(
                            "No brokers available on %s, %s" % (self.bootstrap_servers, e)
                        )
                        time.sleep(20)
                    else:
                        # Maybe Kafka has not started yet. Log the exception and try again in a second.
                        self.log.exception("Exception in kafka loop: %s" % e)
                        time.sleep(1)
                    continue

            # wait until we get a message, if no message, loop again
            msg = self.consumer.poll(timeout=1.0)

            if msg is None:
                continue

            error = msg.error()
            if error:
                if error.code() == confluent_kafka.KafkaError._PARTITION_EOF:
                    self.log.debug(
                        "Reached end of kafka topic %s, partition: %s, offset: %d"
                        % (msg.topic(), msg.partition(), msg.offset())
                    )
                else:
                    # No exception is being handled here, so there is no traceback to attach: log a plain
                    # error rather than using log.exception().
                    self.log.error("Error in kafka message: %s" % error)
                continue

            # wrap parsing the event in a class
            event_msg = XOSKafkaMessage(msg)

            self.log.info(
                "Processing event", event_msg=event_msg, step=self.step.__name__
            )

            try:
                self.step(model_accessor=self.model_accessor, log=self.log).process_event(event_msg)

            # NOTE(review): BaseException also swallows SystemExit raised by a step; presumably deliberate so
            # that one bad event cannot stop the thread - confirm before narrowing to Exception.
            except BaseException:
                self.log.exception(
                    "Exception in event step",
                    event_msg=event_msg,
                    step=self.step.__name__,
                )


class XOSEventEngine(object):
    """ XOSEventEngine

        Subscribe to and handle processing of events. Two methods are defined:

            load_event_step_modules(dir) ... look for step modules in the given directory, and collect the
                                             classes that list a class named EventStep among their direct
                                             bases.

            start() ... Launch threads to handle processing of the EventSteps. It's expected that
                        load_event_step_modules() will be called before start().
    """

    def __init__(self, model_accessor, log):
        """ Remember the model accessor and logger; no steps are loaded and no threads exist yet. """
        self.model_accessor = model_accessor
        self.log = log
        self.event_steps = []
        self.threads = []

    def load_event_step_modules(self, event_step_dir):
        """ Replace self.event_steps with the step classes found in event_step_dir.

            Every regular file in the directory that ends in ".py", is not "__init__.py" and does not have
            "test" anywhere in its name is loaded as a module named after the file. Each class reachable as a
            module attribute is kept when one of its direct base classes is named "EventStep".
        """
        self.event_steps = []
        self.log.info("Loading event steps", event_step_dir=event_step_dir)

        for filename in os.listdir(event_step_dir):
            full_path = os.path.join(event_step_dir, filename)

            # skip anything that is not a loadable step module
            if not os.path.isfile(full_path):
                continue
            if not filename.endswith(".py") or filename == "__init__.py":
                continue
            if "test" in filename:
                continue

            module = imp.load_source(filename[:-3], full_path)

            for attr_name in dir(module):
                candidate = getattr(module, attr_name, None)
                if not inspect.isclass(candidate):
                    continue

                # only the direct bases are inspected, and they are matched by name
                if any(base.__name__ == "EventStep" for base in candidate.__bases__):
                    self.event_steps.append(candidate)

        self.log.info("Loaded event steps", steps=self.event_steps)

    def start(self):
        """ Start one XOSKafkaThread per loaded step whose technology is "kafka".

            Nothing is started, and an error is logged, when the configured event bus kind is missing or is
            not "kafka", or when the event bus endpoint is missing. Steps with another technology are logged
            and skipped. Started threads are recorded in self.threads.
        """
        kind = Config.get("event_bus.kind")
        endpoint = Config.get("event_bus.endpoint")

        if not kind:
            self.log.error(
                "Eventbus kind is not configured in synchronizer config file."
            )
            return

        if kind not in ("kafka",):
            self.log.error(
                "Eventbus kind is set to a technology we do not implement.",
                eventbus_kind=kind,
            )
            return

        if not endpoint:
            self.log.error(
                "Eventbus endpoint is not configured in synchronizer config file."
            )
            return

        for step in self.event_steps:
            if step.technology != "kafka":
                self.log.error(
                    "Unknown technology. Skipping step",
                    technology=step.technology,
                    step=step.__name__,
                )
                continue

            worker = XOSKafkaThread(step, [endpoint], self.model_accessor, self.log)
            worker.start()
            self.threads.append(worker)
