blob: f966efb881a035aee2d54f89f875de859bed4fa6 [file] [log] [blame]
khenaidoo6f415b22021-06-22 18:08:53 -04001/*
Joey Armstrong7f8436c2023-07-09 20:23:27 -04002* Copyright 2021-2023 Open Networking Foundation (ONF) and the ONF Contributors
khenaidoo6f415b22021-06-22 18:08:53 -04003
Joey Armstrong7f8436c2023-07-09 20:23:27 -04004* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
khenaidoo6f415b22021-06-22 18:08:53 -04007
Joey Armstrong7f8436c2023-07-09 20:23:27 -04008* http://www.apache.org/licenses/LICENSE-2.0
khenaidoo6f415b22021-06-22 18:08:53 -04009
Joey Armstrong7f8436c2023-07-09 20:23:27 -040010* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
khenaidoo6f415b22021-06-22 18:08:53 -040015 */
16package kvstore
17
18import (
19 "container/list"
20 "context"
21 "errors"
khenaidoo6f415b22021-06-22 18:08:53 -040022 "sync"
23 "time"
khenaidoo26721882021-08-11 17:42:52 -040024
25 "github.com/opencord/voltha-lib-go/v7/pkg/log"
26 "go.etcd.io/etcd/clientv3"
khenaidoo6f415b22021-06-22 18:08:53 -040027)
28
29// EtcdClientAllocator represents a generic interface to allocate an Etcd Client
30type EtcdClientAllocator interface {
31 Get(context.Context) (*clientv3.Client, error)
32 Put(*clientv3.Client)
33 Close(ctx context.Context)
34}
35
36// NewRoundRobinEtcdClientAllocator creates a new ETCD Client Allocator using a Round Robin scheme
37func NewRoundRobinEtcdClientAllocator(endpoints []string, timeout time.Duration, capacity, maxUsage int, level log.LogLevel) (EtcdClientAllocator, error) {
38 return &roundRobin{
39 all: make(map[*clientv3.Client]*rrEntry),
40 full: make(map[*clientv3.Client]*rrEntry),
41 waitList: list.New(),
42 max: maxUsage,
43 capacity: capacity,
44 timeout: timeout,
45 endpoints: endpoints,
46 logLevel: level,
47 closingCh: make(chan struct{}, capacity*maxUsage),
48 stopCh: make(chan struct{}),
49 }, nil
50}
51
52type rrEntry struct {
53 client *clientv3.Client
54 count int
55 age time.Time
56}
57
58type roundRobin struct {
59 //block chan struct{}
60 sync.Mutex
61 available []*rrEntry
62 all map[*clientv3.Client]*rrEntry
63 full map[*clientv3.Client]*rrEntry
64 waitList *list.List
65 max int
66 capacity int
67 timeout time.Duration
68 //ageOut time.Duration
69 endpoints []string
70 size int
71 logLevel log.LogLevel
72 closing bool
73 closingCh chan struct{}
74 stopCh chan struct{}
75}
76
77// Get returns an Etcd client. If not is available, it will create one
78// until the maximum allowed capacity. If maximum capacity has been
79// reached then it will wait until s used one is freed.
80func (r *roundRobin) Get(ctx context.Context) (*clientv3.Client, error) {
81 r.Lock()
82
83 if r.closing {
84 r.Unlock()
85 return nil, errors.New("pool-is-closing")
86 }
87
88 // first determine if we need to block, which would mean the
89 // available queue is empty and we are at capacity
90 if len(r.available) == 0 && r.size >= r.capacity {
91
92 // create a channel on which to wait and
93 // add it to the list
94 ch := make(chan struct{})
95 element := r.waitList.PushBack(ch)
96 r.Unlock()
97
98 // block until it is our turn or context
99 // expires or is canceled
100 select {
101 case <-r.stopCh:
102 logger.Info(ctx, "stop-waiting-pool-is-closing")
103 r.waitList.Remove(element)
104 return nil, errors.New("stop-waiting-pool-is-closing")
105 case <-ch:
106 r.waitList.Remove(element)
107 case <-ctx.Done():
108 r.waitList.Remove(element)
109 return nil, ctx.Err()
110 }
111 r.Lock()
112 }
113
114 defer r.Unlock()
115 if len(r.available) > 0 {
116 // pull off back end as it is operationally quicker
117 last := len(r.available) - 1
118 entry := r.available[last]
119 entry.count++
120 if entry.count >= r.max {
121 r.available = r.available[:last]
122 r.full[entry.client] = entry
123 }
124 entry.age = time.Now()
125 return entry.client, nil
126 }
127
128 logConfig := log.ConstructZapConfig(log.JSON, r.logLevel, log.Fields{})
129 // increase capacity
130 client, err := clientv3.New(clientv3.Config{
131 Endpoints: r.endpoints,
132 DialTimeout: r.timeout,
133 LogConfig: &logConfig,
134 })
135 if err != nil {
136 return nil, err
137 }
138 entry := &rrEntry{
139 client: client,
140 count: 1,
141 }
142 r.all[entry.client] = entry
143
144 if r.max > 1 {
145 r.available = append(r.available, entry)
146 } else {
147 r.full[entry.client] = entry
148 }
149 r.size++
150 return client, nil
151}
152
153// Put returns the Etcd Client back to the pool
154func (r *roundRobin) Put(client *clientv3.Client) {
155 r.Lock()
156
157 entry := r.all[client]
158 entry.count--
159
160 if r.closing {
161 // Close client if count is 0
162 if entry.count == 0 {
163 if err := entry.client.Close(); err != nil {
164 logger.Warnw(context.Background(), "error-closing-client", log.Fields{"error": err})
165 }
166 delete(r.all, entry.client)
167 }
168 // Notify Close function that a client was returned to the pool
169 r.closingCh <- struct{}{}
170 r.Unlock()
171 return
172 }
173
174 // This entry is now available for use, so
175 // if in full map add it to available and
176 // remove from full
177 if _, ok := r.full[client]; ok {
178 r.available = append(r.available, entry)
179 delete(r.full, client)
180 }
181
182 front := r.waitList.Front()
183 if front != nil {
184 ch := r.waitList.Remove(front)
185 r.Unlock()
186 // need to unblock if someone is waiting
187 ch.(chan struct{}) <- struct{}{}
188 return
189 }
190 r.Unlock()
191}
192
193func (r *roundRobin) Close(ctx context.Context) {
194 r.Lock()
195 r.closing = true
196
197 // Notify anyone waiting for a client to stop waiting
198 close(r.stopCh)
199
200 // Clean-up unused clients
201 for i := 0; i < len(r.available); i++ {
202 // Count 0 means no one is using that client
203 if r.available[i].count == 0 {
204 if err := r.available[i].client.Close(); err != nil {
205 logger.Warnw(ctx, "failure-closing-client", log.Fields{"client": r.available[i].client, "error": err})
206 }
207 // Remove client for all list
208 delete(r.all, r.available[i].client)
209 }
210 }
211
212 // Figure out how many clients are in use
213 numberInUse := 0
214 for _, rrEntry := range r.all {
215 numberInUse += rrEntry.count
216 }
217 r.Unlock()
218
219 if numberInUse == 0 {
220 logger.Info(ctx, "no-connection-in-use")
221 return
222 }
223
224 logger.Infow(ctx, "waiting-for-clients-return", log.Fields{"count": numberInUse})
225
226 // Wait for notifications when a client is returned to the pool
227 for {
228 select {
229 case <-r.closingCh:
230 numberInUse--
231 if numberInUse == 0 {
232 logger.Info(ctx, "all-connections-closed")
233 return
234 }
235 case <-ctx.Done():
236 logger.Warnw(ctx, "context-done", log.Fields{"error": ctx.Err()})
237 return
238 }
239 }
240}