blob: 6af7d3ddeb01408ada819e54786d7b8fd9d5ed97 [file] [log] [blame]
Andrea Campanella18448bc2021-07-08 18:47:22 +02001/*
2 * Copyright 2021-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kvstore
17
18import (
19 "container/list"
20 "context"
21 "errors"
22 "github.com/opencord/voltha-lib-go/v5/pkg/log"
23 "go.etcd.io/etcd/clientv3"
24 "sync"
25 "time"
26)
27
28// EtcdClientAllocator represents a generic interface to allocate an Etcd Client
29type EtcdClientAllocator interface {
30 Get(context.Context) (*clientv3.Client, error)
31 Put(*clientv3.Client)
32 Close(ctx context.Context)
33}
34
35// NewRoundRobinEtcdClientAllocator creates a new ETCD Client Allocator using a Round Robin scheme
36func NewRoundRobinEtcdClientAllocator(endpoints []string, timeout time.Duration, capacity, maxUsage int, level log.LogLevel) (EtcdClientAllocator, error) {
37 return &roundRobin{
38 all: make(map[*clientv3.Client]*rrEntry),
39 full: make(map[*clientv3.Client]*rrEntry),
40 waitList: list.New(),
41 max: maxUsage,
42 capacity: capacity,
43 timeout: timeout,
44 endpoints: endpoints,
45 logLevel: level,
46 closingCh: make(chan struct{}, capacity*maxUsage),
47 stopCh: make(chan struct{}),
48 }, nil
49}
50
51type rrEntry struct {
52 client *clientv3.Client
53 count int
54 age time.Time
55}
56
57type roundRobin struct {
58 //block chan struct{}
59 sync.Mutex
60 available []*rrEntry
61 all map[*clientv3.Client]*rrEntry
62 full map[*clientv3.Client]*rrEntry
63 waitList *list.List
64 max int
65 capacity int
66 timeout time.Duration
67 //ageOut time.Duration
68 endpoints []string
69 size int
70 logLevel log.LogLevel
71 closing bool
72 closingCh chan struct{}
73 stopCh chan struct{}
74}
75
76// Get returns an Etcd client. If not is available, it will create one
77// until the maximum allowed capacity. If maximum capacity has been
78// reached then it will wait until s used one is freed.
79func (r *roundRobin) Get(ctx context.Context) (*clientv3.Client, error) {
80 r.Lock()
81
82 if r.closing {
83 r.Unlock()
84 return nil, errors.New("pool-is-closing")
85 }
86
87 // first determine if we need to block, which would mean the
88 // available queue is empty and we are at capacity
89 if len(r.available) == 0 && r.size >= r.capacity {
90
91 // create a channel on which to wait and
92 // add it to the list
93 ch := make(chan struct{})
94 element := r.waitList.PushBack(ch)
95 r.Unlock()
96
97 // block until it is our turn or context
98 // expires or is canceled
99 select {
100 case <-r.stopCh:
101 logger.Info(ctx, "stop-waiting-pool-is-closing")
102 r.waitList.Remove(element)
103 return nil, errors.New("stop-waiting-pool-is-closing")
104 case <-ch:
105 r.waitList.Remove(element)
106 case <-ctx.Done():
107 r.waitList.Remove(element)
108 return nil, ctx.Err()
109 }
110 r.Lock()
111 }
112
113 defer r.Unlock()
114 if len(r.available) > 0 {
115 // pull off back end as it is operationally quicker
116 last := len(r.available) - 1
117 entry := r.available[last]
118 entry.count++
119 if entry.count >= r.max {
120 r.available = r.available[:last]
121 r.full[entry.client] = entry
122 }
123 entry.age = time.Now()
124 return entry.client, nil
125 }
126
127 logConfig := log.ConstructZapConfig(log.JSON, r.logLevel, log.Fields{})
128 // increase capacity
129 client, err := clientv3.New(clientv3.Config{
130 Endpoints: r.endpoints,
131 DialTimeout: r.timeout,
132 LogConfig: &logConfig,
133 })
134 if err != nil {
135 return nil, err
136 }
137 entry := &rrEntry{
138 client: client,
139 count: 1,
140 }
141 r.all[entry.client] = entry
142
143 if r.max > 1 {
144 r.available = append(r.available, entry)
145 } else {
146 r.full[entry.client] = entry
147 }
148 r.size++
149 return client, nil
150}
151
152// Put returns the Etcd Client back to the pool
153func (r *roundRobin) Put(client *clientv3.Client) {
154 r.Lock()
155
156 entry := r.all[client]
157 entry.count--
158
159 if r.closing {
160 // Close client if count is 0
161 if entry.count == 0 {
162 if err := entry.client.Close(); err != nil {
163 logger.Warnw(context.Background(), "error-closing-client", log.Fields{"error": err})
164 }
165 delete(r.all, entry.client)
166 }
167 // Notify Close function that a client was returned to the pool
168 r.closingCh <- struct{}{}
169 r.Unlock()
170 return
171 }
172
173 // This entry is now available for use, so
174 // if in full map add it to available and
175 // remove from full
176 if _, ok := r.full[client]; ok {
177 r.available = append(r.available, entry)
178 delete(r.full, client)
179 }
180
181 front := r.waitList.Front()
182 if front != nil {
183 ch := r.waitList.Remove(front)
184 r.Unlock()
185 // need to unblock if someone is waiting
186 ch.(chan struct{}) <- struct{}{}
187 return
188 }
189 r.Unlock()
190}
191
192func (r *roundRobin) Close(ctx context.Context) {
193 r.Lock()
194 r.closing = true
195
196 // Notify anyone waiting for a client to stop waiting
197 close(r.stopCh)
198
199 // Clean-up unused clients
200 for i := 0; i < len(r.available); i++ {
201 // Count 0 means no one is using that client
202 if r.available[i].count == 0 {
203 if err := r.available[i].client.Close(); err != nil {
204 logger.Warnw(ctx, "failure-closing-client", log.Fields{"client": r.available[i].client, "error": err})
205 }
206 // Remove client for all list
207 delete(r.all, r.available[i].client)
208 }
209 }
210
211 // Figure out how many clients are in use
212 numberInUse := 0
213 for _, rrEntry := range r.all {
214 numberInUse += rrEntry.count
215 }
216 r.Unlock()
217
218 if numberInUse == 0 {
219 logger.Info(ctx, "no-connection-in-use")
220 return
221 }
222
223 logger.Infow(ctx, "waiting-for-clients-return", log.Fields{"count": numberInUse})
224
225 // Wait for notifications when a client is returned to the pool
226 for {
227 select {
228 case <-r.closingCh:
229 numberInUse--
230 if numberInUse == 0 {
231 logger.Info(ctx, "all-connections-closed")
232 return
233 }
234 case <-ctx.Done():
235 logger.Warnw(ctx, "context-done", log.Fields{"error": ctx.Err()})
236 return
237 }
238 }
239}