blob: a6a14332fe730464b947dadd7b2f5b8f8f272c7c [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
khenaidoocfee5f42018-07-19 22:47:38 -040016package kvstore
17
18import (
19 //log "../common"
20 "context"
21 "errors"
khenaidoocfee5f42018-07-19 22:47:38 -040022 "fmt"
23 v3Client "github.com/coreos/etcd/clientv3"
24 v3rpcTypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
khenaidoo5c11af72018-07-20 17:21:05 -040025 log "github.com/opencord/voltha-go/common/log"
26 "sync"
khenaidoocfee5f42018-07-19 22:47:38 -040027)
28
29// EtcdClient represents the Etcd KV store client
30type EtcdClient struct {
31 ectdAPI *v3Client.Client
32 leaderRev v3Client.Client
33 keyReservations map[string]*v3Client.LeaseID
34 watchedChannels map[string][]map[chan *Event]v3Client.Watcher
35 writeLock sync.Mutex
36}
37
38// NewEtcdClient returns a new client for the Etcd KV store
39func NewEtcdClient(addr string, timeout int) (*EtcdClient, error) {
40
41 duration := GetDuration(timeout)
42
43 c, err := v3Client.New(v3Client.Config{
44 Endpoints: []string{addr},
45 DialTimeout: duration,
46 })
47 if err != nil {
48 log.Error(err)
49 return nil, err
50 }
51 wc := make(map[string][]map[chan *Event]v3Client.Watcher)
52 reservations := make(map[string]*v3Client.LeaseID)
53 return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations: reservations}, nil
54}
55
56// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
57// wait for a response
58func (c *EtcdClient) List(key string, timeout int) (map[string]*KVPair, error) {
59 duration := GetDuration(timeout)
60
61 ctx, cancel := context.WithTimeout(context.Background(), duration)
62 resp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
63 cancel()
64 if err != nil {
65 log.Error(err)
66 return nil, err
67 }
68 m := make(map[string]*KVPair)
69 for _, ev := range resp.Kvs {
70 m[string(ev.Key)] = NewKVPair(string(ev.Key), ev.Value, "", ev.Lease)
71 }
72 return m, nil
73}
74
75// Get returns a key-value pair for a given key. Timeout defines how long the function will
76// wait for a response
77func (c *EtcdClient) Get(key string, timeout int) (*KVPair, error) {
78 duration := GetDuration(timeout)
79
80 ctx, cancel := context.WithTimeout(context.Background(), duration)
81 resp, err := c.ectdAPI.Get(ctx, key)
82 cancel()
83 if err != nil {
84 log.Error(err)
85 return nil, err
86 }
87 for _, ev := range resp.Kvs {
88 // Only one value is returned
89 return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease), nil
90 }
91 return nil, nil
92}
93
94// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the etcd API
95// accepts only a string as a value for a put operation. Timeout defines how long the function will
96// wait for a response
97func (c *EtcdClient) Put(key string, value interface{}, timeout int) error {
98
99 // Validate that we can convert value to a string as etcd API expects a string
100 var val string
101 var er error
102 if val, er = ToString(value); er != nil {
103 return fmt.Errorf("unexpected-type-%T", value)
104 }
105
106 duration := GetDuration(timeout)
107
108 ctx, cancel := context.WithTimeout(context.Background(), duration)
109 c.writeLock.Lock()
110 defer c.writeLock.Unlock()
111 _, err := c.ectdAPI.Put(ctx, key, val)
112 cancel()
113 if err != nil {
114 switch err {
115 case context.Canceled:
khenaidoo5c11af72018-07-20 17:21:05 -0400116 log.Warnw("context-cancelled", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400117 case context.DeadlineExceeded:
khenaidoo5c11af72018-07-20 17:21:05 -0400118 log.Warnw("context-deadline-exceeded", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400119 case v3rpcTypes.ErrEmptyKey:
khenaidoo5c11af72018-07-20 17:21:05 -0400120 log.Warnw("etcd-client-error", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400121 default:
khenaidoo5c11af72018-07-20 17:21:05 -0400122 log.Warnw("bad-endpoints", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400123 }
124 return err
125 }
126 return nil
127}
128
129// Delete removes a key from the KV store. Timeout defines how long the function will
130// wait for a response
131func (c *EtcdClient) Delete(key string, timeout int) error {
132
133 duration := GetDuration(timeout)
134
135 ctx, cancel := context.WithTimeout(context.Background(), duration)
136 defer cancel()
137
138 c.writeLock.Lock()
139 defer c.writeLock.Unlock()
140
141 // count keys about to be deleted
142 gresp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
143 if err != nil {
144 log.Error(err)
145 return err
146 }
147
148 // delete the keys
149 dresp, err := c.ectdAPI.Delete(ctx, key, v3Client.WithPrefix())
150 if err != nil {
151 log.Error(err)
152 return err
153 }
154
155 if dresp == nil || gresp == nil {
156 log.Debug("nothing-to-delete")
157 return nil
158 }
159
khenaidoo5c11af72018-07-20 17:21:05 -0400160 log.Debugw("delete-keys", log.Fields{"all-keys-deleted": int64(len(gresp.Kvs)) == dresp.Deleted})
khenaidoocfee5f42018-07-19 22:47:38 -0400161 if int64(len(gresp.Kvs)) == dresp.Deleted {
162 log.Debug("All-keys-deleted")
163 } else {
164 log.Error("not-all-keys-deleted")
165 err := errors.New("not-all-keys-deleted")
166 return err
167 }
168 return nil
169}
170
171// Reserve is invoked to acquire a key and set it to a given value. Value can only be a string or []byte since
172// the etcd API accepts only a string. Timeout defines how long the function will wait for a response. TTL
173// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
174// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
175// then the value assigned to that key will be returned.
176func (c *EtcdClient) Reserve(key string, value interface{}, ttl int64) (interface{}, error) {
177 // Validate that we can convert value to a string as etcd API expects a string
178 var val string
179 var er error
180 if val, er = ToString(value); er != nil {
181 return nil, fmt.Errorf("unexpected-type%T", value)
182 }
183
184 // Create a lease
185 resp, err := c.ectdAPI.Grant(context.Background(), ttl)
186 if err != nil {
187 log.Error(err)
188 return nil, err
189 }
190 // Register the lease id
191 c.writeLock.Lock()
192 c.keyReservations[key] = &resp.ID
193 c.writeLock.Unlock()
194
195 // Revoke lease if reservation is not successful
196 reservationSuccessful := false
197 defer func() {
198 if !reservationSuccessful {
199 if err = c.ReleaseReservation(key); err != nil {
200 log.Errorf("cannot-release-lease")
201 }
202 }
203 }()
204
205 // Try to grap the Key with the above lease
206 c.ectdAPI.Txn(context.Background())
207 txn := c.ectdAPI.Txn(context.Background())
208 txn = txn.If(v3Client.Compare(v3Client.Version(key), "=", 0))
209 txn = txn.Then(v3Client.OpPut(key, val, v3Client.WithLease(resp.ID)))
210 txn = txn.Else(v3Client.OpGet(key))
211 result, er := txn.Commit()
212 if er != nil {
213 return nil, er
214 }
215
216 if !result.Succeeded {
217 // Verify whether we are already the owner of that Key
218 if len(result.Responses) > 0 &&
219 len(result.Responses[0].GetResponseRange().Kvs) > 0 {
220 kv := result.Responses[0].GetResponseRange().Kvs[0]
221 if string(kv.Value) == val {
222 reservationSuccessful = true
223 return value, nil
224 }
225 return kv.Value, nil
226 }
227 } else {
228 // Read the Key to ensure this is our Key
229 m, err := c.Get(key, defaultKVGetTimeout)
230 if err != nil {
231 return nil, err
232 }
233 if m != nil {
234 if m.Key == key && isEqual(m.Value, value) {
235 // My reservation is successful - register it. For now, support is only for 1 reservation per key
236 // per session.
237 reservationSuccessful = true
238 return value, nil
239 }
240 // My reservation has failed. Return the owner of that key
241 return m.Value, nil
242 }
243 }
244 return nil, nil
245}
246
247// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
248func (c *EtcdClient) ReleaseAllReservations() error {
249 c.writeLock.Lock()
250 defer c.writeLock.Unlock()
251 for key, leaseID := range c.keyReservations {
252 _, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
253 if err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400254 log.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400255 return err
256 }
257 delete(c.keyReservations, key)
258 }
259 return nil
260}
261
262// ReleaseReservation releases reservation for a specific key.
263func (c *EtcdClient) ReleaseReservation(key string) error {
264 // Get the leaseid using the key
265 var ok bool
266 var leaseID *v3Client.LeaseID
267 c.writeLock.Lock()
268 defer c.writeLock.Unlock()
269 if leaseID, ok = c.keyReservations[key]; !ok {
270 return errors.New("key-not-reserved")
271 }
272 if leaseID != nil {
273 _, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
274 if err != nil {
275 log.Error(err)
276 return err
277 }
278 delete(c.keyReservations, key)
279 }
280 return nil
281}
282
283// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
284// period specified when reserving the key
285func (c *EtcdClient) RenewReservation(key string) error {
286 // Get the leaseid using the key
287 var ok bool
288 var leaseID *v3Client.LeaseID
289 c.writeLock.Lock()
290 defer c.writeLock.Unlock()
291 if leaseID, ok = c.keyReservations[key]; !ok {
292 return errors.New("key-not-reserved")
293 }
294
295 if leaseID != nil {
296 _, err := c.ectdAPI.KeepAliveOnce(context.Background(), *leaseID)
297 if err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400298 log.Errorw("lease-may-have-expired", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400299 return err
300 }
301 } else {
302 return errors.New("lease-expired")
303 }
304 return nil
305}
306
307// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
308// listen to receive Events.
309func (c *EtcdClient) Watch(key string) chan *Event {
310 w := v3Client.NewWatcher(c.ectdAPI)
311 channel := w.Watch(context.Background(), key, v3Client.WithPrefix())
312
313 // Create a new channel
314 ch := make(chan *Event, maxClientChannelBufferSize)
315
316 // Keep track of the created channels so they can be closed when required
317 channelMap := make(map[chan *Event]v3Client.Watcher)
318 channelMap[ch] = w
319 //c.writeLock.Lock()
320 //defer c.writeLock.Unlock()
321 c.watchedChannels[key] = append(c.watchedChannels[key], channelMap)
322
khenaidoo5c11af72018-07-20 17:21:05 -0400323 log.Debugw("watched-channels", log.Fields{"channels": c.watchedChannels[key]})
khenaidoocfee5f42018-07-19 22:47:38 -0400324 // Launch a go routine to listen for updates
325 go c.listenForKeyChange(channel, ch)
326
327 return ch
328
329}
330
331// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
332// may be multiple listeners on the same key. The previously created channel serves as a key
333func (c *EtcdClient) CloseWatch(key string, ch chan *Event) {
334 // Get the array of channels mapping
335 var watchedChannels []map[chan *Event]v3Client.Watcher
336 var ok bool
337 c.writeLock.Lock()
338 defer c.writeLock.Unlock()
339
340 if watchedChannels, ok = c.watchedChannels[key]; !ok {
khenaidoo5c11af72018-07-20 17:21:05 -0400341 log.Warnw("key-has-no-watched-channels", log.Fields{"key": key})
khenaidoocfee5f42018-07-19 22:47:38 -0400342 return
343 }
344 // Look for the channels
345 var pos = -1
346 for i, chMap := range watchedChannels {
347 if t, ok := chMap[ch]; ok {
348 log.Debug("channel-found")
349 // Close the etcd watcher before the client channel. This should close the etcd channel as well
350 if err := t.Close(); err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400351 log.Errorw("watcher-cannot-be-closed", log.Fields{"key": key, "error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400352 }
353 close(ch)
354 pos = i
355 break
356 }
357 }
358 // Remove that entry if present
359 if pos >= 0 {
360 c.watchedChannels[key] = append(c.watchedChannels[key][:pos], c.watchedChannels[key][pos+1:]...)
361 }
khenaidoo5c11af72018-07-20 17:21:05 -0400362 log.Infow("watcher-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannels[key]})
khenaidoocfee5f42018-07-19 22:47:38 -0400363}
364
365func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event) {
khenaidoo5c11af72018-07-20 17:21:05 -0400366 log.Infow("start-listening-on-channel", log.Fields{"channel": ch})
khenaidoocfee5f42018-07-19 22:47:38 -0400367 for resp := range channel {
368 for _, ev := range resp.Events {
369 //log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
370 ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value)
371 }
372 }
373 log.Info("stop-listening-on-channel")
374}
375
376func getEventType(event *v3Client.Event) int {
377 switch event.Type {
378 case v3Client.EventTypePut:
379 return PUT
380 case v3Client.EventTypeDelete:
381 return DELETE
382 }
383 return UNKNOWN
384}
385
386// Close closes the KV store client
387func (c *EtcdClient) Close() {
388 c.writeLock.Lock()
389 defer c.writeLock.Unlock()
390 if err := c.ectdAPI.Close(); err != nil {
khenaidoo5c11af72018-07-20 17:21:05 -0400391 log.Errorw("error-closing-client", log.Fields{"error": err})
khenaidoocfee5f42018-07-19 22:47:38 -0400392 }
393}