blob: 2755cd18ebdd0b2ba81698c4f938d286a0ec9ffb [file] [log] [blame]
khenaidoocfee5f42018-07-19 22:47:38 -04001package kvstore
2
3import (
4 //log "../common"
5 "context"
6 "errors"
7 "sync"
8 log "github.com/opencord/voltha-go/common/log"
9 "fmt"
10 v3Client "github.com/coreos/etcd/clientv3"
11 v3rpcTypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
12)
13
14// EtcdClient represents the Etcd KV store client
15type EtcdClient struct {
16 ectdAPI *v3Client.Client
17 leaderRev v3Client.Client
18 keyReservations map[string]*v3Client.LeaseID
19 watchedChannels map[string][]map[chan *Event]v3Client.Watcher
20 writeLock sync.Mutex
21}
22
23// NewEtcdClient returns a new client for the Etcd KV store
24func NewEtcdClient(addr string, timeout int) (*EtcdClient, error) {
25
26 duration := GetDuration(timeout)
27
28 c, err := v3Client.New(v3Client.Config{
29 Endpoints: []string{addr},
30 DialTimeout: duration,
31 })
32 if err != nil {
33 log.Error(err)
34 return nil, err
35 }
36 wc := make(map[string][]map[chan *Event]v3Client.Watcher)
37 reservations := make(map[string]*v3Client.LeaseID)
38 return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations: reservations}, nil
39}
40
41// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
42// wait for a response
43func (c *EtcdClient) List(key string, timeout int) (map[string]*KVPair, error) {
44 duration := GetDuration(timeout)
45
46 ctx, cancel := context.WithTimeout(context.Background(), duration)
47 resp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
48 cancel()
49 if err != nil {
50 log.Error(err)
51 return nil, err
52 }
53 m := make(map[string]*KVPair)
54 for _, ev := range resp.Kvs {
55 m[string(ev.Key)] = NewKVPair(string(ev.Key), ev.Value, "", ev.Lease)
56 }
57 return m, nil
58}
59
60// Get returns a key-value pair for a given key. Timeout defines how long the function will
61// wait for a response
62func (c *EtcdClient) Get(key string, timeout int) (*KVPair, error) {
63 duration := GetDuration(timeout)
64
65 ctx, cancel := context.WithTimeout(context.Background(), duration)
66 resp, err := c.ectdAPI.Get(ctx, key)
67 cancel()
68 if err != nil {
69 log.Error(err)
70 return nil, err
71 }
72 for _, ev := range resp.Kvs {
73 // Only one value is returned
74 return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease), nil
75 }
76 return nil, nil
77}
78
79// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the etcd API
80// accepts only a string as a value for a put operation. Timeout defines how long the function will
81// wait for a response
82func (c *EtcdClient) Put(key string, value interface{}, timeout int) error {
83
84 // Validate that we can convert value to a string as etcd API expects a string
85 var val string
86 var er error
87 if val, er = ToString(value); er != nil {
88 return fmt.Errorf("unexpected-type-%T", value)
89 }
90
91 duration := GetDuration(timeout)
92
93 ctx, cancel := context.WithTimeout(context.Background(), duration)
94 c.writeLock.Lock()
95 defer c.writeLock.Unlock()
96 _, err := c.ectdAPI.Put(ctx, key, val)
97 cancel()
98 if err != nil {
99 switch err {
100 case context.Canceled:
101 log.Warnw("context-cancelled", log.Fields{"error":err})
102 case context.DeadlineExceeded:
103 log.Warnw("context-deadline-exceeded", log.Fields{"error":err})
104 case v3rpcTypes.ErrEmptyKey:
105 log.Warnw("etcd-client-error", log.Fields{"error":err})
106 default:
107 log.Warnw("bad-endpoints", log.Fields{"error":err})
108 }
109 return err
110 }
111 return nil
112}
113
114// Delete removes a key from the KV store. Timeout defines how long the function will
115// wait for a response
116func (c *EtcdClient) Delete(key string, timeout int) error {
117
118 duration := GetDuration(timeout)
119
120 ctx, cancel := context.WithTimeout(context.Background(), duration)
121 defer cancel()
122
123 c.writeLock.Lock()
124 defer c.writeLock.Unlock()
125
126 // count keys about to be deleted
127 gresp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
128 if err != nil {
129 log.Error(err)
130 return err
131 }
132
133 // delete the keys
134 dresp, err := c.ectdAPI.Delete(ctx, key, v3Client.WithPrefix())
135 if err != nil {
136 log.Error(err)
137 return err
138 }
139
140 if dresp == nil || gresp == nil {
141 log.Debug("nothing-to-delete")
142 return nil
143 }
144
145 log.Debugw("delete-keys", log.Fields{"all-keys-deleted":int64(len(gresp.Kvs)) == dresp.Deleted})
146 if int64(len(gresp.Kvs)) == dresp.Deleted {
147 log.Debug("All-keys-deleted")
148 } else {
149 log.Error("not-all-keys-deleted")
150 err := errors.New("not-all-keys-deleted")
151 return err
152 }
153 return nil
154}
155
156// Reserve is invoked to acquire a key and set it to a given value. Value can only be a string or []byte since
157// the etcd API accepts only a string. Timeout defines how long the function will wait for a response. TTL
158// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
159// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
160// then the value assigned to that key will be returned.
161func (c *EtcdClient) Reserve(key string, value interface{}, ttl int64) (interface{}, error) {
162 // Validate that we can convert value to a string as etcd API expects a string
163 var val string
164 var er error
165 if val, er = ToString(value); er != nil {
166 return nil, fmt.Errorf("unexpected-type%T", value)
167 }
168
169 // Create a lease
170 resp, err := c.ectdAPI.Grant(context.Background(), ttl)
171 if err != nil {
172 log.Error(err)
173 return nil, err
174 }
175 // Register the lease id
176 c.writeLock.Lock()
177 c.keyReservations[key] = &resp.ID
178 c.writeLock.Unlock()
179
180 // Revoke lease if reservation is not successful
181 reservationSuccessful := false
182 defer func() {
183 if !reservationSuccessful {
184 if err = c.ReleaseReservation(key); err != nil {
185 log.Errorf("cannot-release-lease")
186 }
187 }
188 }()
189
190 // Try to grap the Key with the above lease
191 c.ectdAPI.Txn(context.Background())
192 txn := c.ectdAPI.Txn(context.Background())
193 txn = txn.If(v3Client.Compare(v3Client.Version(key), "=", 0))
194 txn = txn.Then(v3Client.OpPut(key, val, v3Client.WithLease(resp.ID)))
195 txn = txn.Else(v3Client.OpGet(key))
196 result, er := txn.Commit()
197 if er != nil {
198 return nil, er
199 }
200
201 if !result.Succeeded {
202 // Verify whether we are already the owner of that Key
203 if len(result.Responses) > 0 &&
204 len(result.Responses[0].GetResponseRange().Kvs) > 0 {
205 kv := result.Responses[0].GetResponseRange().Kvs[0]
206 if string(kv.Value) == val {
207 reservationSuccessful = true
208 return value, nil
209 }
210 return kv.Value, nil
211 }
212 } else {
213 // Read the Key to ensure this is our Key
214 m, err := c.Get(key, defaultKVGetTimeout)
215 if err != nil {
216 return nil, err
217 }
218 if m != nil {
219 if m.Key == key && isEqual(m.Value, value) {
220 // My reservation is successful - register it. For now, support is only for 1 reservation per key
221 // per session.
222 reservationSuccessful = true
223 return value, nil
224 }
225 // My reservation has failed. Return the owner of that key
226 return m.Value, nil
227 }
228 }
229 return nil, nil
230}
231
232// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
233func (c *EtcdClient) ReleaseAllReservations() error {
234 c.writeLock.Lock()
235 defer c.writeLock.Unlock()
236 for key, leaseID := range c.keyReservations {
237 _, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
238 if err != nil {
239 log.Errorw("cannot-release-reservation", log.Fields{"key":key, "error":err})
240 return err
241 }
242 delete(c.keyReservations, key)
243 }
244 return nil
245}
246
247// ReleaseReservation releases reservation for a specific key.
248func (c *EtcdClient) ReleaseReservation(key string) error {
249 // Get the leaseid using the key
250 var ok bool
251 var leaseID *v3Client.LeaseID
252 c.writeLock.Lock()
253 defer c.writeLock.Unlock()
254 if leaseID, ok = c.keyReservations[key]; !ok {
255 return errors.New("key-not-reserved")
256 }
257 if leaseID != nil {
258 _, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
259 if err != nil {
260 log.Error(err)
261 return err
262 }
263 delete(c.keyReservations, key)
264 }
265 return nil
266}
267
268// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
269// period specified when reserving the key
270func (c *EtcdClient) RenewReservation(key string) error {
271 // Get the leaseid using the key
272 var ok bool
273 var leaseID *v3Client.LeaseID
274 c.writeLock.Lock()
275 defer c.writeLock.Unlock()
276 if leaseID, ok = c.keyReservations[key]; !ok {
277 return errors.New("key-not-reserved")
278 }
279
280 if leaseID != nil {
281 _, err := c.ectdAPI.KeepAliveOnce(context.Background(), *leaseID)
282 if err != nil {
283 log.Errorw("lease-may-have-expired", log.Fields{"error":err})
284 return err
285 }
286 } else {
287 return errors.New("lease-expired")
288 }
289 return nil
290}
291
292// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
293// listen to receive Events.
294func (c *EtcdClient) Watch(key string) chan *Event {
295 w := v3Client.NewWatcher(c.ectdAPI)
296 channel := w.Watch(context.Background(), key, v3Client.WithPrefix())
297
298 // Create a new channel
299 ch := make(chan *Event, maxClientChannelBufferSize)
300
301 // Keep track of the created channels so they can be closed when required
302 channelMap := make(map[chan *Event]v3Client.Watcher)
303 channelMap[ch] = w
304 //c.writeLock.Lock()
305 //defer c.writeLock.Unlock()
306 c.watchedChannels[key] = append(c.watchedChannels[key], channelMap)
307
308 log.Debugw("watched-channels", log.Fields{"channels":c.watchedChannels[key]})
309 // Launch a go routine to listen for updates
310 go c.listenForKeyChange(channel, ch)
311
312 return ch
313
314}
315
316// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
317// may be multiple listeners on the same key. The previously created channel serves as a key
318func (c *EtcdClient) CloseWatch(key string, ch chan *Event) {
319 // Get the array of channels mapping
320 var watchedChannels []map[chan *Event]v3Client.Watcher
321 var ok bool
322 c.writeLock.Lock()
323 defer c.writeLock.Unlock()
324
325 if watchedChannels, ok = c.watchedChannels[key]; !ok {
326 log.Warnw("key-has-no-watched-channels", log.Fields{"key":key})
327 return
328 }
329 // Look for the channels
330 var pos = -1
331 for i, chMap := range watchedChannels {
332 if t, ok := chMap[ch]; ok {
333 log.Debug("channel-found")
334 // Close the etcd watcher before the client channel. This should close the etcd channel as well
335 if err := t.Close(); err != nil {
336 log.Errorw("watcher-cannot-be-closed", log.Fields{"key":key, "error":err})
337 }
338 close(ch)
339 pos = i
340 break
341 }
342 }
343 // Remove that entry if present
344 if pos >= 0 {
345 c.watchedChannels[key] = append(c.watchedChannels[key][:pos], c.watchedChannels[key][pos+1:]...)
346 }
347 log.Infow("watcher-channel-exiting", log.Fields{"key":key, "channel":c.watchedChannels[key]})
348}
349
350func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event) {
351 log.Infow("start-listening-on-channel", log.Fields{"channel":ch})
352 for resp := range channel {
353 for _, ev := range resp.Events {
354 //log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
355 ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value)
356 }
357 }
358 log.Info("stop-listening-on-channel")
359}
360
361func getEventType(event *v3Client.Event) int {
362 switch event.Type {
363 case v3Client.EventTypePut:
364 return PUT
365 case v3Client.EventTypeDelete:
366 return DELETE
367 }
368 return UNKNOWN
369}
370
371// Close closes the KV store client
372func (c *EtcdClient) Close() {
373 c.writeLock.Lock()
374 defer c.writeLock.Unlock()
375 if err := c.ectdAPI.Close(); err != nil {
376 log.Errorw("error-closing-client", log.Fields{"error":err})
377 }
378}