package kafka

/**
 * Copyright 2016 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import (
	"fmt"
	"reflect"
	"strings"
	"unsafe"
)

/*
#include <stdlib.h>
#include <librdkafka/rdkafka.h>
*/
import "C"

// ConfigValue supports the following types:
//  bool, int, string, any type with the standard String() interface
type ConfigValue interface{}

// ConfigMap is a map contaning standard librdkafka configuration properties as documented in:
// https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md
//
// The special property "default.topic.config" (optional) is a ConfigMap containing default topic
// configuration properties.
type ConfigMap map[string]ConfigValue

// SetKey sets configuration property key to value.
// For user convenience a key prefixed with {topic}. will be
// set on the "default.topic.config" sub-map.
func (m ConfigMap) SetKey(key string, value ConfigValue) error {
	if strings.HasPrefix(key, "{topic}.") {
		_, found := m["default.topic.config"]
		if !found {
			m["default.topic.config"] = ConfigMap{}
		}
		m["default.topic.config"].(ConfigMap)[strings.TrimPrefix(key, "{topic}.")] = value
	} else {
		m[key] = value
	}

	return nil
}

// Set implements flag.Set (command line argument parser) as a convenience
// for `-X key=value` config.
func (m ConfigMap) Set(kv string) error {
	i := strings.Index(kv, "=")
	if i == -1 {
		return Error{ErrInvalidArg, "Expected key=value"}
	}

	k := kv[:i]
	v := kv[i+1:]

	return m.SetKey(k, v)
}

func value2string(v ConfigValue) (ret string, errstr string) {

	switch x := v.(type) {
	case bool:
		if x {
			ret = "true"
		} else {
			ret = "false"
		}
	case int:
		ret = fmt.Sprintf("%d", x)
	case string:
		ret = x
	case fmt.Stringer:
		ret = x.String()
	default:
		return "", fmt.Sprintf("Invalid value type %T", v)
	}

	return ret, ""
}

// rdkAnyconf abstracts rd_kafka_conf_t and rd_kafka_topic_conf_t
// into a common interface.
type rdkAnyconf interface {
	set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t
}

func anyconfSet(anyconf rdkAnyconf, key string, val ConfigValue) (err error) {
	value, errstr := value2string(val)
	if errstr != "" {
		return Error{ErrInvalidArg, fmt.Sprintf("%s for key %s (expected string,bool,int,ConfigMap)", errstr, key)}
	}
	cKey := C.CString(key)
	cVal := C.CString(value)
	cErrstr := (*C.char)(C.malloc(C.size_t(128)))
	defer C.free(unsafe.Pointer(cErrstr))

	if anyconf.set(cKey, cVal, cErrstr, 128) != C.RD_KAFKA_CONF_OK {
		C.free(unsafe.Pointer(cKey))
		C.free(unsafe.Pointer(cVal))
		return newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
	}

	return nil
}

// we need these typedefs to workaround a crash in golint
// when parsing the set() methods below
type rdkConf C.rd_kafka_conf_t
type rdkTopicConf C.rd_kafka_topic_conf_t

func (cConf *rdkConf) set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t {
	return C.rd_kafka_conf_set((*C.rd_kafka_conf_t)(cConf), cKey, cVal, cErrstr, C.size_t(errstrSize))
}

func (ctopicConf *rdkTopicConf) set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t {
	return C.rd_kafka_topic_conf_set((*C.rd_kafka_topic_conf_t)(ctopicConf), cKey, cVal, cErrstr, C.size_t(errstrSize))
}

func configConvertAnyconf(m ConfigMap, anyconf rdkAnyconf) (err error) {
	// set plugins first, any plugin-specific configuration depends on
	// the plugin to have already been set
	pluginPaths, ok := m["plugin.library.paths"]
	if ok {
		err = anyconfSet(anyconf, "plugin.library.paths", pluginPaths)
		if err != nil {
			return err
		}
	}
	for k, v := range m {
		if k == "plugin.library.paths" {
			continue
		}
		switch v.(type) {
		case ConfigMap:
			/* Special sub-ConfigMap, only used for default.topic.config */

			if k != "default.topic.config" {
				return Error{ErrInvalidArg, fmt.Sprintf("Invalid type for key %s", k)}
			}

			var cTopicConf = C.rd_kafka_topic_conf_new()

			err = configConvertAnyconf(v.(ConfigMap),
				(*rdkTopicConf)(cTopicConf))
			if err != nil {
				C.rd_kafka_topic_conf_destroy(cTopicConf)
				return err
			}

			C.rd_kafka_conf_set_default_topic_conf(
				(*C.rd_kafka_conf_t)(anyconf.(*rdkConf)),
				(*C.rd_kafka_topic_conf_t)((*rdkTopicConf)(cTopicConf)))

		default:
			err = anyconfSet(anyconf, k, v)
			if err != nil {
				return err
			}
		}
	}

	return nil
}

// convert ConfigMap to C rd_kafka_conf_t *
func (m ConfigMap) convert() (cConf *C.rd_kafka_conf_t, err error) {
	cConf = C.rd_kafka_conf_new()

	err = configConvertAnyconf(m, (*rdkConf)(cConf))
	if err != nil {
		C.rd_kafka_conf_destroy(cConf)
		return nil, err
	}
	return cConf, nil
}

// get finds key in the configmap and returns its value.
// If the key is not found defval is returned.
// If the key is found but the type is mismatched an error is returned.
func (m ConfigMap) get(key string, defval ConfigValue) (ConfigValue, error) {
	if strings.HasPrefix(key, "{topic}.") {
		defconfCv, found := m["default.topic.config"]
		if !found {
			return defval, nil
		}
		return defconfCv.(ConfigMap).get(strings.TrimPrefix(key, "{topic}."), defval)
	}

	v, ok := m[key]
	if !ok {
		return defval, nil
	}

	if defval != nil && reflect.TypeOf(defval) != reflect.TypeOf(v) {
		return nil, Error{ErrInvalidArg, fmt.Sprintf("%s expects type %T, not %T", key, defval, v)}
	}

	return v, nil
}

// extract performs a get() and if found deletes the key.
func (m ConfigMap) extract(key string, defval ConfigValue) (ConfigValue, error) {

	v, err := m.get(key, defval)
	if err != nil {
		return nil, err
	}

	delete(m, key)

	return v, nil
}

func (m ConfigMap) clone() ConfigMap {
	m2 := make(ConfigMap)
	for k, v := range m {
		m2[k] = v
	}
	return m2
}

// Get finds the given key in the ConfigMap and returns its value.
// If the key is not found `defval` is returned.
// If the key is found but the type does not match that of `defval` (unless nil)
// an ErrInvalidArg error is returned.
func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error) {
	return m.get(key, defval)
}
