blob: 52cf88b9d4db79fe4238012ecc6145cffea7fe79 [file] [log] [blame]
Prince Pereirac1c21d62021-04-22 08:38:15 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17// Package db holds utils for datastore implementation
18package db
19
20import (
21 "context"
22 "errors"
23 "time"
24
25 "github.com/opencord/voltha-lib-go/v4/pkg/db"
26 "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
27
28 "github.com/opencord/voltha-lib-go/v4/pkg/probe"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
31
32 "github.com/opencord/opendevice-manager/pkg/config"
33
34 "github.com/opencord/voltha-lib-go/v4/pkg/log"
35)
36
37var kvClient *KvStoreClient
38
39// KvStoreClient holds the KVStore info
40type KvStoreClient struct {
41 client kvstore.Client
42}
43
44// logger represents the log object
45var logger log.CLogger
46
47// init function for the package
48func init() {
49 logger = config.Initlog()
50}
51
52// NewKVClient function initialises and connects to KVStore
53func NewKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
54 ctx1, cancel := context.WithTimeout(ctx, timeout)
55 logger.Infow(ctx, "kv-store-type", log.Fields{"store": config.KVStore})
56 switch storeType {
57 case "etcd":
58 etcdClient, err := kvstore.NewEtcdClient(ctx1, address, timeout, log.FatalLevel)
59 defer cancel()
60 if !etcdClient.IsConnectionUp(ctx1) {
61 logger.Errorw(ctx, "etcd-server-unreachable", log.Fields{"address": address})
62 return nil, errors.New("etcd client unreachable")
63 }
64 kvClient = new(KvStoreClient)
65 kvClient.client = etcdClient
66 return etcdClient, err
67 }
68 return nil, errors.New("unsupported-kv-store")
69}
70
71// StopKVClient function disconnects connects from KVStore
72func StopKVClient(ctx context.Context, kvClient kvstore.Client) {
73
74 // Release all reservations
75 if err := kvClient.ReleaseAllReservations(ctx); err != nil {
76 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
77 }
78 // Close the DB connection
79 kvClient.Close(ctx)
80}
81
82// WaitUntilKVStoreReachableOrMaxTries will wait until it can connect to a KV store or until maxtries has been reached
83func WaitUntilKVStoreReachableOrMaxTries(ctx context.Context, kvClient kvstore.Client, maxRetries int, retryInterval time.Duration) error {
84 logger.Infow(ctx, "verifying-KV-store-connectivity", log.Fields{"retries": maxRetries, "retryInterval": retryInterval})
85 count := 0
86 for {
87 if !kvClient.IsConnectionUp(ctx) {
88 logger.Info(ctx, "KV-store-unreachable")
89 if maxRetries != -1 {
90 if count >= maxRetries {
91 return status.Error(codes.Unavailable, "kv store unreachable")
92 }
93 }
94 count++
95
96 // Take a nap before retrying
97 select {
98 case <-ctx.Done():
99 //ctx canceled
100 return ctx.Err()
101 case <-time.After(retryInterval):
102 }
103 logger.Infow(ctx, "retry-KV-store-connectivity", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
104 } else {
105 break
106 }
107 }
108 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
109 logger.Info(ctx, "KV-store-reachable")
110 return nil
111}
112
113/*
114 * Thread to monitor kvstore Liveness (connection status)
115 *
116 * This function constantly monitors Liveness State of kvstore as reported
117 * periodically by backend and updates the Status of kv-store service registered
118 * with rw_core probe.
119 *
120 * If no liveness event has been seen within a timeout, then the thread will
121 * perform a "liveness" check attempt, which will in turn trigger a liveness event on
122 * the liveness channel, true or false depending on whether the attempt succeeded.
123 *
124 * The gRPC server in turn monitors the state of the readiness probe and will
125 * start issuing UNAVAILABLE response while the probe is not ready.
126 */
127// MonitorKVStoreLiveness abc
128func MonitorKVStoreLiveness(ctx context.Context, backend *db.Backend, liveProbeInterval, notLiveProbeInterval time.Duration) {
129 logger.Info(ctx, "start-monitoring-kvstore-liveness")
130
131 // Instruct backend to create Liveness channel for transporting state updates
132 livenessChannel := backend.EnableLivenessChannel(ctx)
133
134 logger.Debug(ctx, "enabled-kvstore-liveness-channel")
135
136 // Default state for kvstore is alive for rw_core
137 timeout := liveProbeInterval
138loop:
139 for {
140 timeoutTimer := time.NewTimer(timeout)
141 select {
142
143 case liveness := <-livenessChannel:
144 logger.Debugw(ctx, "received-liveness-change-notification", log.Fields{"liveness": liveness})
145
146 if !liveness {
147 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
148 logger.Info(ctx, "kvstore-set-server-notready")
149
150 timeout = notLiveProbeInterval
151
152 } else {
153 probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
154 logger.Info(ctx, "kvstore-set-server-ready")
155
156 timeout = liveProbeInterval
157 }
158
159 if !timeoutTimer.Stop() {
160 <-timeoutTimer.C
161 }
162
163 case <-ctx.Done():
164 break loop
165
166 case <-timeoutTimer.C:
167 logger.Info(ctx, "kvstore-perform-liveness-check-on-timeout")
168
169 // Trigger Liveness check if no liveness update received within the timeout period.
170 // The Liveness check will push Live state to same channel which this routine is
171 // reading and processing. This, do it asynchronously to avoid blocking for
172 // backend response and avoid any possibility of deadlock
173 go backend.PerformLivenessCheck(ctx)
174 }
175 }
176}