blob: 9ecddca6e1a5173e2f2a882b9e5599bdb68369ca [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
khenaidoocfee5f42018-07-19 22:47:38 -040016package kvstore
17
18import (
khenaidoocfee5f42018-07-19 22:47:38 -040019 "context"
20 "errors"
khenaidoocfee5f42018-07-19 22:47:38 -040021 "fmt"
Stephane Barbarie260a5632019-02-26 16:12:49 -050022 "github.com/opencord/voltha-go/common/log"
khenaidoob9203542018-09-17 22:56:37 -040023 v3Client "go.etcd.io/etcd/clientv3"
Stephane Barbarie260a5632019-02-26 16:12:49 -050024 v3Concurrency "go.etcd.io/etcd/clientv3/concurrency"
khenaidoob9203542018-09-17 22:56:37 -040025 v3rpcTypes "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
khenaidoo5c11af72018-07-20 17:21:05 -040026 "sync"
khenaidoocfee5f42018-07-19 22:47:38 -040027)
28
29// EtcdClient represents the Etcd KV store client
30type EtcdClient struct {
31 ectdAPI *v3Client.Client
32 leaderRev v3Client.Client
33 keyReservations map[string]*v3Client.LeaseID
34 watchedChannels map[string][]map[chan *Event]v3Client.Watcher
35 writeLock sync.Mutex
36}
37
38// NewEtcdClient returns a new client for the Etcd KV store
39func NewEtcdClient(addr string, timeout int) (*EtcdClient, error) {
khenaidoocfee5f42018-07-19 22:47:38 -040040 duration := GetDuration(timeout)
41
42 c, err := v3Client.New(v3Client.Config{
43 Endpoints: []string{addr},
44 DialTimeout: duration,
45 })
46 if err != nil {
47 log.Error(err)
48 return nil, err
49 }
50 wc := make(map[string][]map[chan *Event]v3Client.Watcher)
51 reservations := make(map[string]*v3Client.LeaseID)
Stephane Barbarie260a5632019-02-26 16:12:49 -050052
khenaidoocfee5f42018-07-19 22:47:38 -040053 return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations: reservations}, nil
54}
55
56// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
57// wait for a response
Stephane Barbarie260a5632019-02-26 16:12:49 -050058func (c *EtcdClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
khenaidoocfee5f42018-07-19 22:47:38 -040059 duration := GetDuration(timeout)
60
61 ctx, cancel := context.WithTimeout(context.Background(), duration)
Stephane Barbarie260a5632019-02-26 16:12:49 -050062
63 // DO NOT lock by default; otherwise lock per instructed value
64 if len(lock) > 0 && lock[0] {
65 session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
66 mu := v3Concurrency.NewMutex(session, "/lock" + key)
67 mu.Lock(context.Background())
68 defer mu.Unlock(context.Background())
69 defer session.Close()
70 }
71
khenaidoocfee5f42018-07-19 22:47:38 -040072 resp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
73 cancel()
74 if err != nil {
75 log.Error(err)
76 return nil, err
77 }
78 m := make(map[string]*KVPair)
79 for _, ev := range resp.Kvs {
80 m[string(ev.Key)] = NewKVPair(string(ev.Key), ev.Value, "", ev.Lease)
81 }
82 return m, nil
83}
84
85// Get returns a key-value pair for a given key. Timeout defines how long the function will
86// wait for a response
Stephane Barbarie260a5632019-02-26 16:12:49 -050087func (c *EtcdClient) Get(key string, timeout int, lock ...bool) (*KVPair, error) {
khenaidoocfee5f42018-07-19 22:47:38 -040088 duration := GetDuration(timeout)
89
90 ctx, cancel := context.WithTimeout(context.Background(), duration)
Stephane Barbarie260a5632019-02-26 16:12:49 -050091
92 // Lock by default; otherwise lock per instructed value
93 if len(lock) == 0 || lock[0] {
94 session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
95 mu := v3Concurrency.NewMutex(session, "/lock" + key)
96 mu.Lock(context.Background())
97 defer mu.Unlock(context.Background())
98 defer session.Close()
99 }
100
khenaidoocfee5f42018-07-19 22:47:38 -0400101 resp, err := c.ectdAPI.Get(ctx, key)
102 cancel()
103 if err != nil {
104 log.Error(err)
105 return nil, err
106 }
107 for _, ev := range resp.Kvs {
108 // Only one value is returned
109 return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease), nil
110 }
111 return nil, nil
112}
113
114// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the etcd API
115// accepts only a string as a value for a put operation. Timeout defines how long the function will
116// wait for a response
Stephane Barbarie260a5632019-02-26 16:12:49 -0500117func (c *EtcdClient) Put(key string, value interface{}, timeout int, lock ...bool) error {
khenaidoocfee5f42018-07-19 22:47:38 -0400118
119 // Validate that we can convert value to a string as etcd API expects a string
120 var val string
121 var er error
122 if val, er = ToString(value); er != nil {
123 return fmt.Errorf("unexpected-type-%T", value)
124 }
125
126 duration := GetDuration(timeout)
127
128 ctx, cancel := context.WithTimeout(context.Background(), duration)
Stephane Barbarie260a5632019-02-26 16:12:49 -0500129
130 // Lock by default; otherwise lock per instructed value
131 if len(lock) == 0 || lock[0] {
132 session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
133 mu := v3Concurrency.NewMutex(session, "/lock" + key)
134 mu.Lock(context.Background())
135 defer mu.Unlock(context.Background())
136 defer session.Close()
137 }
138
khenaidoocfee5f42018-07-19 22:47:38 -0400139 c.writeLock.Lock()
140 defer c.writeLock.Unlock()
141 _, err := c.ectdAPI.Put(ctx, key, val)
142 cancel()
143 if err != nil {
144 switch err {
145 case context.Canceled:
khenaidoo5c11af72018-07-20 17:21:05 -0400146 log.Warnw("context-cancelled", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400147 case context.DeadlineExceeded:
khenaidoo5c11af72018-07-20 17:21:05 -0400148 log.Warnw("context-deadline-exceeded", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400149 case v3rpcTypes.ErrEmptyKey:
khenaidoo5c11af72018-07-20 17:21:05 -0400150 log.Warnw("etcd-client-error", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400151 default:
khenaidoo5c11af72018-07-20 17:21:05 -0400152 log.Warnw("bad-endpoints", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400153 }
154 return err
155 }
156 return nil
157}
158
159// Delete removes a key from the KV store. Timeout defines how long the function will
160// wait for a response
Stephane Barbarie260a5632019-02-26 16:12:49 -0500161func (c *EtcdClient) Delete(key string, timeout int, lock ...bool) error {
khenaidoocfee5f42018-07-19 22:47:38 -0400162
163 duration := GetDuration(timeout)
164
165 ctx, cancel := context.WithTimeout(context.Background(), duration)
Stephane Barbarie260a5632019-02-26 16:12:49 -0500166
167 // Lock by default; otherwise lock per instructed value
168 if len(lock) == 0 || lock[0] {
169 session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
170 mu := v3Concurrency.NewMutex(session, "/lock" + key)
171 mu.Lock(context.Background())
172 defer mu.Unlock(context.Background())
173 defer session.Close()
174 }
175
khenaidoocfee5f42018-07-19 22:47:38 -0400176 defer cancel()
177
178 c.writeLock.Lock()
179 defer c.writeLock.Unlock()
180
181 // count keys about to be deleted
182 gresp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
183 if err != nil {
184 log.Error(err)
185 return err
186 }
187
188 // delete the keys
189 dresp, err := c.ectdAPI.Delete(ctx, key, v3Client.WithPrefix())
190 if err != nil {
191 log.Error(err)
192 return err
193 }
194
195 if dresp == nil || gresp == nil {
196 log.Debug("nothing-to-delete")
197 return nil
198 }
199
khenaidoo5c11af72018-07-20 17:21:05 -0400200 log.Debugw("delete-keys", log.Fields{"all-keys-deleted": int64(len(gresp.Kvs)) == dresp.Deleted})
khenaidoocfee5f42018-07-19 22:47:38 -0400201 if int64(len(gresp.Kvs)) == dresp.Deleted {
202 log.Debug("All-keys-deleted")
203 } else {
204 log.Error("not-all-keys-deleted")
205 err := errors.New("not-all-keys-deleted")
206 return err
207 }
208 return nil
209}
210
211// Reserve is invoked to acquire a key and set it to a given value. Value can only be a string or []byte since
212// the etcd API accepts only a string. Timeout defines how long the function will wait for a response. TTL
213// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
214// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
215// then the value assigned to that key will be returned.
216func (c *EtcdClient) Reserve(key string, value interface{}, ttl int64) (interface{}, error) {
217 // Validate that we can convert value to a string as etcd API expects a string
218 var val string
219 var er error
220 if val, er = ToString(value); er != nil {
221 return nil, fmt.Errorf("unexpected-type%T", value)
222 }
223
224 // Create a lease
225 resp, err := c.ectdAPI.Grant(context.Background(), ttl)
226 if err != nil {
227 log.Error(err)
228 return nil, err
229 }
230 // Register the lease id
231 c.writeLock.Lock()
232 c.keyReservations[key] = &resp.ID
233 c.writeLock.Unlock()
234
235 // Revoke lease if reservation is not successful
236 reservationSuccessful := false
237 defer func() {
238 if !reservationSuccessful {
239 if err = c.ReleaseReservation(key); err != nil {
240 log.Errorf("cannot-release-lease")
241 }
242 }
243 }()
244
245 // Try to grap the Key with the above lease
246 c.ectdAPI.Txn(context.Background())
247 txn := c.ectdAPI.Txn(context.Background())
248 txn = txn.If(v3Client.Compare(v3Client.Version(key), "=", 0))
249 txn = txn.Then(v3Client.OpPut(key, val, v3Client.WithLease(resp.ID)))
250 txn = txn.Else(v3Client.OpGet(key))
251 result, er := txn.Commit()
252 if er != nil {
253 return nil, er
254 }
255
256 if !result.Succeeded {
257 // Verify whether we are already the owner of that Key
258 if len(result.Responses) > 0 &&
259 len(result.Responses[0].GetResponseRange().Kvs) > 0 {
260 kv := result.Responses[0].GetResponseRange().Kvs[0]
261 if string(kv.Value) == val {
262 reservationSuccessful = true
263 return value, nil
264 }
265 return kv.Value, nil
266 }
267 } else {
268 // Read the Key to ensure this is our Key
Stephane Barbarie260a5632019-02-26 16:12:49 -0500269 m, err := c.Get(key, defaultKVGetTimeout, false)
khenaidoocfee5f42018-07-19 22:47:38 -0400270 if err != nil {
271 return nil, err
272 }
273 if m != nil {
274 if m.Key == key && isEqual(m.Value, value) {
275 // My reservation is successful - register it. For now, support is only for 1 reservation per key
276 // per session.
277 reservationSuccessful = true
278 return value, nil
279 }
280 // My reservation has failed. Return the owner of that key
281 return m.Value, nil
282 }
283 }
284 return nil, nil
285}
286
287// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
288func (c *EtcdClient) ReleaseAllReservations() error {
289 c.writeLock.Lock()
290 defer c.writeLock.Unlock()
291 for key, leaseID := range c.keyReservations {
292 _, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
293 if err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400294 log.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400295 return err
296 }
297 delete(c.keyReservations, key)
298 }
299 return nil
300}
301
302// ReleaseReservation releases reservation for a specific key.
303func (c *EtcdClient) ReleaseReservation(key string) error {
304 // Get the leaseid using the key
305 var ok bool
306 var leaseID *v3Client.LeaseID
307 c.writeLock.Lock()
308 defer c.writeLock.Unlock()
309 if leaseID, ok = c.keyReservations[key]; !ok {
310 return errors.New("key-not-reserved")
311 }
312 if leaseID != nil {
313 _, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
314 if err != nil {
315 log.Error(err)
316 return err
317 }
318 delete(c.keyReservations, key)
319 }
320 return nil
321}
322
323// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
324// period specified when reserving the key
325func (c *EtcdClient) RenewReservation(key string) error {
326 // Get the leaseid using the key
327 var ok bool
328 var leaseID *v3Client.LeaseID
329 c.writeLock.Lock()
330 defer c.writeLock.Unlock()
331 if leaseID, ok = c.keyReservations[key]; !ok {
332 return errors.New("key-not-reserved")
333 }
334
335 if leaseID != nil {
336 _, err := c.ectdAPI.KeepAliveOnce(context.Background(), *leaseID)
337 if err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400338 log.Errorw("lease-may-have-expired", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400339 return err
340 }
341 } else {
342 return errors.New("lease-expired")
343 }
344 return nil
345}
346
347// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
348// listen to receive Events.
349func (c *EtcdClient) Watch(key string) chan *Event {
350 w := v3Client.NewWatcher(c.ectdAPI)
351 channel := w.Watch(context.Background(), key, v3Client.WithPrefix())
352
353 // Create a new channel
354 ch := make(chan *Event, maxClientChannelBufferSize)
355
356 // Keep track of the created channels so they can be closed when required
357 channelMap := make(map[chan *Event]v3Client.Watcher)
358 channelMap[ch] = w
359 //c.writeLock.Lock()
360 //defer c.writeLock.Unlock()
361 c.watchedChannels[key] = append(c.watchedChannels[key], channelMap)
362
khenaidoo5c11af72018-07-20 17:21:05 -0400363 log.Debugw("watched-channels", log.Fields{"channels": c.watchedChannels[key]})
khenaidoocfee5f42018-07-19 22:47:38 -0400364 // Launch a go routine to listen for updates
365 go c.listenForKeyChange(channel, ch)
366
367 return ch
368
369}
370
371// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
372// may be multiple listeners on the same key. The previously created channel serves as a key
373func (c *EtcdClient) CloseWatch(key string, ch chan *Event) {
374 // Get the array of channels mapping
375 var watchedChannels []map[chan *Event]v3Client.Watcher
376 var ok bool
377 c.writeLock.Lock()
378 defer c.writeLock.Unlock()
379
380 if watchedChannels, ok = c.watchedChannels[key]; !ok {
khenaidoo5c11af72018-07-20 17:21:05 -0400381 log.Warnw("key-has-no-watched-channels", log.Fields{"key": key})
khenaidoocfee5f42018-07-19 22:47:38 -0400382 return
383 }
384 // Look for the channels
385 var pos = -1
386 for i, chMap := range watchedChannels {
387 if t, ok := chMap[ch]; ok {
388 log.Debug("channel-found")
389 // Close the etcd watcher before the client channel. This should close the etcd channel as well
390 if err := t.Close(); err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400391 log.Errorw("watcher-cannot-be-closed", log.Fields{"key": key, "error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400392 }
393 close(ch)
394 pos = i
395 break
396 }
397 }
398 // Remove that entry if present
399 if pos >= 0 {
400 c.watchedChannels[key] = append(c.watchedChannels[key][:pos], c.watchedChannels[key][pos+1:]...)
401 }
khenaidoo5c11af72018-07-20 17:21:05 -0400402 log.Infow("watcher-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannels[key]})
khenaidoocfee5f42018-07-19 22:47:38 -0400403}
404
405func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event) {
khenaidoo5c11af72018-07-20 17:21:05 -0400406 log.Infow("start-listening-on-channel", log.Fields{"channel": ch})
khenaidoocfee5f42018-07-19 22:47:38 -0400407 for resp := range channel {
408 for _, ev := range resp.Events {
409 //log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
410 ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value)
411 }
412 }
413 log.Info("stop-listening-on-channel")
414}
415
416func getEventType(event *v3Client.Event) int {
417 switch event.Type {
418 case v3Client.EventTypePut:
419 return PUT
420 case v3Client.EventTypeDelete:
421 return DELETE
422 }
423 return UNKNOWN
424}
425
426// Close closes the KV store client
427func (c *EtcdClient) Close() {
428 c.writeLock.Lock()
429 defer c.writeLock.Unlock()
430 if err := c.ectdAPI.Close(); err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400431 log.Errorw("error-closing-client", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400432 }
433}