blob: fba621f91ccc0d6c4636185c3267fde6e9991308 [file] [log] [blame]
* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package kafka
import (
// Constants used when deriving a device id from a topic name.
const (
	// TopicSeparator is the delimiter that precedes the device id suffix in a
	// topic name of the form <any string>_<deviceId>.
	TopicSeparator = "_"
	// DeviceIdLength is the exact length a topic-name suffix must have in order
	// to be treated as a device id.
	DeviceIdLength = 24
)
// Topic is a topic definition - it may be augmented with additional attributes eventually.
type Topic struct {
	// Name is the name of the topic. It must start with a letter,
	// and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
	// underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
	// signs (`%`).
	Name string
}
// KVArg is a key/value pair: a string key associated with a value of arbitrary type.
type KVArg struct {
	// Key is the name of the argument.
	Key string
	// Value is the argument itself. It is deliberately spelled interface{}
	// rather than `any`: this file refers to a package named `any` (see
	// RpcResponse.Reply), which shadows the predeclared identifier.
	Value interface{}
}
// RpcMType is the message-type tag stored in RpcResponse.MType.
type RpcMType int

const (
	// RpcFormattingError is the first RpcMType value and therefore also the
	// zero value of the type.
	RpcFormattingError RpcMType = iota
	// NOTE(review): only this member is visible in this group - confirm
	// whether further RpcMType values belong here.
)
type RpcResponse struct {
MType RpcMType
Err error
Reply *any.Any
func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse {
return &RpcResponse{
MType: messageType,
Err: err,
Reply: body,
// TODO: Remove and provide better may to get the device id
// GetDeviceIdFromTopic extract the deviceId from the topic name. The topic name is formatted either as:
// <any string> or <any string>_<deviceId>. The device Id is 24 characters long.
func GetDeviceIdFromTopic(topic Topic) string {
pos := strings.LastIndex(topic.Name, TopicSeparator)
if pos == -1 {
return ""
adjustedPos := pos + len(TopicSeparator)
if adjustedPos >= len(topic.Name) {
return ""
deviceId := topic.Name[adjustedPos:len(topic.Name)]
if len(deviceId) != DeviceIdLength {
return ""
return deviceId
// WaitUntilKafkaConnectionIsUp waits until the kafka client can establish a connection to the kafka broker or until the
// context times out.
func StartAndWaitUntilKafkaConnectionIsUp(ctx context.Context, kClient Client, connectionRetryInterval time.Duration, serviceName string) error {
if kClient == nil {
return errors.New("kafka-client-is-nil")
for {
if err := kClient.Start(ctx); err != nil {
probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
logger.Warnw(ctx, "kafka-connection-down", log.Fields{"error": err})
select {
case <-time.After(connectionRetryInterval):
case <-ctx.Done():
return ctx.Err()
probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
logger.Info(ctx, "kafka-connection-up")
return nil
MonitorKafkaReadiness checks the liveliness and readiness of the kafka service
and update the status in the probe.
func MonitorKafkaReadiness(ctx context.Context,
kClient Client,
liveProbeInterval, notLiveProbeInterval time.Duration,
serviceName string) {
if kClient == nil {
logger.Fatal(ctx, "kafka-client-is-nil")
logger.Infow(ctx, "monitor-kafka-readiness", log.Fields{"service": serviceName})
livelinessChannel := kClient.EnableLivenessChannel(ctx, true)
healthinessChannel := kClient.EnableHealthinessChannel(ctx, true)
timeout := liveProbeInterval
failed := false
for {
timeoutTimer := time.NewTimer(timeout)
select {
case healthiness := <-healthinessChannel:
if !healthiness {
// This will eventually cause K8s to restart the container, and will do
// so in a way that allows cleanup to continue, rather than an immediate
// panic and exit here.
probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusFailed)
logger.Infow(ctx, "kafka-not-healthy", log.Fields{"service": serviceName})
failed = true
// Check if the timer has expired or not
if !timeoutTimer.Stop() {
case liveliness := <-livelinessChannel:
if failed {
// Failures of the message bus are permanent and can't ever be recovered from,
// so make sure we never inadvertently reset a failed state back to unready.
} else if !liveliness {
// kafka not reachable or down, updating the status to not ready state
probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
logger.Infow(ctx, "kafka-not-live", log.Fields{"service": serviceName})
timeout = notLiveProbeInterval
} else {
// kafka is reachable , updating the status to running state
probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
timeout = liveProbeInterval
// Check if the timer has expired or not
if !timeoutTimer.Stop() {
case <-timeoutTimer.C:
logger.Debugf(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
// the liveness probe may wait (and block) writing to our channel.
go func() {
err := kClient.SendLiveness(ctx)
if err != nil {
// Catch possible error case if sending liveness after Sarama has been stopped.
logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
case <-ctx.Done():
return // just exit