blob: 26cdba370c0952861da758dca76d3a3a70def5e9 [file] [log] [blame]
Prince Pereirac1c21d62021-04-22 08:38:15 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17// Package db holds utils for datastore implementation
18package db
19
20import (
21 "context"
22 "errors"
23 "time"
24
25 "github.com/opencord/voltha-lib-go/v4/pkg/db"
26 "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
27
28 "github.com/opencord/voltha-lib-go/v4/pkg/probe"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
31
32 "github.com/opencord/opendevice-manager/pkg/config"
33
34 "github.com/opencord/voltha-lib-go/v4/pkg/log"
35)
36
Prince Pereira32d40f22021-05-27 06:14:29 +000037var kvClient kvstore.Client
Prince Pereirac1c21d62021-04-22 08:38:15 +000038
39// logger represents the log object
40var logger log.CLogger
41
42// init function for the package
43func init() {
44 logger = config.Initlog()
45}
46
47// NewKVClient function initialises and connects to KVStore
48func NewKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
49 ctx1, cancel := context.WithTimeout(ctx, timeout)
50 logger.Infow(ctx, "kv-store-type", log.Fields{"store": config.KVStore})
51 switch storeType {
52 case "etcd":
53 etcdClient, err := kvstore.NewEtcdClient(ctx1, address, timeout, log.FatalLevel)
54 defer cancel()
55 if !etcdClient.IsConnectionUp(ctx1) {
56 logger.Errorw(ctx, "etcd-server-unreachable", log.Fields{"address": address})
57 return nil, errors.New("etcd client unreachable")
58 }
Prince Pereira32d40f22021-05-27 06:14:29 +000059 kvClient = etcdClient
Prince Pereirac1c21d62021-04-22 08:38:15 +000060 return etcdClient, err
61 }
62 return nil, errors.New("unsupported-kv-store")
63}
64
65// StopKVClient function disconnects connects from KVStore
66func StopKVClient(ctx context.Context, kvClient kvstore.Client) {
67
68 // Release all reservations
69 if err := kvClient.ReleaseAllReservations(ctx); err != nil {
70 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
71 }
72 // Close the DB connection
73 kvClient.Close(ctx)
74}
75
76// WaitUntilKVStoreReachableOrMaxTries will wait until it can connect to a KV store or until maxtries has been reached
77func WaitUntilKVStoreReachableOrMaxTries(ctx context.Context, kvClient kvstore.Client, maxRetries int, retryInterval time.Duration) error {
78 logger.Infow(ctx, "verifying-KV-store-connectivity", log.Fields{"retries": maxRetries, "retryInterval": retryInterval})
79 count := 0
80 for {
81 if !kvClient.IsConnectionUp(ctx) {
82 logger.Info(ctx, "KV-store-unreachable")
83 if maxRetries != -1 {
84 if count >= maxRetries {
85 return status.Error(codes.Unavailable, "kv store unreachable")
86 }
87 }
88 count++
89
90 // Take a nap before retrying
91 select {
92 case <-ctx.Done():
93 //ctx canceled
94 return ctx.Err()
95 case <-time.After(retryInterval):
96 }
97 logger.Infow(ctx, "retry-KV-store-connectivity", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
98 } else {
99 break
100 }
101 }
102 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
103 logger.Info(ctx, "KV-store-reachable")
104 return nil
105}
106
107/*
108 * Thread to monitor kvstore Liveness (connection status)
109 *
110 * This function constantly monitors Liveness State of kvstore as reported
111 * periodically by backend and updates the Status of kv-store service registered
112 * with rw_core probe.
113 *
114 * If no liveness event has been seen within a timeout, then the thread will
115 * perform a "liveness" check attempt, which will in turn trigger a liveness event on
116 * the liveness channel, true or false depending on whether the attempt succeeded.
117 *
118 * The gRPC server in turn monitors the state of the readiness probe and will
119 * start issuing UNAVAILABLE response while the probe is not ready.
120 */
121// MonitorKVStoreLiveness abc
122func MonitorKVStoreLiveness(ctx context.Context, backend *db.Backend, liveProbeInterval, notLiveProbeInterval time.Duration) {
123 logger.Info(ctx, "start-monitoring-kvstore-liveness")
124
125 // Instruct backend to create Liveness channel for transporting state updates
126 livenessChannel := backend.EnableLivenessChannel(ctx)
127
128 logger.Debug(ctx, "enabled-kvstore-liveness-channel")
129
130 // Default state for kvstore is alive for rw_core
131 timeout := liveProbeInterval
132loop:
133 for {
134 timeoutTimer := time.NewTimer(timeout)
135 select {
136
137 case liveness := <-livenessChannel:
138 logger.Debugw(ctx, "received-liveness-change-notification", log.Fields{"liveness": liveness})
139
140 if !liveness {
141 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
142 logger.Info(ctx, "kvstore-set-server-notready")
143
144 timeout = notLiveProbeInterval
145
146 } else {
147 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
148 logger.Info(ctx, "kvstore-set-server-ready")
149
150 timeout = liveProbeInterval
151 }
152
153 if !timeoutTimer.Stop() {
154 <-timeoutTimer.C
155 }
156
157 case <-ctx.Done():
158 break loop
159
160 case <-timeoutTimer.C:
161 logger.Info(ctx, "kvstore-perform-liveness-check-on-timeout")
162
163 // Trigger Liveness check if no liveness update received within the timeout period.
164 // The Liveness check will push Live state to same channel which this routine is
165 // reading and processing. This, do it asynchronously to avoid blocking for
166 // backend response and avoid any possibility of deadlock
167 go backend.PerformLivenessCheck(ctx)
168 }
169 }
170}