blob: f8a956d2cf16f2bff16113ebae1a7977cb6b946d [file] [log] [blame]
Kent Hagerman2f0d0552020-04-23 17:28:52 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package core
18
19import (
20 "context"
21 "time"
22
23 "github.com/opencord/voltha-go/rw_core/core/adapter"
24 "github.com/opencord/voltha-go/rw_core/core/api"
25 "github.com/opencord/voltha-go/rw_core/core/device"
Maninderdfadc982020-10-28 14:04:33 +053026 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
27 "github.com/opencord/voltha-lib-go/v4/pkg/log"
28 "github.com/opencord/voltha-lib-go/v4/pkg/probe"
Kent Hagerman2f0d0552020-04-23 17:28:52 -040029)
30
31// startKafkInterContainerProxy is responsible for starting the Kafka Interadapter Proxy
David Bainbridge9ae13132020-06-22 17:28:01 -070032func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000033 logger.Infow(ctx, "initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
Kent Hagerman2f0d0552020-04-23 17:28:52 -040034
35 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
36
37 // create the kafka RPC proxy
38 kmp := kafka.NewInterContainerProxy(
Neha Sharmad1387da2020-05-07 20:07:28 +000039 kafka.InterContainerAddress(address),
Kent Hagerman2f0d0552020-04-23 17:28:52 -040040 kafka.MsgClient(kafkaClient),
David Bainbridge9ae13132020-06-22 17:28:01 -070041 kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}))
Kent Hagerman2f0d0552020-04-23 17:28:52 -040042
43 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
44
45 // wait for connectivity
Rohan Agrawal31f21802020-06-12 05:38:46 +000046 logger.Infow(ctx, "starting-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
Kent Hagerman2f0d0552020-04-23 17:28:52 -040047
48 for {
49 // If we haven't started yet, then try to start
Rohan Agrawal31f21802020-06-12 05:38:46 +000050 logger.Infow(ctx, "starting-kafka-proxy", log.Fields{})
51 if err := kmp.Start(ctx); err != nil {
Kent Hagerman2f0d0552020-04-23 17:28:52 -040052 // We failed to start. Delay and then try again later.
53 // Don't worry about liveness, as we can't be live until we've started.
54 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
Rohan Agrawal31f21802020-06-12 05:38:46 +000055 logger.Infow(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
Kent Hagerman2f0d0552020-04-23 17:28:52 -040056 select {
57 case <-time.After(connectionRetryInterval):
58 case <-ctx.Done():
59 return nil, ctx.Err()
60 }
61 continue
62 }
63 // We started. We only need to do this once.
64 // Next we'll fall through and start checking liveness.
Rohan Agrawal31f21802020-06-12 05:38:46 +000065 logger.Infow(ctx, "started-kafka-proxy", log.Fields{})
Kent Hagerman2f0d0552020-04-23 17:28:52 -040066 break
67 }
68 return kmp, nil
69}
70
71/*
72 * monitorKafkaLiveness is responsible for monitoring the Kafka Interadapter Proxy connectivity state
73 *
74 * Any producer that fails to send will cause KafkaInterContainerProxy to
75 * post a false event on its liveness channel. Any producer that succeeds in sending
76 * will cause KafkaInterContainerProxy to post a true event on its liveness
77 * channel. Group receivers also update liveness state, and a receiver will typically
78 * indicate a loss of liveness within 3-5 seconds of Kafka going down. Receivers
79 * only indicate restoration of liveness if a message is received. During normal
80 * operation, messages will be routinely produced and received, automatically
81 * indicating liveness state. These routine liveness indications are rate-limited
82 * inside sarama_client.
83 *
84 * This thread monitors the status of KafkaInterContainerProxy's liveness and pushes
85 * that state to the core's readiness probes. If no liveness event has been seen
86 * within a timeout, then the thread will make an attempt to produce a "liveness"
87 * message, which will in turn trigger a liveness event on the liveness channel, true
88 * or false depending on whether the attempt succeeded.
89 *
90 * The gRPC server in turn monitors the state of the readiness probe and will
91 * start issuing UNAVAILABLE response while the probe is not ready.
92 *
93 * startupRetryInterval -- interval between attempts to start
94 * liveProbeInterval -- interval between liveness checks when in a live state
95 * notLiveProbeInterval -- interval between liveness checks when in a notLive state
96 *
97 * liveProbeInterval and notLiveProbeInterval can be configured separately,
98 * though the current default is that both are set to 60 seconds.
99 */
100func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000101 logger.Info(ctx, "started-kafka-message-proxy")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400102
Rohan Agrawal31f21802020-06-12 05:38:46 +0000103 livenessChannel := kmp.EnableLivenessChannel(ctx, true)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400104
Rohan Agrawal31f21802020-06-12 05:38:46 +0000105 logger.Info(ctx, "enabled-kafka-liveness-channel")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400106
107 timeout := liveProbeInterval
108 for {
109 timeoutTimer := time.NewTimer(timeout)
110 select {
111 case liveness := <-livenessChannel:
Rohan Agrawal31f21802020-06-12 05:38:46 +0000112 logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400113 // there was a state change in Kafka liveness
114 if !liveness {
115 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000116 logger.Info(ctx, "kafka-manager-thread-set-server-notready")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400117
118 // retry frequently while life is bad
119 timeout = notLiveProbeInterval
120 } else {
121 probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000122 logger.Info(ctx, "kafka-manager-thread-set-server-ready")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400123
124 // retry infrequently while life is good
125 timeout = liveProbeInterval
126 }
127 if !timeoutTimer.Stop() {
128 <-timeoutTimer.C
129 }
130 case <-timeoutTimer.C:
Rohan Agrawal31f21802020-06-12 05:38:46 +0000131 logger.Info(ctx, "kafka-proxy-liveness-recheck")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400132 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
133 // the liveness probe may wait (and block) writing to our channel.
134 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000135 err := kmp.SendLiveness(ctx)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400136 if err != nil {
137 // Catch possible error case if sending liveness after Sarama has been stopped.
Rohan Agrawal31f21802020-06-12 05:38:46 +0000138 logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400139 }
140 }()
141 case <-ctx.Done():
142 return // just exit
143 }
144 }
145}
146
Rohan Agrawal31f21802020-06-12 05:38:46 +0000147func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic string) {
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400148 requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
149
150 // Register the broadcast topic to handle any core-bound broadcast requests
Rohan Agrawal31f21802020-06-12 05:38:46 +0000151 if err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
152 logger.Fatalw(ctx, "Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400153 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000154
155 logger.Info(ctx, "request-handler-registered")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400156}