blob: 608361b0a5ec2d842cf9482aada2b634bafa7187 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "errors"
21 "strings"
22 "time"
23
24 "github.com/golang/protobuf/ptypes/any"
25 "github.com/opencord/voltha-lib-go/v7/pkg/log"
26 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
27)
28
const (
	// TopicSeparator separates the base name of a topic from an optional
	// trailing device id (see GetDeviceIdFromTopic).
	TopicSeparator = "_"
	// DeviceIdLength is the number of bytes a device id suffix must have to be
	// recognised by GetDeviceIdFromTopic.
	DeviceIdLength = 24
)
33
// Topic describes a topic on the message bus. It may be augmented with
// additional attributes eventually.
type Topic struct {
	// Name is the name of the topic. It must start with a letter and may
	// contain only letters (`[A-Za-z]`), digits (`[0-9]`), dashes (`-`),
	// underscores (`_`), periods (`.`), tildes (`~`), plus signs (`+`) or
	// percent signs (`%`).
	Name string
}
42
// KVArg is a single key/value pair whose value may be of any type.
type KVArg struct {
	Key   string      // name of the argument
	Value interface{} // value of the argument; may hold any type
}
47
// RpcMType classifies the outcome carried in the MType field of an RpcResponse.
type RpcMType int

// RpcMType values. They are numbered consecutively from zero in the order in
// which they are declared here, so reordering them changes their numeric values.
const (
	RpcFormattingError RpcMType = iota // 0
	RpcSent                            // 1
	RpcReply                           // 2
	RpcTimeout                         // 3
	RpcTransportError                  // 4
	RpcSystemClosing                   // 5
)
58
59type RpcResponse struct {
60 MType RpcMType
61 Err error
62 Reply *any.Any
63}
64
65func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse {
66 return &RpcResponse{
67 MType: messageType,
68 Err: err,
69 Reply: body,
70 }
71}
72
73// TODO: Remove and provide better may to get the device id
74// GetDeviceIdFromTopic extract the deviceId from the topic name. The topic name is formatted either as:
75// <any string> or <any string>_<deviceId>. The device Id is 24 characters long.
76func GetDeviceIdFromTopic(topic Topic) string {
77 pos := strings.LastIndex(topic.Name, TopicSeparator)
78 if pos == -1 {
79 return ""
80 }
81 adjustedPos := pos + len(TopicSeparator)
82 if adjustedPos >= len(topic.Name) {
83 return ""
84 }
85 deviceId := topic.Name[adjustedPos:len(topic.Name)]
86 if len(deviceId) != DeviceIdLength {
87 return ""
88 }
89 return deviceId
90}
91
92// WaitUntilKafkaConnectionIsUp waits until the kafka client can establish a connection to the kafka broker or until the
93// context times out.
94func StartAndWaitUntilKafkaConnectionIsUp(ctx context.Context, kClient Client, connectionRetryInterval time.Duration, serviceName string) error {
95 if kClient == nil {
96 return errors.New("kafka-client-is-nil")
97 }
98 for {
99 if err := kClient.Start(ctx); err != nil {
100 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
101 logger.Warnw(ctx, "kafka-connection-down", log.Fields{"error": err})
102 select {
103 case <-time.After(connectionRetryInterval):
104 continue
105 case <-ctx.Done():
106 return ctx.Err()
107 }
108 }
109 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
110 logger.Info(ctx, "kafka-connection-up")
111 break
112 }
113 return nil
114}
115
116/**
117MonitorKafkaReadiness checks the liveliness and readiness of the kafka service
118and update the status in the probe.
119*/
120func MonitorKafkaReadiness(ctx context.Context,
121 kClient Client,
122 liveProbeInterval, notLiveProbeInterval time.Duration,
123 serviceName string) {
124
125 if kClient == nil {
126 logger.Fatal(ctx, "kafka-client-is-nil")
127 }
128
129 logger.Infow(ctx, "monitor-kafka-readiness", log.Fields{"service": serviceName})
130
131 livelinessChannel := kClient.EnableLivenessChannel(ctx, true)
132 healthinessChannel := kClient.EnableHealthinessChannel(ctx, true)
133 timeout := liveProbeInterval
134 failed := false
135 for {
136 timeoutTimer := time.NewTimer(timeout)
137
138 select {
139 case healthiness := <-healthinessChannel:
140 if !healthiness {
141 // This will eventually cause K8s to restart the container, and will do
142 // so in a way that allows cleanup to continue, rather than an immediate
143 // panic and exit here.
144 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusFailed)
145 logger.Infow(ctx, "kafka-not-healthy", log.Fields{"service": serviceName})
146 failed = true
147 }
148 // Check if the timer has expired or not
149 if !timeoutTimer.Stop() {
150 <-timeoutTimer.C
151 }
152 case liveliness := <-livelinessChannel:
153 if failed {
154 // Failures of the message bus are permanent and can't ever be recovered from,
155 // so make sure we never inadvertently reset a failed state back to unready.
156 } else if !liveliness {
157 // kafka not reachable or down, updating the status to not ready state
158 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
159 logger.Infow(ctx, "kafka-not-live", log.Fields{"service": serviceName})
160 timeout = notLiveProbeInterval
161 } else {
162 // kafka is reachable , updating the status to running state
163 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
164 timeout = liveProbeInterval
165 }
166 // Check if the timer has expired or not
167 if !timeoutTimer.Stop() {
168 <-timeoutTimer.C
169 }
170 case <-timeoutTimer.C:
171 logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
172 // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
173 // the liveness probe may wait (and block) writing to our channel.
174 go func() {
175 err := kClient.SendLiveness(ctx)
176 if err != nil {
177 // Catch possible error case if sending liveness after Sarama has been stopped.
178 logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
179 }
180 }()
181 case <-ctx.Done():
182 return // just exit
183 }
184 }
185}