/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package core

import (
	"context"
	"time"

	"github.com/opencord/voltha-go/rw_core/core/adapter"
	"github.com/opencord/voltha-go/rw_core/core/api"
	"github.com/opencord/voltha-go/rw_core/core/device"
	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
	"github.com/opencord/voltha-lib-go/v3/pkg/log"
	"github.com/opencord/voltha-lib-go/v3/pkg/probe"
)

// startKafkInterContainerProxy is responsible for starting the Kafka Interadapter Proxy
func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, host string, port int, coreTopic, affinityRouterTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
	logger.Infow("initialize-kafka-manager", log.Fields{"host": host, "port": port, "topic": coreTopic})

	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)

	// create the kafka RPC proxy
	kmp := kafka.NewInterContainerProxy(
		ctx,
		kafka.InterContainerHost(ctx, host),
		kafka.InterContainerPort(ctx, port),
		kafka.MsgClient(ctx, kafkaClient),
		kafka.DefaultTopic(ctx, &kafka.Topic{Name: coreTopic}),
		kafka.DeviceDiscoveryTopic(ctx, &kafka.Topic{Name: affinityRouterTopic}))

	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)

	// wait for connectivity
	logger.Infow("starting-kafka-manager", log.Fields{"host": host,
		"port": port, "topic": coreTopic})

	for {
		// If we haven't started yet, then try to start
		logger.Infow("starting-kafka-proxy", log.Fields{})
		if err := kmp.Start(ctx); err != nil {
			// We failed to start. Delay and then try again later.
			// Don't worry about liveness, as we can't be live until we've started.
			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
			logger.Infow("error-starting-kafka-messaging-proxy", log.Fields{"error": err})
			select {
			case <-time.After(connectionRetryInterval):
			case <-ctx.Done():
				return nil, ctx.Err()
			}
			continue
		}
		// We started. We only need to do this once.
		// Next we'll fall through and start checking liveness.
		logger.Infow("started-kafka-proxy", log.Fields{})
		break
	}
	return kmp, nil
}

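// Illustrative sketch (not part of the original file): a caller such as the core start-up
// sequence could use the helper above roughly as follows. The host, port, topic names and
// retry interval shown here are hypothetical placeholders, not values taken from this repo.
//
//	kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, "voltha-kafka", 9092,
//		"rwcore", "affinityRouter", 5*time.Second)
//	if err != nil {
//		// the only error returned is ctx.Err(), i.e. the context was cancelled before Kafka came up
//		return err
//	}
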
/*
 * monitorKafkaLiveness is responsible for monitoring the Kafka Interadapter Proxy connectivity state
 *
 * Any producer that fails to send will cause KafkaInterContainerProxy to
 * post a false event on its liveness channel. Any producer that succeeds in sending
 * will cause KafkaInterContainerProxy to post a true event on its liveness
 * channel. Group receivers also update liveness state, and a receiver will typically
 * indicate a loss of liveness within 3-5 seconds of Kafka going down. Receivers
 * only indicate restoration of liveness if a message is received. During normal
 * operation, messages will be routinely produced and received, automatically
 * indicating liveness state. These routine liveness indications are rate-limited
 * inside sarama_client.
 *
 * This thread monitors the status of KafkaInterContainerProxy's liveness and pushes
 * that state to the core's readiness probes. If no liveness event has been seen
 * within a timeout, then the thread will make an attempt to produce a "liveness"
 * message, which will in turn trigger a liveness event on the liveness channel, true
 * or false depending on whether the attempt succeeded.
 *
 * The gRPC server in turn monitors the state of the readiness probe and will
 * start issuing UNAVAILABLE responses while the probe is not ready.
 *
 * liveProbeInterval -- interval between liveness checks when in a live state
 * notLiveProbeInterval -- interval between liveness checks when in a notLive state
 *
 * liveProbeInterval and notLiveProbeInterval can be configured separately,
 * though the current default is that both are set to 60 seconds.
 */
func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
	logger.Info("started-kafka-message-proxy")

	livenessChannel := kmp.EnableLivenessChannel(ctx, true)

	logger.Info("enabled-kafka-liveness-channel")

	timeout := liveProbeInterval
	for {
		timeoutTimer := time.NewTimer(timeout)
		select {
		case liveness := <-livenessChannel:
			logger.Infow("kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
			// there was a state change in Kafka liveness
			if !liveness {
				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
				logger.Info("kafka-manager-thread-set-server-notready")

				// retry frequently while life is bad
				timeout = notLiveProbeInterval
			} else {
				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
				logger.Info("kafka-manager-thread-set-server-ready")

				// retry infrequently while life is good
				timeout = liveProbeInterval
			}
			if !timeoutTimer.Stop() {
				<-timeoutTimer.C
			}
		case <-timeoutTimer.C:
			logger.Info("kafka-proxy-liveness-recheck")
			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
			// the liveness probe may wait (and block) writing to our channel.
			go func() {
				err := kmp.SendLiveness(ctx)
				if err != nil {
					// Catch possible error case if sending liveness after Sarama has been stopped.
					logger.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
				}
			}()
		case <-ctx.Done():
			return // just exit
		}
	}
}
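
// Illustrative sketch (not part of the original file): monitorKafkaLiveness runs until its
// context is cancelled, so a caller would typically launch it in its own goroutine once the
// proxy has started. The 60-second intervals below are hypothetical and would normally come
// from configuration (the comment above notes 60 seconds as the current default for both).
//
//	go monitorKafkaLiveness(ctx, kmp, 60*time.Second, 60*time.Second)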
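// registerAdapterRequestHandlers subscribes the core to its adapter-facing Kafka topics: the
// broadcast core topic is bound to a new AdapterRequestHandlerProxy (backed by the device and
// adapter managers), and the core-pair topic is bound to the proxy's default request handler.
// A failure of either subscription is fatal.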
func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic, corePairTopic string) {
	requestProxy := api.NewAdapterRequestHandlerProxy(ctx, dMgr, aMgr)

	// Register the broadcast topic to handle any core-bound broadcast requests
	if err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
		logger.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic, "error": err})
	}

	// Register the core-pair topic to handle core-bound requests destined to the core pair
	if err := kmp.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: corePairTopic}, kafka.OffsetNewest); err != nil {
		logger.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": corePairTopic, "error": err})
	}

	logger.Info("request-handler-registered")
}