/*
* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors

* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "strings"
22 "time"
23
24 "github.com/golang/protobuf/ptypes/any"
25 "github.com/opencord/voltha-lib-go/v7/pkg/log"
26 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
27)
28
// Constants describing how a device ID is embedded in a topic name.
const (
	// TopicSeparator is the string that separates the leading part of a
	// topic name from a trailing device ID.
	TopicSeparator = "_"
	// DeviceIdLength is the exact length, in bytes, that the part of a topic
	// name after the last TopicSeparator must have to be treated as a
	// device ID.
	DeviceIdLength = 24
)
33
// Topic is a topic definition. It currently carries only a name and may be
// augmented with additional attributes eventually.
type Topic struct {
	// Name is the name of the topic. It must start with a letter,
	// and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
	// underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
	// signs (`%`).
	Name string
}
42
// KVArg is a key/value pair: a string key together with a value of
// arbitrary type.
type KVArg struct {
	// Key is the name of the argument.
	Key string
	// Value is the argument's value. It is spelled interface{} rather than
	// any because this file imports a package named "any", which shadows the
	// predeclared identifier.
	Value interface{}
}
47
// RpcMType is the message-type tag carried in the MType field of an
// RpcResponse.
type RpcMType int

// Values of RpcMType. They are numbered consecutively from zero in
// declaration order (via iota), so reordering or inserting entries changes
// their numeric values.
const (
	RpcFormattingError RpcMType = iota
	RpcSent
	RpcReply
	RpcTimeout
	RpcTransportError
	RpcSystemClosing
)
58
59type RpcResponse struct {
60 MType RpcMType
61 Err error
62 Reply *any.Any
63}
64
65func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse {
66 return &RpcResponse{
67 MType: messageType,
68 Err: err,
69 Reply: body,
70 }
71}
72
73// TODO: Remove and provide better may to get the device id
74// GetDeviceIdFromTopic extract the deviceId from the topic name. The topic name is formatted either as:
Mahir Gunyel00554c62023-07-24 09:44:52 +030075//
76// <any string> or <any string>_<deviceId>. The device Id is 24 characters long.
khenaidoo7d3c5582021-08-11 18:09:44 -040077func GetDeviceIdFromTopic(topic Topic) string {
78 pos := strings.LastIndex(topic.Name, TopicSeparator)
79 if pos == -1 {
80 return ""
81 }
82 adjustedPos := pos + len(TopicSeparator)
83 if adjustedPos >= len(topic.Name) {
84 return ""
85 }
86 deviceId := topic.Name[adjustedPos:len(topic.Name)]
87 if len(deviceId) != DeviceIdLength {
88 return ""
89 }
90 return deviceId
91}
92
93// WaitUntilKafkaConnectionIsUp waits until the kafka client can establish a connection to the kafka broker or until the
94// context times out.
95func StartAndWaitUntilKafkaConnectionIsUp(ctx context.Context, kClient Client, connectionRetryInterval time.Duration, serviceName string) error {
96 if kClient == nil {
97 return errors.New("kafka-client-is-nil")
98 }
99 for {
100 if err := kClient.Start(ctx); err != nil {
101 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
102 logger.Warnw(ctx, "kafka-connection-down", log.Fields{"error": err})
103 select {
104 case <-time.After(connectionRetryInterval):
105 continue
106 case <-ctx.Done():
107 return ctx.Err()
108 }
109 }
110 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
111 logger.Info(ctx, "kafka-connection-up")
112 break
113 }
114 return nil
115}
116
Mahir Gunyel00554c62023-07-24 09:44:52 +0300117/*
118*
khenaidoo7d3c5582021-08-11 18:09:44 -0400119MonitorKafkaReadiness checks the liveliness and readiness of the kafka service
120and update the status in the probe.
121*/
122func MonitorKafkaReadiness(ctx context.Context,
123 kClient Client,
124 liveProbeInterval, notLiveProbeInterval time.Duration,
125 serviceName string) {
126
127 if kClient == nil {
128 logger.Fatal(ctx, "kafka-client-is-nil")
129 }
130
131 logger.Infow(ctx, "monitor-kafka-readiness", log.Fields{"service": serviceName})
132
133 livelinessChannel := kClient.EnableLivenessChannel(ctx, true)
134 healthinessChannel := kClient.EnableHealthinessChannel(ctx, true)
135 timeout := liveProbeInterval
136 failed := false
137 for {
138 timeoutTimer := time.NewTimer(timeout)
139
140 select {
141 case healthiness := <-healthinessChannel:
142 if !healthiness {
143 // This will eventually cause K8s to restart the container, and will do
144 // so in a way that allows cleanup to continue, rather than an immediate
145 // panic and exit here.
146 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusFailed)
147 logger.Infow(ctx, "kafka-not-healthy", log.Fields{"service": serviceName})
148 failed = true
149 }
150 // Check if the timer has expired or not
151 if !timeoutTimer.Stop() {
152 <-timeoutTimer.C
153 }
154 case liveliness := <-livelinessChannel:
155 if failed {
156 // Failures of the message bus are permanent and can't ever be recovered from,
157 // so make sure we never inadvertently reset a failed state back to unready.
158 } else if !liveliness {
159 // kafka not reachable or down, updating the status to not ready state
160 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
161 logger.Infow(ctx, "kafka-not-live", log.Fields{"service": serviceName})
162 timeout = notLiveProbeInterval
163 } else {
164 // kafka is reachable , updating the status to running state
165 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
166 timeout = liveProbeInterval
167 }
168 // Check if the timer has expired or not
169 if !timeoutTimer.Stop() {
170 <-timeoutTimer.C
171 }
172 case <-timeoutTimer.C:
173 logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
174 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
175 // the liveness probe may wait (and block) writing to our channel.
176 go func() {
177 err := kClient.SendLiveness(ctx)
178 if err != nil {
179 // Catch possible error case if sending liveness after Sarama has been stopped.
180 logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
181 }
182 }()
183 case <-ctx.Done():
184 return // just exit
185 }
186 }
187}