/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package core

import (
	"context"
	"time"

	"github.com/cenkalti/backoff/v3"
	"github.com/opencord/voltha-go/rw_core/config"
	"github.com/opencord/voltha-go/rw_core/core/adapter"
	"github.com/opencord/voltha-go/rw_core/core/api"
	"github.com/opencord/voltha-go/rw_core/core/device"
	"github.com/opencord/voltha-lib-go/v5/pkg/events"
	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
	"github.com/opencord/voltha-lib-go/v5/pkg/log"
	"github.com/opencord/voltha-lib-go/v5/pkg/probe"
)

// startKafkInterContainerProxy starts the Kafka Interadapter Proxy, retrying
// every connectionRetryInterval until the connection comes up or ctx is
// cancelled.
func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
	logger.Infow(ctx, "initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})

	probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPreparing)

	// create the kafka RPC proxy
	kmp := kafka.NewInterContainerProxy(
		kafka.InterContainerAddress(address),
		kafka.MsgClient(kafkaClient),
		kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}))

	probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPrepared)

	// wait for connectivity
	logger.Infow(ctx, "starting-kafka-manager", log.Fields{"address": address, "topic": coreTopic})

	for {
		// If we haven't started yet, then try to start
		logger.Infow(ctx, "starting-kafka-proxy", log.Fields{})
		if err := kmp.Start(ctx); err != nil {
			// We failed to start. Delay and then try again later.
			// Don't worry about liveness, as we can't be live until we've started.
			probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusNotReady)
			logger.Warnw(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
			select {
			case <-time.After(connectionRetryInterval):
			case <-ctx.Done():
				return nil, ctx.Err()
			}
			continue
		}
		// We started. We only need to do this once.
		// Next we'll fall through and start checking liveness.
		logger.Infow(ctx, "started-kafka-proxy", log.Fields{})
		break
	}
	return kmp, nil
}

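// startMessagingSketch is a minimal, hypothetical sketch (it is not invoked
// anywhere in rw_core) of how the startup helpers in this file compose; the
// address, topic names and retry interval below are placeholder assumptions,
// not the values rw_core reads from its configuration.
func startMessagingSketch(ctx context.Context, adapterClient, clusterClient kafka.Client) (kafka.InterContainerProxy, *events.EventProxy, error) {
	// Bring up the adapter-facing RPC proxy; this blocks, retrying, until
	// Kafka is reachable or ctx is cancelled.
	kmp, err := startKafkInterContainerProxy(ctx, adapterClient, "kafka:9092", "rwcore", 5*time.Second)
	if err != nil {
		return nil, nil, err
	}
	// Then bring up the cluster-facing event proxy, reporting its state on
	// the readiness probe.
	ep, err := startEventProxy(ctx, clusterClient, "voltha.events", 5*time.Second, true)
	if err != nil {
		return nil, nil, err
	}
	return kmp, ep, nil
}
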
// startEventProxy starts the event proxy on the cluster message bus, retrying
// every connectionRetryInterval until the Kafka connection comes up or ctx is
// cancelled.
func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration, updateProbeService bool) (*events.EventProxy, error) {
	ep := events.NewEventProxy(events.MsgClient(kafkaClient), events.MsgTopic(kafka.Topic{Name: eventTopic}))
	for {
		if err := kafkaClient.Start(ctx); err != nil {
			if updateProbeService {
				probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
			}
			logger.Warnw(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address", log.Fields{"error": err})
			select {
			case <-time.After(connectionRetryInterval):
				continue
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}
		go ep.Start()
		if updateProbeService {
			probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusRunning)
		}
		logger.Info(ctx, "started-connection-on-kafka-cluster-address")
		break
	}
	return ep, nil
}

// stopEventProxy stops the event proxy first, then the underlying Kafka client.
func stopEventProxy(ctx context.Context, kafkaClient kafka.Client, ep *events.EventProxy) {
	defer kafkaClient.Stop(ctx)
	ep.Stop()
}

// KafkaProxy is the liveness interface satisfied by both EventProxy and
// InterContainerProxy.
type KafkaProxy interface {
	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
	SendLiveness(ctx context.Context) error
}

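// Illustrative compile-time checks that the two implementations named above
// satisfy KafkaProxy; an assumption taken from the comment above rather than
// a guarantee made by this file.
var (
	_ KafkaProxy = kafka.InterContainerProxy(nil)
	_ KafkaProxy = (*events.EventProxy)(nil)
)
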
/*
 * monitorKafkaLiveness is responsible for monitoring the Kafka Interadapter Proxy connectivity state
 *
 * Any producer that fails to send will cause KafkaInterContainerProxy to
 * post a false event on its liveness channel. Any producer that succeeds in sending
 * will cause KafkaInterContainerProxy to post a true event on its liveness
 * channel. Group receivers also update liveness state, and a receiver will typically
 * indicate a loss of liveness within 3-5 seconds of Kafka going down. Receivers
 * only indicate restoration of liveness if a message is received. During normal
 * operation, messages will be routinely produced and received, automatically
 * indicating liveness state. These routine liveness indications are rate-limited
 * inside sarama_client.
 *
 * This thread monitors the status of KafkaInterContainerProxy's liveness and pushes
 * that state to the core's readiness probes. If no liveness event has been seen
 * within a timeout, then the thread will make an attempt to produce a "liveness"
 * message, which will in turn trigger a liveness event on the liveness channel, true
 * or false depending on whether the attempt succeeded.
 *
 * The gRPC server in turn monitors the state of the readiness probe and will
 * start issuing UNAVAILABLE responses while the probe is not ready.
 *
 * liveProbeInterval -- interval between liveness checks when in a live state
 * notLiveProbeInterval -- interval between liveness checks when in a notLive state
 *
 * liveProbeInterval and notLiveProbeInterval can be configured separately,
 * though the current default is that both are set to 60 seconds.
 */
func monitorKafkaLiveness(ctx context.Context, kmp KafkaProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration, serviceName string) {
	logger.Infow(ctx, "started-kafka-message-proxy", log.Fields{"service": serviceName})

	livenessChannel := kmp.EnableLivenessChannel(ctx, true)

	logger.Infow(ctx, "enabled-kafka-liveness-channel", log.Fields{"service": serviceName})

	timeout := liveProbeInterval
	for {
		timeoutTimer := time.NewTimer(timeout)
		select {
		case liveness := <-livenessChannel:
			logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness, "service": serviceName})
			// there was a state change in Kafka liveness
			if !liveness {
				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
				logger.Infow(ctx, "kafka-manager-thread-set-server-notready", log.Fields{"service": serviceName})

				// retry frequently while life is bad
				timeout = notLiveProbeInterval
			} else {
				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
				logger.Infow(ctx, "kafka-manager-thread-set-server-ready", log.Fields{"service": serviceName})

				// retry infrequently while life is good
				timeout = liveProbeInterval
			}
			// drain the timer so the next NewTimer call doesn't race a stale fire
			if !timeoutTimer.Stop() {
				<-timeoutTimer.C
			}
		case <-timeoutTimer.C:
			logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
			// the liveness probe may wait (and block) writing to our channel.
			go func() {
				err := kmp.SendLiveness(ctx)
				if err != nil {
					// Catch possible error case if sending liveness after Sarama has been stopped.
					logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
				}
			}()
		case <-ctx.Done():
			return // just exit
		}
	}
}

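// monitorSketch is a minimal, hypothetical illustration (not called anywhere)
// of running one liveness monitor per message bus, which is how the probe
// services used in this file receive their Kafka liveness state; the 60s
// intervals are the documented defaults, used here as placeholders.
func monitorSketch(ctx context.Context, kmp kafka.InterContainerProxy, ep *events.EventProxy) {
	// Each monitor runs until ctx is cancelled.
	go monitorKafkaLiveness(ctx, kmp, 60*time.Second, 60*time.Second, adapterMessageBus)
	go monitorKafkaLiveness(ctx, ep, 60*time.Second, 60*time.Second, clusterMessageBus)
}
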
// registerAdapterRequestHandlers subscribes the adapter request handler on the
// core topic, retrying with exponential backoff until registration succeeds or
// ctx is cancelled.
func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager,
	aMgr *adapter.Manager, cf *config.RWCoreFlags, serviceName string) {
	logger.Infow(ctx, "registering-request-handler", log.Fields{"topic": cf.CoreTopic})

	// Set the exponential backoff params
	kafkaRetryBackoff := backoff.NewExponentialBackOff()
	kafkaRetryBackoff.InitialInterval = cf.BackoffRetryInitialInterval
	kafkaRetryBackoff.MaxElapsedTime = cf.BackoffRetryMaxElapsedTime
	kafkaRetryBackoff.MaxInterval = cf.BackoffRetryMaxInterval

	// For the initial request, do not wait
	backoffTimer := time.NewTimer(0)

	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
	requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
	for {
		select {
		case <-backoffTimer.C:
			// Register the broadcast topic to handle any core-bound broadcast requests
			err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: cf.CoreTopic}, requestProxy)
			if err == nil {
				logger.Infow(ctx, "request-handler-registered", log.Fields{"topic": cf.CoreTopic})
				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
				return
			}
			logger.Errorw(ctx, "failed-registering-broadcast-handler-retrying", log.Fields{"topic": cf.CoreTopic, "error": err})
			duration := kafkaRetryBackoff.NextBackOff()
			// With the default configuration MaxElapsedTime is 0, so NextBackOff
			// never returns backoff.Stop; guard anyway in case it is configured.
			if duration == backoff.Stop {
				// If we reach the maximum elapsed time, warn, reset the backoff and keep attempting.
				logger.Warnw(ctx, "maximum-kafka-retry-backoff-reached-resetting",
					log.Fields{"max-kafka-retry-backoff": kafkaRetryBackoff.MaxElapsedTime})
				kafkaRetryBackoff.Reset()
				duration = kafkaRetryBackoff.NextBackOff()
			}
			backoffTimer = time.NewTimer(duration)
		case <-ctx.Done():
			logger.Infow(ctx, "context-closed", log.Fields{"topic": cf.CoreTopic})
			return
		}
	}
}
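
// backoffSketch illustrates, against the same backoff/v3 API used above, how
// NextBackOff grows until MaxElapsedTime is exceeded and backoff.Stop is
// returned; purely illustrative, with made-up intervals, and never called.
func backoffSketch() {
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = 100 * time.Millisecond
	b.MaxInterval = 2 * time.Second
	b.MaxElapsedTime = 5 * time.Second // non-zero, so Stop is eventually returned
	for {
		d := b.NextBackOff()
		if d == backoff.Stop {
			// With a non-zero MaxElapsedTime the sequence terminates here;
			// registerAdapterRequestHandlers instead resets and keeps retrying.
			break
		}
		time.Sleep(d)
	}
}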