#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
gRPC client meant to connect to a gRPC server endpoint, and query the
end-point's schema by calling SchemaService.Schema(Empty) and all of its
semantics are derived from the recovered schema.
"""

import os
import sys
from zlib import decompress

import grpc
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from werkzeug.exceptions import ServiceUnavailable

from common.utils.asleep import asleep
from netconf.protos import third_party
from netconf.protos.schema_pb2 import SchemaServiceStub
from google.protobuf.empty_pb2 import Empty
from common.utils.consulhelpers import get_endpoint_from_consul
# from netconf.protos.voltha_pb2 import VolthaLocalServiceStub, \
#     VolthaGlobalServiceStub
# from google.protobuf import empty_pb2
# from google.protobuf.json_format import MessageToDict, ParseDict
from nc_rpc_mapper import get_nc_rpc_mapper_instance
from google.protobuf import descriptor
import base64
import math
import collections
from netconf.constants import Constants as C

_INT64_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_INT64,
                          descriptor.FieldDescriptor.CPPTYPE_UINT64])
_FLOAT_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_FLOAT,
                          descriptor.FieldDescriptor.CPPTYPE_DOUBLE])
_INFINITY = 'Infinity'
_NEG_INFINITY = '-Infinity'
_NAN = 'NaN'

log = get_logger()


class GrpcClient(object):
    """
    Connect to a gRPC server, fetch its schema, and process the downloaded
    proto schema files.  The goal is to convert the proto schemas into yang
    schemas which would be exposed to the Netconf client.
    """
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    def __init__(self, consul_endpoint, work_dir,
                 grpc_endpoint='localhost:50055',
                 reconnect_callback=None,
                 on_start_callback=None):
        self.consul_endpoint = consul_endpoint
        self.grpc_endpoint = grpc_endpoint
        self.work_dir = work_dir
        self.reconnect_callback = reconnect_callback
        self.on_start_callback = on_start_callback

        self.plugin_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protoc_plugins'))

        self.yang_schemas = set()

        self.channel = None
        self.local_stub = None
        self.schema = None
        self.retries = 0
        self.shutting_down = False
        self.connected = False

    def start(self):
        log.debug('starting')
        if not self.connected:
            reactor.callLater(0, self.connect)
        log.info('started')
        return self

    def stop(self):
        log.debug('stopping')
        if self.shutting_down:
            return
        self.shutting_down = True
        log.info('stopped')

    def set_on_start_callback(self, on_start_callback):
        self.on_start_callback = on_start_callback
        return self

    def set_reconnect_callback(self, reconnect_callback):
        self.reconnect_callback = reconnect_callback
        return self

    def resolve_endpoint(self, endpoint):
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info('endpoint-found',
                         endpoint=endpoint, ip_port=ip_port_endpoint)
            except Exception as e:
                log.error('service-not-found-in-consul', endpoint=endpoint,
                          exception=repr(e))
                return None, None
        if ip_port_endpoint:
            host, port = ip_port_endpoint.split(':', 2)
            return host, int(port)

    @inlineCallbacks
    def connect(self):
        """
        (Re-)Connect to end-point
        """
        if self.shutting_down or self.connected:
            return

        try:
            host, port = self.resolve_endpoint(self.grpc_endpoint)

            # If host and port is not set then we will retry
            if host and port:
                log.info('grpc-endpoint-connecting', host=host, port=port)
                self.channel = grpc.insecure_channel(
                    '{}:{}'.format(host, port))

                yang_from = self._retrieve_schema()
                log.info('proto-to-yang-schema', file=yang_from)
                self._compile_proto_files(yang_from)
                self._set_yang_schemas()

                self._clear_backoff()

                if self.on_start_callback is not None:
                    reactor.callLater(0, self.on_start_callback)

                self.connected = True
                if self.reconnect_callback is not None:
                    reactor.callLater(0, self.reconnect_callback)

                return

        except _Rendezvous, e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                log.info('grpc-endpoint-not-available')
            else:
                log.exception(e)
            yield self._backoff('not-available')

        except Exception, e:
            if not self.shutting_down:
                log.exception('cannot-connect', endpoint=_endpoint)
            yield self._backoff('unknown-error')

        reactor.callLater(1, self.connect)

    def _backoff(self, msg):
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        if self.retries:
            log.info('reconnected', after_retries=self.retries)
            self.retries = 0

    def _retrieve_schema(self):
        """
        Retrieve schema from gRPC end-point, and save all *.proto files in
        the work directory.
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        # try:
        schemas = stub.GetSchema(Empty())
        # except _Rendezvous, e:
        #     if e.code == grpc.StatusCode.UNAVAILABLE:
        #
        #     else:
        #         raise e

        os.system('mkdir -p %s' % self.work_dir)
        os.system('rm -fr /tmp/%s/*' %
                  self.work_dir.replace('/tmp/', ''))  # safer

        for proto_file in schemas.protos:
            proto_fname = proto_file.file_name
            # TODO: Do we need to process a set of files using a prefix
            # instead of just one?
            proto_content = proto_file.proto
            log.info('saving-proto', fname=proto_fname, dir=self.work_dir,
                     length=len(proto_content))
            with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
                f.write(proto_content)

            desc_content = decompress(proto_file.descriptor)
            desc_fname = proto_fname.replace('.proto', '.desc')
            log.info('saving-descriptor', fname=desc_fname, dir=self.work_dir,
                     length=len(desc_content))
            with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
                f.write(desc_content)
        return schemas.yang_from

    def _compile_proto_files(self, yang_from):
        """
        For each *.proto file in the work directory, compile the proto
        file into the respective *_pb2.py file as well as generate the
        corresponding yang schema.
        :return: None
        """
        log.info('start')
        google_api_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protos/third_party'
        ))

        log.info('google-api', api_dir=google_api_dir)

        netconf_base_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../..'
        ))
        log.info('netconf-dir', dir=netconf_base_dir)

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('.proto')]:
            log.info('filename', file=fname)

            need_yang = fname == yang_from
            log.debug('compiling',
                      file=fname,
                      yang_schema_required=need_yang)
            cmd = (
                'cd %s && '
                'env PATH=%s PYTHONPATH=%s '
                'python -m grpc.tools.protoc '
                '-I. '
                '-I%s '
                '--python_out=. '
                '--grpc_python_out=. '
                '--plugin=protoc-gen-gw=%s/rpc_gw_gen.py '
                '--gw_out=. '
                '--plugin=protoc-gen-custom=%s/proto2yang.py '
                '%s'
                '%s' % (
                    self.work_dir,
                    ':'.join([os.environ['PATH'], self.plugin_dir]),
                    ':'.join([google_api_dir, netconf_base_dir]),
                    google_api_dir,
                    self.plugin_dir,
                    self.plugin_dir,
                    '--custom_out=. ' if need_yang else '',
                    fname)
            )
            log.debug('executing', cmd=cmd, file=fname)
            os.system(cmd)
            log.info('compiled', file=fname)

        # Load the generated modules
        mapper = get_nc_rpc_mapper_instance(self.work_dir, self)
        mapper.load_modules()

    def _set_yang_schemas(self):
        if self.work_dir not in sys.path:
            sys.path.insert(0, self.work_dir)

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('.yang')]:
            # Filter out schemas which are not required
            if fname not in C.SCHEMAS_TO_IGNORE:
                self.yang_schemas.add(fname[:-len('.yang')])
        log.info('yang-schemas', schemas=self.yang_schemas)

    @inlineCallbacks
    def invoke_voltha_rpc(self, service, method, params, metadata=None):
        try:
            mapper = get_nc_rpc_mapper_instance()

            # Get the mapping function using the service and method name
            func = mapper.get_function(service, method)
            if func is None:
                log.info('unsupported-rpc', service=service, method=method)
                return

            response = yield func(self, params, metadata)

            # Get the XML tag to use in the response
            xml_tag = mapper.get_xml_tag(service, method)

            # # Get the XML list item name used in the response
            # list_item_name = mapper.get_list_items_name(service, method)

            # Get the YANG defined fields (and their order) for that service
            # and method
            fields = mapper.get_fields_from_yang_defs(service, method)

            # Check if this represents a List and whether the field name is
            # items.  In the response (a dictionary), if a list named 'items'
            # is returned then 'items' can either:
            # 1) represent a list of items being returned where 'items' is just
            # a name to represent a list. In this case, this name will be
            # discarded
            # 2) represent the actual field name as defined in the proto
            # definitions.  If this is the case then we need to preserve the
            # name
            list_item_name = ''
            if fields: # if the return type is empty then fields will be None
                if len(fields) == 1:
                    if fields[0]['name'] == 'items':
                        list_item_name = 'items'

            # Rearrange the dictionary response as specified by the YANG
            # definitions
            rearranged_response = self.rearrange_dict(mapper, response, fields)

            log.info('rpc-result', service=service, method=method,
                     response=response,
                     rearranged_response=rearranged_response, xml_tag=xml_tag,
                     list_item_name=list_item_name, fields=fields)

            returnValue((rearranged_response, (xml_tag, list_item_name)))

        except Exception, e:
            log.exception('rpc-failure', service=service, method=method,
                          params=params, e=e)

    def rearrange_dict(self, mapper, orig_dict, fields):
        log.debug('rearranging-dict', fields=fields)
        result = collections.OrderedDict()
        if len(orig_dict) == 0 or not fields:
            return result
        for f in fields:
            if orig_dict.has_key(f['name']):
                if f['type_ref']:
                    # Get the fields for that type
                    sub_fields = mapper.get_fields_from_type_name(f['module'],
                                                                  f['type'])
                    if f['repeated']:
                        result[f['name']] = []
                        for d in orig_dict[f['name']]:
                            result[f['name']].append(self.rearrange_dict(
                                mapper, d, sub_fields))
                    else:
                        result[f['name']] = self.rearrange_dict(mapper,
                                                                orig_dict[
                                                                    f['name']],
                                                                sub_fields)
                else:
                    result[f['name']] = orig_dict[f['name']]
        return result

    @inlineCallbacks
    def invoke(self, stub, method_name, request, metadata, retry=1):
        """
        Invoke a gRPC call to the remote server and return the response.
        :param stub: Reference to the *_pb2 service stub
        :param method_name: The method name inside the service stub
        :param request: The request protobuf message
        :param metadata: [(str, str), (str, str), ...]
        :return: The response protobuf message and returned trailing metadata
        """

        if not self.connected:
            raise ServiceUnavailable()

        try:
            method = getattr(stub(self.channel), method_name)
            response, rendezvous = method.with_call(request, metadata=metadata)
            returnValue((response, rendezvous.trailing_metadata()))

        except grpc._channel._Rendezvous, e:
            code = e.code()
            if code == grpc.StatusCode.UNAVAILABLE:
                e = ServiceUnavailable()

                if self.connected:
                    self.connected = False
                    yield self.connect()
                    if retry > 0:
                        response = yield self.invoke(stub, method_name,
                                                     request, metadata,
                                                     retry=retry - 1)
                        returnValue(response)

            elif code in (
                    grpc.StatusCode.NOT_FOUND,
                    grpc.StatusCode.INVALID_ARGUMENT,
                    grpc.StatusCode.ALREADY_EXISTS):

                pass  # don't log error, these occur naturally

            else:
                log.exception(e)

            raise e

    # Below is an adaptation of Google's MessageToDict() which includes
    # protobuf options extensions

    class Error(Exception):
        """Top-level module error for json_format."""

    class SerializeToJsonError(Error):
        """Thrown if serialization to JSON fails."""

    def _IsMapEntry(self, field):
        return (field.type == descriptor.FieldDescriptor.TYPE_MESSAGE and
                field.message_type.has_options and
                field.message_type.GetOptions().map_entry)

    def convertToDict(self, message):
        """Converts message to an object according to Proto3 JSON Specification."""

        js = {}
        return self._RegularMessageToJsonObject(message, js)

    def get_yang_option(self, field):
        opt = field.GetOptions()
        yang_opt = {}
        for fd, val in opt.ListFields():
            if fd.full_name == 'voltha.yang_inline_node':
                yang_opt['id'] = val.id
                yang_opt['type'] = val.type
                # Fow now, a max of 1 yang option is set per field
                return yang_opt

    def _RegularMessageToJsonObject(self, message, js):
        """Converts normal message according to Proto3 JSON Specification."""
        fields = message.ListFields()

        try:
            for field, value in fields:
                # Check for options
                yang_opt = self.get_yang_option(field)

                name = field.name
                if self._IsMapEntry(field):
                    # Convert a map field.
                    v_field = field.message_type.fields_by_name['value']
                    js_map = {}
                    for key in value:
                        if isinstance(key, bool):
                            if key:
                                recorded_key = 'true'
                            else:
                                recorded_key = 'false'
                        else:
                            recorded_key = key
                        js_map[recorded_key] = self._FieldToJsonObject(
                            v_field, value[key])
                    js[name] = js_map
                elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
                    # Convert a repeated field.
                    js[name] = [self._FieldToJsonObject(field, k)
                                for k in value]
                else:
                    # This specific yang option applies only to non-repeated
                    # fields
                    if yang_opt:  # Create a map
                        js_map = {}
                        js_map['yang_field_option'] = True
                        js_map['yang_field_option_id'] = yang_opt['id']
                        js_map['yang_field_option_type'] = yang_opt['type']
                        js_map['name'] = name
                        js_map[name] = self._FieldToJsonObject(field, value)
                        js[name] = js_map
                    else:
                        js[name] = self._FieldToJsonObject(field, value)

            # Serialize default value if including_default_value_fields is True.
            message_descriptor = message.DESCRIPTOR
            for field in message_descriptor.fields:
                # Singular message fields and oneof fields will not be affected.
                if ((
                                    field.label != descriptor.FieldDescriptor.LABEL_REPEATED and
                                    field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE) or
                        field.containing_oneof):
                    continue
                name = field.name
                if name in js:
                    # Skip the field which has been serailized already.
                    continue
                if self._IsMapEntry(field):
                    js[name] = {}
                elif field.label == descriptor.FieldDescriptor.LABEL_REPEATED:
                    js[name] = []
                else:
                    js[name] = self._FieldToJsonObject(field,
                                                       field.default_value)

        except ValueError as e:
            raise self.SerializeToJsonError(
                'Failed to serialize {0} field: {1}.'.format(field.name, e))

        return js

    def _FieldToJsonObject(self, field, value):
        """Converts field value according to Proto3 JSON Specification."""
        if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE:
            return self.convertToDict(value)
        elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_ENUM:
            enum_value = field.enum_type.values_by_number.get(value, None)
            if enum_value is not None:
                return enum_value.name
            else:
                raise self.SerializeToJsonError('Enum field contains an '
                                                'integer value '
                                                'which can not mapped to an enum value.')
        elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_STRING:
            if field.type == descriptor.FieldDescriptor.TYPE_BYTES:
                # Use base64 Data encoding for bytes
                return base64.b64encode(value).decode('utf-8')
            else:
                return value
        elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_BOOL:
            return bool(value)
        elif field.cpp_type in _INT64_TYPES:
            return str(value)
        elif field.cpp_type in _FLOAT_TYPES:
            if math.isinf(value):
                if value < 0.0:
                    return _NEG_INFINITY
                else:
                    return _INFINITY
            if math.isnan(value):
                return _NAN
        return value
