Scott Baker | 2c1c482 | 2019-10-16 11:02:41 -0700 | [diff] [blame] | 1 | /* |
Joey Armstrong | 9cdee9f | 2024-01-03 04:56:14 -0500 | [diff] [blame] | 2 | * Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors |
Scott Baker | 2c1c482 | 2019-10-16 11:02:41 -0700 | [diff] [blame] | 3 | |
Joey Armstrong | 7f8436c | 2023-07-09 20:23:27 -0400 | [diff] [blame] | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
Scott Baker | 2c1c482 | 2019-10-16 11:02:41 -0700 | [diff] [blame] | 7 | |
Joey Armstrong | 7f8436c | 2023-07-09 20:23:27 -0400 | [diff] [blame] | 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
Scott Baker | 2c1c482 | 2019-10-16 11:02:41 -0700 | [diff] [blame] | 9 | |
Joey Armstrong | 7f8436c | 2023-07-09 20:23:27 -0400 | [diff] [blame] | 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
Scott Baker | 2c1c482 | 2019-10-16 11:02:41 -0700 | [diff] [blame] | 15 | */ |
| 16 | package kafka |
| 17 | |
Matteo Scandolo | ed12882 | 2020-02-10 15:52:35 -0800 | [diff] [blame] | 18 | import ( |
khenaidoo | 2672188 | 2021-08-11 17:42:52 -0400 | [diff] [blame] | 19 | "context" |
| 20 | "errors" |
Matteo Scandolo | ed12882 | 2020-02-10 15:52:35 -0800 | [diff] [blame] | 21 | "strings" |
khenaidoo | 2672188 | 2021-08-11 17:42:52 -0400 | [diff] [blame] | 22 | "time" |
| 23 | |
| 24 | "github.com/golang/protobuf/ptypes/any" |
| 25 | "github.com/opencord/voltha-lib-go/v7/pkg/log" |
| 26 | "github.com/opencord/voltha-lib-go/v7/pkg/probe" |
Matteo Scandolo | ed12882 | 2020-02-10 15:52:35 -0800 | [diff] [blame] | 27 | ) |
Scott Baker | 2c1c482 | 2019-10-16 11:02:41 -0700 | [diff] [blame] | 28 | |
| 29 | const ( |
| 30 | TopicSeparator = "_" |
| 31 | DeviceIdLength = 24 |
| 32 | ) |
| 33 | |
| 34 | // A Topic definition - may be augmented with additional attributes eventually |
| 35 | type Topic struct { |
| 36 | // The name of the topic. It must start with a letter, |
| 37 | // and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`), |
| 38 | // underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent |
| 39 | // signs (`%`). |
| 40 | Name string |
| 41 | } |
| 42 | |
| 43 | type KVArg struct { |
| 44 | Key string |
| 45 | Value interface{} |
| 46 | } |
| 47 | |
Matteo Scandolo | ed12882 | 2020-02-10 15:52:35 -0800 | [diff] [blame] | 48 | type RpcMType int |
| 49 | |
| 50 | const ( |
| 51 | RpcFormattingError RpcMType = iota |
| 52 | RpcSent |
| 53 | RpcReply |
| 54 | RpcTimeout |
| 55 | RpcTransportError |
| 56 | RpcSystemClosing |
| 57 | ) |
| 58 | |
| 59 | type RpcResponse struct { |
| 60 | MType RpcMType |
| 61 | Err error |
| 62 | Reply *any.Any |
| 63 | } |
| 64 | |
| 65 | func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse { |
| 66 | return &RpcResponse{ |
| 67 | MType: messageType, |
| 68 | Err: err, |
| 69 | Reply: body, |
| 70 | } |
| 71 | } |
| 72 | |
Scott Baker | 2c1c482 | 2019-10-16 11:02:41 -0700 | [diff] [blame] | 73 | // TODO: Remove and provide better may to get the device id |
| 74 | // GetDeviceIdFromTopic extract the deviceId from the topic name. The topic name is formatted either as: |
Joey Armstrong | 7f8436c | 2023-07-09 20:23:27 -0400 | [diff] [blame] | 75 | // |
| 76 | // <any string> or <any string>_<deviceId>. The device Id is 24 characters long. |
Scott Baker | 2c1c482 | 2019-10-16 11:02:41 -0700 | [diff] [blame] | 77 | func GetDeviceIdFromTopic(topic Topic) string { |
| 78 | pos := strings.LastIndex(topic.Name, TopicSeparator) |
| 79 | if pos == -1 { |
| 80 | return "" |
| 81 | } |
| 82 | adjustedPos := pos + len(TopicSeparator) |
| 83 | if adjustedPos >= len(topic.Name) { |
| 84 | return "" |
| 85 | } |
| 86 | deviceId := topic.Name[adjustedPos:len(topic.Name)] |
| 87 | if len(deviceId) != DeviceIdLength { |
| 88 | return "" |
| 89 | } |
| 90 | return deviceId |
| 91 | } |
khenaidoo | 2672188 | 2021-08-11 17:42:52 -0400 | [diff] [blame] | 92 | |
| 93 | // WaitUntilKafkaConnectionIsUp waits until the kafka client can establish a connection to the kafka broker or until the |
| 94 | // context times out. |
| 95 | func StartAndWaitUntilKafkaConnectionIsUp(ctx context.Context, kClient Client, connectionRetryInterval time.Duration, serviceName string) error { |
| 96 | if kClient == nil { |
| 97 | return errors.New("kafka-client-is-nil") |
| 98 | } |
| 99 | for { |
| 100 | if err := kClient.Start(ctx); err != nil { |
| 101 | probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady) |
| 102 | logger.Warnw(ctx, "kafka-connection-down", log.Fields{"error": err}) |
| 103 | select { |
| 104 | case <-time.After(connectionRetryInterval): |
| 105 | continue |
| 106 | case <-ctx.Done(): |
| 107 | return ctx.Err() |
| 108 | } |
| 109 | } |
| 110 | probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning) |
| 111 | logger.Info(ctx, "kafka-connection-up") |
| 112 | break |
| 113 | } |
| 114 | return nil |
| 115 | } |
| 116 | |
Joey Armstrong | 7f8436c | 2023-07-09 20:23:27 -0400 | [diff] [blame] | 117 | /* |
| 118 | * |
khenaidoo | 2672188 | 2021-08-11 17:42:52 -0400 | [diff] [blame] | 119 | MonitorKafkaReadiness checks the liveliness and readiness of the kafka service |
| 120 | and update the status in the probe. |
| 121 | */ |
| 122 | func MonitorKafkaReadiness(ctx context.Context, |
| 123 | kClient Client, |
| 124 | liveProbeInterval, notLiveProbeInterval time.Duration, |
| 125 | serviceName string) { |
| 126 | |
| 127 | if kClient == nil { |
| 128 | logger.Fatal(ctx, "kafka-client-is-nil") |
| 129 | } |
| 130 | |
| 131 | logger.Infow(ctx, "monitor-kafka-readiness", log.Fields{"service": serviceName}) |
| 132 | |
| 133 | livelinessChannel := kClient.EnableLivenessChannel(ctx, true) |
| 134 | healthinessChannel := kClient.EnableHealthinessChannel(ctx, true) |
| 135 | timeout := liveProbeInterval |
| 136 | failed := false |
| 137 | for { |
| 138 | timeoutTimer := time.NewTimer(timeout) |
| 139 | |
| 140 | select { |
| 141 | case healthiness := <-healthinessChannel: |
| 142 | if !healthiness { |
| 143 | // This will eventually cause K8s to restart the container, and will do |
| 144 | // so in a way that allows cleanup to continue, rather than an immediate |
| 145 | // panic and exit here. |
| 146 | probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusFailed) |
| 147 | logger.Infow(ctx, "kafka-not-healthy", log.Fields{"service": serviceName}) |
| 148 | failed = true |
| 149 | } |
| 150 | // Check if the timer has expired or not |
| 151 | if !timeoutTimer.Stop() { |
| 152 | <-timeoutTimer.C |
| 153 | } |
| 154 | case liveliness := <-livelinessChannel: |
| 155 | if failed { |
| 156 | // Failures of the message bus are permanent and can't ever be recovered from, |
| 157 | // so make sure we never inadvertently reset a failed state back to unready. |
| 158 | } else if !liveliness { |
| 159 | // kafka not reachable or down, updating the status to not ready state |
| 160 | probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady) |
| 161 | logger.Infow(ctx, "kafka-not-live", log.Fields{"service": serviceName}) |
| 162 | timeout = notLiveProbeInterval |
| 163 | } else { |
| 164 | // kafka is reachable , updating the status to running state |
| 165 | probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning) |
| 166 | timeout = liveProbeInterval |
| 167 | } |
| 168 | // Check if the timer has expired or not |
| 169 | if !timeoutTimer.Stop() { |
| 170 | <-timeoutTimer.C |
| 171 | } |
| 172 | case <-timeoutTimer.C: |
| 173 | logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName}) |
| 174 | // send the liveness probe in a goroutine; we don't want to deadlock ourselves as |
| 175 | // the liveness probe may wait (and block) writing to our channel. |
| 176 | go func() { |
| 177 | err := kClient.SendLiveness(ctx) |
| 178 | if err != nil { |
| 179 | // Catch possible error case if sending liveness after Sarama has been stopped. |
| 180 | logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName}) |
| 181 | } |
| 182 | }() |
| 183 | case <-ctx.Done(): |
| 184 | return // just exit |
| 185 | } |
| 186 | } |
| 187 | } |