[VOL-4290] Voltha go library updates for gRPC migration
Change-Id: I1aa2774beb6b7ed7419bc45aeb53fcae8a8ecda0
diff --git a/pkg/kafka/utils.go b/pkg/kafka/utils.go
index bdc615f..608361b 100644
--- a/pkg/kafka/utils.go
+++ b/pkg/kafka/utils.go
@@ -16,8 +16,14 @@
package kafka
import (
- "github.com/golang/protobuf/ptypes/any"
+ "context"
+ "errors"
"strings"
+ "time"
+
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
)
const (
@@ -82,3 +88,98 @@
}
return deviceId
}
+
+// StartAndWaitUntilKafkaConnectionIsUp starts the kafka client and waits until it can establish a connection to the
+// kafka broker, or until the context is cancelled or times out.
+func StartAndWaitUntilKafkaConnectionIsUp(ctx context.Context, kClient Client, connectionRetryInterval time.Duration, serviceName string) error {
+ if kClient == nil {
+ return errors.New("kafka-client-is-nil")
+ }
+ for {
+ if err := kClient.Start(ctx); err != nil {
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+ logger.Warnw(ctx, "kafka-connection-down", log.Fields{"error": err})
+ select {
+ case <-time.After(connectionRetryInterval):
+ continue
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ logger.Info(ctx, "kafka-connection-up")
+ break
+ }
+ return nil
+}
+
+// MonitorKafkaReadiness monitors the liveness and healthiness of the kafka service
+// and updates the corresponding status in the probe.
+func MonitorKafkaReadiness(ctx context.Context,
+ kClient Client,
+ liveProbeInterval, notLiveProbeInterval time.Duration,
+ serviceName string) {
+
+ if kClient == nil {
+ logger.Fatal(ctx, "kafka-client-is-nil")
+ }
+
+ logger.Infow(ctx, "monitor-kafka-readiness", log.Fields{"service": serviceName})
+
+ livelinessChannel := kClient.EnableLivenessChannel(ctx, true)
+ healthinessChannel := kClient.EnableHealthinessChannel(ctx, true)
+ timeout := liveProbeInterval
+ failed := false
+ for {
+ timeoutTimer := time.NewTimer(timeout)
+
+ select {
+ case healthiness := <-healthinessChannel:
+ if !healthiness {
+ // This will eventually cause K8s to restart the container, and will do
+ // so in a way that allows cleanup to continue, rather than an immediate
+ // panic and exit here.
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusFailed)
+ logger.Infow(ctx, "kafka-not-healthy", log.Fields{"service": serviceName})
+ failed = true
+ }
+ // Stop the timer; if it has already fired, drain its channel so a fresh timer can be created on the next iteration
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
+ }
+ case liveliness := <-livelinessChannel:
+ if failed {
+ // Failures of the message bus are permanent and can't ever be recovered from,
+ // so make sure we never inadvertently reset a failed state back to unready.
+ } else if !liveliness {
+ // kafka is not reachable or is down; update the status to not-ready
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+ logger.Infow(ctx, "kafka-not-live", log.Fields{"service": serviceName})
+ timeout = notLiveProbeInterval
+ } else {
+ // kafka is reachable; update the status to running
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ timeout = liveProbeInterval
+ }
+ // Stop the timer; if it has already fired, drain its channel so a fresh timer can be created on the next iteration
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
+ }
+ case <-timeoutTimer.C:
+ logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
+ // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
+ // the liveness probe may wait (and block) writing to our channel.
+ go func() {
+ err := kClient.SendLiveness(ctx)
+ if err != nil {
+ // Catch the possible error case where liveness is sent after Sarama has already been stopped.
+ logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
+ }
+ }()
+ case <-ctx.Done():
+ return // just exit
+ }
+ }
+}
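
For context, a minimal caller sketch showing how a component could wire these two helpers together at startup. The wrapper function, intervals, and service name below are illustrative assumptions and are not part of this change:

// startKafkaMonitoring blocks until the kafka broker accepts a connection,
// then keeps the readiness probe updated in the background until the
// component's context is cancelled.
package example

import (
	"context"
	"time"

	"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
)

func startKafkaMonitoring(ctx context.Context, kClient kafka.Client) error {
	// Retry the initial connection every 5s; the interval and service name are placeholders.
	if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, kClient, 5*time.Second, "my-service"); err != nil {
		return err
	}
	// Re-probe liveness every 10s while the broker is live, every 5s while it is not.
	go kafka.MonitorKafkaReadiness(ctx, kClient, 10*time.Second, 5*time.Second, "my-service")
	return nil
}

A caller would typically pass the same context it uses for the rest of the component, so cancelling it stops both the connection retry loop and the readiness monitor.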