blob: d45a84e3d0d0b06fa80660d5421a5d2f65b44793 [file] [log] [blame]
Kent Hagerman2f0d0552020-04-23 17:28:52 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package core
18
19import (
20 "context"
Matteo Scandolob3ba79c2021-03-01 10:53:23 -080021 "github.com/opencord/voltha-lib-go/v4/pkg/events"
Kent Hagerman2f0d0552020-04-23 17:28:52 -040022 "time"
23
24 "github.com/opencord/voltha-go/rw_core/core/adapter"
25 "github.com/opencord/voltha-go/rw_core/core/api"
26 "github.com/opencord/voltha-go/rw_core/core/device"
Maninderdfadc982020-10-28 14:04:33 +053027 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
28 "github.com/opencord/voltha-lib-go/v4/pkg/log"
29 "github.com/opencord/voltha-lib-go/v4/pkg/probe"
Kent Hagerman2f0d0552020-04-23 17:28:52 -040030)
31
32// startKafkInterContainerProxy is responsible for starting the Kafka Interadapter Proxy
David Bainbridge9ae13132020-06-22 17:28:01 -070033func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000034 logger.Infow(ctx, "initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
Kent Hagerman2f0d0552020-04-23 17:28:52 -040035
Matteo Scandolob3ba79c2021-03-01 10:53:23 -080036 probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPreparing)
Kent Hagerman2f0d0552020-04-23 17:28:52 -040037
38 // create the kafka RPC proxy
39 kmp := kafka.NewInterContainerProxy(
Neha Sharmad1387da2020-05-07 20:07:28 +000040 kafka.InterContainerAddress(address),
Kent Hagerman2f0d0552020-04-23 17:28:52 -040041 kafka.MsgClient(kafkaClient),
David Bainbridge9ae13132020-06-22 17:28:01 -070042 kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}))
Kent Hagerman2f0d0552020-04-23 17:28:52 -040043
Matteo Scandolob3ba79c2021-03-01 10:53:23 -080044 probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPrepared)
Kent Hagerman2f0d0552020-04-23 17:28:52 -040045
46 // wait for connectivity
Rohan Agrawal31f21802020-06-12 05:38:46 +000047 logger.Infow(ctx, "starting-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
Kent Hagerman2f0d0552020-04-23 17:28:52 -040048
49 for {
50 // If we haven't started yet, then try to start
Rohan Agrawal31f21802020-06-12 05:38:46 +000051 logger.Infow(ctx, "starting-kafka-proxy", log.Fields{})
52 if err := kmp.Start(ctx); err != nil {
Kent Hagerman2f0d0552020-04-23 17:28:52 -040053 // We failed to start. Delay and then try again later.
54 // Don't worry about liveness, as we can't be live until we've started.
Matteo Scandolob3ba79c2021-03-01 10:53:23 -080055 probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusNotReady)
56 logger.Warnw(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
Kent Hagerman2f0d0552020-04-23 17:28:52 -040057 select {
58 case <-time.After(connectionRetryInterval):
59 case <-ctx.Done():
60 return nil, ctx.Err()
61 }
62 continue
63 }
64 // We started. We only need to do this once.
65 // Next we'll fall through and start checking liveness.
Rohan Agrawal31f21802020-06-12 05:38:46 +000066 logger.Infow(ctx, "started-kafka-proxy", log.Fields{})
Kent Hagerman2f0d0552020-04-23 17:28:52 -040067 break
68 }
69 return kmp, nil
70}
71
Matteo Scandolob3ba79c2021-03-01 10:53:23 -080072func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration) (*events.EventProxy, error) {
73 ep := events.NewEventProxy(events.MsgClient(kafkaClient), events.MsgTopic(kafka.Topic{Name: eventTopic}))
74 for {
75 if err := kafkaClient.Start(ctx); err != nil {
76 probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
77 logger.Warnw(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address", log.Fields{"error": err})
78 select {
79 case <-time.After(connectionRetryInterval):
80 continue
81 case <-ctx.Done():
82 return nil, ctx.Err()
83 }
84 }
85 logger.Info(ctx, "started-connection-on-kafka-cluster-address")
86 break
87 }
88 return ep, nil
89}
90
// KafkaProxy is the liveness-related behaviour shared by the event proxy and
// the inter-container proxy. It is what monitorKafkaLiveness needs in order to
// track Kafka connectivity for either of them.
type KafkaProxy interface {
	// EnableLivenessChannel, when called with enable=true, returns the channel
	// on which the proxy posts liveness events (true = live, false = not live).
	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
	// SendLiveness attempts to produce a liveness message. Its outcome is
	// expected to surface as an event on the liveness channel.
	SendLiveness(ctx context.Context) error
}
96
Kent Hagerman2f0d0552020-04-23 17:28:52 -040097/*
98 * monitorKafkaLiveness is responsible for monitoring the Kafka Interadapter Proxy connectivity state
99 *
100 * Any producer that fails to send will cause KafkaInterContainerProxy to
101 * post a false event on its liveness channel. Any producer that succeeds in sending
102 * will cause KafkaInterContainerProxy to post a true event on its liveness
103 * channel. Group receivers also update liveness state, and a receiver will typically
104 * indicate a loss of liveness within 3-5 seconds of Kafka going down. Receivers
105 * only indicate restoration of liveness if a message is received. During normal
106 * operation, messages will be routinely produced and received, automatically
107 * indicating liveness state. These routine liveness indications are rate-limited
108 * inside sarama_client.
109 *
110 * This thread monitors the status of KafkaInterContainerProxy's liveness and pushes
111 * that state to the core's readiness probes. If no liveness event has been seen
112 * within a timeout, then the thread will make an attempt to produce a "liveness"
113 * message, which will in turn trigger a liveness event on the liveness channel, true
114 * or false depending on whether the attempt succeeded.
115 *
116 * The gRPC server in turn monitors the state of the readiness probe and will
117 * start issuing UNAVAILABLE response while the probe is not ready.
118 *
119 * startupRetryInterval -- interval between attempts to start
120 * liveProbeInterval -- interval between liveness checks when in a live state
121 * notLiveProbeInterval -- interval between liveness checks when in a notLive state
122 *
123 * liveProbeInterval and notLiveProbeInterval can be configured separately,
124 * though the current default is that both are set to 60 seconds.
125 */
Matteo Scandolob3ba79c2021-03-01 10:53:23 -0800126func monitorKafkaLiveness(ctx context.Context, kmp KafkaProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration, serviceName string) {
127 logger.Infow(ctx, "started-kafka-message-proxy", log.Fields{"service": serviceName})
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400128
Rohan Agrawal31f21802020-06-12 05:38:46 +0000129 livenessChannel := kmp.EnableLivenessChannel(ctx, true)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400130
Matteo Scandolob3ba79c2021-03-01 10:53:23 -0800131 logger.Infow(ctx, "enabled-kafka-liveness-channel", log.Fields{"service": serviceName})
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400132
133 timeout := liveProbeInterval
134 for {
135 timeoutTimer := time.NewTimer(timeout)
136 select {
137 case liveness := <-livenessChannel:
Matteo Scandolob3ba79c2021-03-01 10:53:23 -0800138 logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness, "service": serviceName})
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400139 // there was a state change in Kafka liveness
140 if !liveness {
Matteo Scandolob3ba79c2021-03-01 10:53:23 -0800141 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
142 logger.Infow(ctx, "kafka-manager-thread-set-server-notready", log.Fields{"service": serviceName})
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400143
144 // retry frequently while life is bad
145 timeout = notLiveProbeInterval
146 } else {
Matteo Scandolob3ba79c2021-03-01 10:53:23 -0800147 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
148 logger.Infow(ctx, "kafka-manager-thread-set-server-ready", log.Fields{"service": serviceName})
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400149
150 // retry infrequently while life is good
151 timeout = liveProbeInterval
152 }
153 if !timeoutTimer.Stop() {
154 <-timeoutTimer.C
155 }
156 case <-timeoutTimer.C:
Matteo Scandolob3ba79c2021-03-01 10:53:23 -0800157 logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400158 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
159 // the liveness probe may wait (and block) writing to our channel.
160 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000161 err := kmp.SendLiveness(ctx)
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400162 if err != nil {
163 // Catch possible error case if sending liveness after Sarama has been stopped.
Matteo Scandolob3ba79c2021-03-01 10:53:23 -0800164 logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400165 }
166 }()
167 case <-ctx.Done():
168 return // just exit
169 }
170 }
171}
172
Rohan Agrawal31f21802020-06-12 05:38:46 +0000173func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic string) {
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400174 requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
175
176 // Register the broadcast topic to handle any core-bound broadcast requests
Rohan Agrawal31f21802020-06-12 05:38:46 +0000177 if err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
178 logger.Fatalw(ctx, "Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400179 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000180
181 logger.Info(ctx, "request-handler-registered")
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400182}