blob: ae33090b442e988a055ae5e20d33a47e14210932 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package core
import (
"context"
"github.com/cenkalti/backoff/v3"
"github.com/opencord/voltha-go/rw_core/config"
"github.com/opencord/voltha-lib-go/v4/pkg/events"
"time"
"github.com/opencord/voltha-go/rw_core/core/adapter"
"github.com/opencord/voltha-go/rw_core/core/api"
"github.com/opencord/voltha-go/rw_core/core/device"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-lib-go/v4/pkg/probe"
)
// startKafkInterContainerProxy creates the Kafka inter-container proxy for the
// given address and core topic, then blocks until the proxy has been started.
//
// The adapterMessageBus probe service is moved through Preparing and Prepared
// while the proxy is constructed. Start is then attempted repeatedly, pausing
// connectionRetryInterval between attempts and reporting NotReady after every
// failure. If ctx is done while waiting for the next attempt, nil and ctx.Err()
// are returned.
func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
	logger.Infow(ctx, "initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
	probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPreparing)

	// Build the kafka RPC proxy.
	defaultTopic := &kafka.Topic{Name: coreTopic}
	proxy := kafka.NewInterContainerProxy(
		kafka.InterContainerAddress(address),
		kafka.MsgClient(kafkaClient),
		kafka.DefaultTopic(defaultTopic),
	)
	probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPrepared)

	// Keep trying to start until we succeed or the context is done.
	logger.Infow(ctx, "starting-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
	for {
		logger.Infow(ctx, "starting-kafka-proxy", log.Fields{})
		startErr := proxy.Start(ctx)
		if startErr == nil {
			// Starting only has to succeed once.
			logger.Infow(ctx, "started-kafka-proxy", log.Fields{})
			return proxy, nil
		}

		// Liveness is irrelevant here: we cannot be live before we have started.
		probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusNotReady)
		logger.Warnw(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": startErr})

		// Wait out the retry interval, giving up early if the context is done.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(connectionRetryInterval):
		}
	}
}
// startEventProxy creates an event proxy publishing to eventTopic through
// kafkaClient, then blocks until kafkaClient has been started.
//
// Start is attempted repeatedly, pausing connectionRetryInterval between
// attempts. When updateProbeService is true the clusterMessageBus probe service
// is reported as NotReady after each failure and as Running on success. If ctx
// is done while waiting for the next attempt, nil and ctx.Err() are returned.
func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration, updateProbeService bool) (*events.EventProxy, error) {
	proxy := events.NewEventProxy(events.MsgClient(kafkaClient), events.MsgTopic(kafka.Topic{Name: eventTopic}))

	for {
		startErr := kafkaClient.Start(ctx)
		if startErr == nil {
			if updateProbeService {
				probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusRunning)
			}
			logger.Info(ctx, "started-connection-on-kafka-cluster-address")
			return proxy, nil
		}

		if updateProbeService {
			probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
		}
		logger.Warnw(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address", log.Fields{"error": startErr})

		// Wait out the retry interval, giving up early if the context is done.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(connectionRetryInterval):
		}
	}
}
// KafkaProxy is the interface that is valid for both EventProxy and
// InterContainerProxy. It exposes the two liveness operations that
// monitorKafkaLiveness relies on.
type KafkaProxy interface {
// EnableLivenessChannel returns the channel that monitorKafkaLiveness reads
// liveness events from; a received true is treated as live and false as not live.
// NOTE(review): the effect of passing enable=false is not visible in this file.
EnableLivenessChannel(ctx context.Context, enable bool) chan bool
// SendLiveness is invoked by monitorKafkaLiveness when no liveness event has
// arrived within the current probe interval. It is expected to attempt to
// produce a liveness message, which in turn yields an event on the liveness
// channel.
SendLiveness(ctx context.Context) error
}
/*
 * monitorKafkaLiveness is responsible for monitoring the Kafka proxy connectivity state
 * and pushing it to the readiness probe registered under serviceName.
 *
 * Any producer that fails to send will cause KafkaInterContainerProxy to
 * post a false event on its liveness channel. Any producer that succeeds in sending
 * will cause KafkaInterContainerProxy to post a true event on its liveness
 * channel. Group receivers also update liveness state, and a receiver will typically
 * indicate a loss of liveness within 3-5 seconds of Kafka going down. Receivers
 * only indicate restoration of liveness if a message is received. During normal
 * operation, messages will be routinely produced and received, automatically
 * indicating liveness state. These routine liveness indications are rate-limited
 * inside sarama_client.
 *
 * This thread monitors the status of the proxy's liveness and pushes
 * that state to the core's readiness probes. If no liveness event has been seen
 * within a timeout, then the thread will make an attempt to produce a "liveness"
 * message, which will in turn trigger a liveness event on the liveness channel, true
 * or false depending on whether the attempt succeeded.
 *
 * The gRPC server in turn monitors the state of the readiness probe and will
 * start issuing UNAVAILABLE response while the probe is not ready.
 *
 * kmp                  -- proxy whose liveness channel is monitored
 * liveProbeInterval    -- interval between liveness checks when in a live state
 * notLiveProbeInterval -- interval between liveness checks when in a notLive state
 * serviceName          -- probe service whose status is updated; also added to log fields
 *
 * liveProbeInterval and notLiveProbeInterval can be configured separately,
 * though the current default is that both are set to 60 seconds.
 *
 * The function blocks until ctx is done and then returns.
 */
func monitorKafkaLiveness(ctx context.Context, kmp KafkaProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration, serviceName string) {
	logger.Infow(ctx, "started-kafka-message-proxy", log.Fields{"service": serviceName})

	livenessChannel := kmp.EnableLivenessChannel(ctx, true)

	logger.Infow(ctx, "enabled-kafka-liveness-channel", log.Fields{"service": serviceName})

	timeout := liveProbeInterval

	// One timer is reused for the whole loop and released on every exit path,
	// including context cancellation.
	timeoutTimer := time.NewTimer(timeout)
	defer timeoutTimer.Stop()

	for {
		select {
		case liveness := <-livenessChannel:
			logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness, "service": serviceName})
			// there was a state change in Kafka liveness
			if !liveness {
				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
				logger.Infow(ctx, "kafka-manager-thread-set-server-notready", log.Fields{"service": serviceName})
				// retry frequently while life is bad
				timeout = notLiveProbeInterval
			} else {
				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
				logger.Infow(ctx, "kafka-manager-thread-set-server-ready", log.Fields{"service": serviceName})
				// retry infrequently while life is good
				timeout = liveProbeInterval
			}
			// Restart the countdown with the (possibly changed) interval. If the
			// timer already fired, discard the stale tick; the drain is non-blocking
			// so it cannot hang when there is nothing to receive.
			if !timeoutTimer.Stop() {
				select {
				case <-timeoutTimer.C:
				default:
				}
			}
			timeoutTimer.Reset(timeout)
		case <-timeoutTimer.C:
			logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
			// the liveness probe may wait (and block) writing to our channel.
			go func() {
				err := kmp.SendLiveness(ctx)
				if err != nil {
					// Catch possible error case if sending liveness after Sarama has been stopped.
					logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
				}
			}()
			// The tick has been consumed, so the timer can be re-armed directly.
			timeoutTimer.Reset(timeout)
		case <-ctx.Done():
			return // just exit
		}
	}
}
// registerAdapterRequestHandlers subscribes the adapter request handler proxy
// (built from dMgr and aMgr) to the core topic cf.CoreTopic on kmp.
//
// The probe service serviceName is reported as NotReady until the subscription
// succeeds and as Running once it does. Failed attempts are retried using an
// exponential backoff configured from cf.BackoffRetryInitialInterval,
// cf.BackoffRetryMaxInterval and cf.BackoffRetryMaxElapsedTime; if the backoff
// reports that it is exhausted it is reset and retrying continues. The function
// returns once the handler is registered or ctx is done.
func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager,
	aMgr *adapter.Manager, cf *config.RWCoreFlags, serviceName string) {
	logger.Infow(ctx, "registering-request-handler", log.Fields{"topic": cf.CoreTopic})

	// Set the exponential backoff params
	kafkaRetryBackoff := backoff.NewExponentialBackOff()
	kafkaRetryBackoff.InitialInterval = cf.BackoffRetryInitialInterval
	kafkaRetryBackoff.MaxElapsedTime = cf.BackoffRetryMaxElapsedTime
	kafkaRetryBackoff.MaxInterval = cf.BackoffRetryMaxInterval

	// For the initial request, do not wait. The same timer is re-armed for
	// every retry and released on every exit path.
	backoffTimer := time.NewTimer(0)
	defer backoffTimer.Stop()

	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
	requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)

	for {
		select {
		case <-backoffTimer.C:
			// Register the broadcast topic to handle any core-bound broadcast requests
			err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: cf.CoreTopic}, requestProxy)
			if err == nil {
				logger.Infow(ctx, "request-handler-registered", log.Fields{"topic": cf.CoreTopic})
				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
				return
			}
			// Include the cause so that a failing subscription can be diagnosed from the log.
			logger.Errorw(ctx, "failed-registering-broadcast-handler-retrying", log.Fields{"topic": cf.CoreTopic, "error": err})

			duration := kafkaRetryBackoff.NextBackOff()
			// This case should never occur (by default) as max elapsed time for backoff is 0 (by default), so it will never return stop
			if duration == backoff.Stop {
				// If we reach a maximum then warn and reset the backoff timer and keep attempting.
				logger.Warnw(ctx, "maximum-kafka-retry-backoff-reached-resetting",
					log.Fields{"max-kafka-retry-backoff": kafkaRetryBackoff.MaxElapsedTime})
				kafkaRetryBackoff.Reset()
				duration = kafkaRetryBackoff.NextBackOff()
			}
			// The tick was consumed by this case, so the timer can be re-armed directly.
			backoffTimer.Reset(duration)
		case <-ctx.Done():
			logger.Infow(ctx, "context-closed", log.Fields{"topic": cf.CoreTopic})
			return
		}
	}
}