/*
* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors

* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */
package kafka

import (
	"context"
	"errors"
	"strings"
	"time"

	"github.com/golang/protobuf/ptypes/any"
	"github.com/opencord/voltha-lib-go/v7/pkg/log"
	"github.com/opencord/voltha-lib-go/v7/pkg/probe"
)

const (
	TopicSeparator = "_"
	DeviceIdLength = 24
)

// A Topic definition - may be augmented with additional attributes eventually
type Topic struct {
	// The name of the topic. It must start with a letter,
	// and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
	// underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
	// signs (`%`).
	Name string
}

type KVArg struct {
	Key   string
	Value interface{}
}

type RpcMType int

const (
	RpcFormattingError RpcMType = iota
	RpcSent
	RpcReply
	RpcTimeout
	RpcTransportError
	RpcSystemClosing
)

type RpcResponse struct {
	MType RpcMType
	Err   error
	Reply *any.Any
}

func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse {
	return &RpcResponse{
		MType: messageType,
		Err:   err,
		Reply: body,
	}
}

// TODO:  Remove and provide better may to get the device id
// GetDeviceIdFromTopic extract the deviceId from the topic name.  The topic name is formatted either as:
//
//	<any string> or <any string>_<deviceId>.  The device Id is 24 characters long.
func GetDeviceIdFromTopic(topic Topic) string {
	pos := strings.LastIndex(topic.Name, TopicSeparator)
	if pos == -1 {
		return ""
	}
	adjustedPos := pos + len(TopicSeparator)
	if adjustedPos >= len(topic.Name) {
		return ""
	}
	deviceId := topic.Name[adjustedPos:len(topic.Name)]
	if len(deviceId) != DeviceIdLength {
		return ""
	}
	return deviceId
}

// WaitUntilKafkaConnectionIsUp waits until the kafka client can establish a connection to the kafka broker or until the
// context times out.
func StartAndWaitUntilKafkaConnectionIsUp(ctx context.Context, kClient Client, connectionRetryInterval time.Duration, serviceName string) error {
	if kClient == nil {
		return errors.New("kafka-client-is-nil")
	}
	for {
		if err := kClient.Start(ctx); err != nil {
			probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
			logger.Warnw(ctx, "kafka-connection-down", log.Fields{"error": err})
			select {
			case <-time.After(connectionRetryInterval):
				continue
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
		logger.Info(ctx, "kafka-connection-up")
		break
	}
	return nil
}

/*
*
MonitorKafkaReadiness checks the liveliness and readiness of the kafka service
and update the status in the probe.
*/
func MonitorKafkaReadiness(ctx context.Context,
	kClient Client,
	liveProbeInterval, notLiveProbeInterval time.Duration,
	serviceName string) {

	if kClient == nil {
		logger.Fatal(ctx, "kafka-client-is-nil")
	}

	logger.Infow(ctx, "monitor-kafka-readiness", log.Fields{"service": serviceName})

	livelinessChannel := kClient.EnableLivenessChannel(ctx, true)
	healthinessChannel := kClient.EnableHealthinessChannel(ctx, true)
	timeout := liveProbeInterval
	failed := false
	for {
		timeoutTimer := time.NewTimer(timeout)

		select {
		case healthiness := <-healthinessChannel:
			if !healthiness {
				// This will eventually cause K8s to restart the container, and will do
				// so in a way that allows cleanup to continue, rather than an immediate
				// panic and exit here.
				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusFailed)
				logger.Infow(ctx, "kafka-not-healthy", log.Fields{"service": serviceName})
				failed = true
			}
			// Check if the timer has expired or not
			if !timeoutTimer.Stop() {
				<-timeoutTimer.C
			}
		case liveliness := <-livelinessChannel:
			if failed {
				// Failures of the message bus are permanent and can't ever be recovered from,
				// so make sure we never inadvertently reset a failed state back to unready.
			} else if !liveliness {
				// kafka not reachable or down, updating the status to not ready state
				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
				logger.Infow(ctx, "kafka-not-live", log.Fields{"service": serviceName})
				timeout = notLiveProbeInterval
			} else {
				// kafka is reachable , updating the status to running state
				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
				timeout = liveProbeInterval
			}
			// Check if the timer has expired or not
			if !timeoutTimer.Stop() {
				<-timeoutTimer.C
			}
		case <-timeoutTimer.C:
			logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
			// the liveness probe may wait (and block) writing to our channel.
			go func() {
				err := kClient.SendLiveness(ctx)
				if err != nil {
					// Catch possible error case if sending liveness after Sarama has been stopped.
					logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
				}
			}()
		case <-ctx.Done():
			return // just exit
		}
	}
}
