blob: ec53830c7f0a1704d0093db663c0c94548e28119 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package client
16
17import (
18 "context"
19 "encoding/json"
20 "errors"
21 "fmt"
22 "go.etcd.io/etcd/pkg/pathutil"
23 "net/http"
24 "net/url"
25 "strconv"
26 "strings"
27 "time"
28)
29
// Named error codes, grouped by their hundreds digit.
// NOTE(review): presumably these mirror the errorCode values an etcd v2
// server reports (see Error.Code) — confirm against the server's error table.

// 1xx codes.
const (
	ErrorCodeKeyNotFound  = 100
	ErrorCodeTestFailed   = 101
	ErrorCodeNotFile      = 102
	ErrorCodeNotDir       = 104
	ErrorCodeNodeExist    = 105
	ErrorCodeRootROnly    = 107
	ErrorCodeDirNotEmpty  = 108
	ErrorCodeUnauthorized = 110
)

// 2xx codes.
const (
	ErrorCodePrevValueRequired = 201
	ErrorCodeTTLNaN            = 202
	ErrorCodeIndexNaN          = 203
	ErrorCodeInvalidField      = 209
	ErrorCodeInvalidForm       = 210
)

// 3xx codes.
const (
	ErrorCodeRaftInternal = 300
	ErrorCodeLeaderElect  = 301
)

// 4xx codes.
const (
	ErrorCodeWatcherCleared    = 400
	ErrorCodeEventIndexCleared = 401
)
52
// Error is the decoded form of an error document: a numeric code, a
// message, a cause and an index, each read from the JSON field named in
// its struct tag.
type Error struct {
	Code    int    `json:"errorCode"`
	Message string `json:"message"`
	Cause   string `json:"cause"`
	Index   uint64 `json:"index"`
}

// Error implements the error interface, rendering the value as
// "<code>: <message> (<cause>) [<index>]".
func (e Error) Error() string {
	const layout = "%v: %v (%v) [%v]"
	return fmt.Sprintf(layout, e.Code, e.Message, e.Cause, e.Index)
}
63
// ErrInvalidJSON is returned when a response body cannot be decoded.
var ErrInvalidJSON = errors.New("client: response is invalid json. The endpoint is probably not valid etcd cluster endpoint")

// ErrEmptyBody is returned when a successful response carries no body.
var ErrEmptyBody = errors.New("client: response body is empty")
68
// PrevExistType expresses an existence precondition that applies when
// setting or deleting Nodes.
type PrevExistType string

const (
	// PrevIgnore places no condition on whether the Node exists.
	PrevIgnore PrevExistType = ""
	// PrevExist requires that the Node currently exists.
	PrevExist PrevExistType = "true"
	// PrevNoExist requires that the Node does not currently exist.
	PrevNoExist PrevExistType = "false"
)
78
// defaultV2KeysPrefix is the URL path prefix NewKeysAPI passes to
// NewKeysAPIWithPrefix.
var defaultV2KeysPrefix = "/v2/keys"
82
83// NewKeysAPI builds a KeysAPI that interacts with etcd's key-value
84// API over HTTP.
85func NewKeysAPI(c Client) KeysAPI {
86 return NewKeysAPIWithPrefix(c, defaultV2KeysPrefix)
87}
88
89// NewKeysAPIWithPrefix acts like NewKeysAPI, but allows the caller
90// to provide a custom base URL path. This should only be used in
91// very rare cases.
92func NewKeysAPIWithPrefix(c Client, p string) KeysAPI {
93 return &httpKeysAPI{
94 client: c,
95 prefix: p,
96 }
97}
98
99type KeysAPI interface {
100 // Get retrieves a set of Nodes from etcd
101 Get(ctx context.Context, key string, opts *GetOptions) (*Response, error)
102
103 // Set assigns a new value to a Node identified by a given key. The caller
104 // may define a set of conditions in the SetOptions. If SetOptions.Dir=true
105 // then value is ignored.
106 Set(ctx context.Context, key, value string, opts *SetOptions) (*Response, error)
107
108 // Delete removes a Node identified by the given key, optionally destroying
109 // all of its children as well. The caller may define a set of required
110 // conditions in an DeleteOptions object.
111 Delete(ctx context.Context, key string, opts *DeleteOptions) (*Response, error)
112
113 // Create is an alias for Set w/ PrevExist=false
114 Create(ctx context.Context, key, value string) (*Response, error)
115
116 // CreateInOrder is used to atomically create in-order keys within the given directory.
117 CreateInOrder(ctx context.Context, dir, value string, opts *CreateInOrderOptions) (*Response, error)
118
119 // Update is an alias for Set w/ PrevExist=true
120 Update(ctx context.Context, key, value string) (*Response, error)
121
122 // Watcher builds a new Watcher targeted at a specific Node identified
123 // by the given key. The Watcher may be configured at creation time
124 // through a WatcherOptions object. The returned Watcher is designed
125 // to emit events that happen to a Node, and optionally to its children.
126 Watcher(key string, opts *WatcherOptions) Watcher
127}
128
// WatcherOptions configures a Watcher at creation time.
type WatcherOptions struct {
	// AfterIndex is the index after which the Watcher starts emitting
	// events; with a value of 5 the first event has an index >= 6.
	//
	// The default of 0 means "start at whatever the current index is".
	AfterIndex uint64

	// Recursive controls whether events occurring in children of the
	// watched keyspace are emitted too. When false (default), only
	// events for the exact key are reported.
	Recursive bool
}
145
// CreateInOrderOptions configures a CreateInOrder call.
type CreateInOrderOptions struct {
	// TTL is how long the Node lives before it expires and ceases to
	// exist. Values <= 0 are ignored, so a TTL of exactly 0 cannot be
	// requested through this field.
	TTL time.Duration
}
153
154type SetOptions struct {
155 // PrevValue specifies what the current value of the Node must
156 // be in order for the Set operation to succeed.
157 //
158 // Leaving this field empty means that the caller wishes to
159 // ignore the current value of the Node. This cannot be used
160 // to compare the Node's current value to an empty string.
161 //
162 // PrevValue is ignored if Dir=true
163 PrevValue string
164
165 // PrevIndex indicates what the current ModifiedIndex of the
166 // Node must be in order for the Set operation to succeed.
167 //
168 // If PrevIndex is set to 0 (default), no comparison is made.
169 PrevIndex uint64
170
171 // PrevExist specifies whether the Node must currently exist
172 // (PrevExist) or not (PrevNoExist). If the caller does not
173 // care about existence, set PrevExist to PrevIgnore, or simply
174 // leave it unset.
175 PrevExist PrevExistType
176
177 // TTL defines a period of time after-which the Node should
178 // expire and no longer exist. Values <= 0 are ignored. Given
179 // that the zero-value is ignored, TTL cannot be used to set
180 // a TTL of 0.
181 TTL time.Duration
182
183 // Refresh set to true means a TTL value can be updated
184 // without firing a watch or changing the node value. A
185 // value must not be provided when refreshing a key.
186 Refresh bool
187
188 // Dir specifies whether or not this Node should be created as a directory.
189 Dir bool
190
191 // NoValueOnSuccess specifies whether the response contains the current value of the Node.
192 // If set, the response will only contain the current value when the request fails.
193 NoValueOnSuccess bool
194}
195
// GetOptions carries the optional attributes of a Get call.
type GetOptions struct {
	// Recursive requests that all children of the Node be returned.
	Recursive bool

	// Sort asks the server to sort the returned Nodes. When true, the
	// Nodes are ordered alphabetically by key, ascending (A to z). When
	// false (default), no sorting happens and the order must not be
	// treated as predictable.
	Sort bool

	// Quorum requests the latest committed value that has been applied
	// in a quorum of members, which ensures external consistency (or
	// linearizability).
	Quorum bool
}
213
// DeleteOptions carries the optional conditions and attributes of a
// Delete call.
type DeleteOptions struct {
	// PrevValue is the value the Node must currently hold for the
	// Delete to succeed.
	//
	// An empty PrevValue means the current value is not checked; as a
	// consequence it is impossible to require that the current value be
	// the empty string.
	PrevValue string

	// PrevIndex is the ModifiedIndex the Node must currently have for
	// the Delete to succeed.
	//
	// The default of 0 disables the comparison.
	PrevIndex uint64

	// Recursive requests deletion of every child of the Node identified
	// by the key. When unset or false, only that single Node is
	// deleted.
	Recursive bool

	// Dir states whether the Node should be removed as a directory.
	Dir bool
}
239
240type Watcher interface {
241 // Next blocks until an etcd event occurs, then returns a Response
242 // representing that event. The behavior of Next depends on the
243 // WatcherOptions used to construct the Watcher. Next is designed to
244 // be called repeatedly, each time blocking until a subsequent event
245 // is available.
246 //
247 // If the provided context is cancelled, Next will return a non-nil
248 // error. Any other failures encountered while waiting for the next
249 // event (connection issues, deserialization failures, etc) will
250 // also result in a non-nil error.
251 Next(context.Context) (*Response, error)
252}
253
254type Response struct {
255 // Action is the name of the operation that occurred. Possible values
256 // include get, set, delete, update, create, compareAndSwap,
257 // compareAndDelete and expire.
258 Action string `json:"action"`
259
260 // Node represents the state of the relevant etcd Node.
261 Node *Node `json:"node"`
262
263 // PrevNode represents the previous state of the Node. PrevNode is non-nil
264 // only if the Node existed before the action occurred and the action
265 // caused a change to the Node.
266 PrevNode *Node `json:"prevNode"`
267
268 // Index holds the cluster-level index at the time the Response was generated.
269 // This index is not tied to the Node(s) contained in this Response.
270 Index uint64 `json:"-"`
271
272 // ClusterID holds the cluster-level ID reported by the server. This
273 // should be different for different etcd clusters.
274 ClusterID string `json:"-"`
275}
276
// Node is a single entry in the etcd keyspace.
type Node struct {
	// Key is the unique location of this Node (e.g. "/foo/bar").
	Key string `json:"key"`

	// Dir reports whether this Node is a directory.
	Dir bool `json:"dir,omitempty"`

	// Value is the data currently stored on this Node. It is empty for
	// directories.
	Value string `json:"value"`

	// Nodes holds this Node's children and is populated only for
	// directories. The slice is arbitrarily deep (children,
	// grandchildren, great-grandchildren, etc.) when a recursive Get or
	// Watch request was made.
	Nodes Nodes `json:"nodes"`

	// CreatedIndex is the etcd index at which this Node was created.
	CreatedIndex uint64 `json:"createdIndex"`

	// ModifiedIndex is the etcd index at which this Node was last
	// modified.
	ModifiedIndex uint64 `json:"modifiedIndex"`

	// Expiration is the server-side expiration time of the key.
	Expiration *time.Time `json:"expiration,omitempty"`

	// TTL is the time to live of the key, in seconds.
	TTL int64 `json:"ttl,omitempty"`
}

// String renders the Node's key, created/modified indexes and TTL.
func (n *Node) String() string {
	const layout = "{Key: %s, CreatedIndex: %d, ModifiedIndex: %d, TTL: %d}"
	return fmt.Sprintf(layout, n.Key, n.CreatedIndex, n.ModifiedIndex, n.TTL)
}

// TTLDuration converts the Node's TTL (seconds) into a time.Duration.
func (n *Node) TTLDuration() time.Duration {
	seconds := time.Duration(n.TTL)
	return seconds * time.Second
}

// Nodes is a list of Node pointers that can be sorted by key.
type Nodes []*Node

// Len, Less and Swap implement sort.Interface, ordering by Key.

// Len returns the number of Nodes.
func (ns Nodes) Len() int {
	return len(ns)
}

// Less reports whether the Node at i has a smaller Key than the one at j.
func (ns Nodes) Less(i, j int) bool {
	left, right := ns[i].Key, ns[j].Key
	return left < right
}

// Swap exchanges the Nodes at positions i and j.
func (ns Nodes) Swap(i, j int) {
	tmp := ns[i]
	ns[i] = ns[j]
	ns[j] = tmp
}
322
323type httpKeysAPI struct {
324 client httpClient
325 prefix string
326}
327
328func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions) (*Response, error) {
329 act := &setAction{
330 Prefix: k.prefix,
331 Key: key,
332 Value: val,
333 }
334
335 if opts != nil {
336 act.PrevValue = opts.PrevValue
337 act.PrevIndex = opts.PrevIndex
338 act.PrevExist = opts.PrevExist
339 act.TTL = opts.TTL
340 act.Refresh = opts.Refresh
341 act.Dir = opts.Dir
342 act.NoValueOnSuccess = opts.NoValueOnSuccess
343 }
344
345 doCtx := ctx
346 if act.PrevExist == PrevNoExist {
347 doCtx = context.WithValue(doCtx, &oneShotCtxValue, &oneShotCtxValue)
348 }
349 resp, body, err := k.client.Do(doCtx, act)
350 if err != nil {
351 return nil, err
352 }
353
354 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body)
355}
356
357func (k *httpKeysAPI) Create(ctx context.Context, key, val string) (*Response, error) {
358 return k.Set(ctx, key, val, &SetOptions{PrevExist: PrevNoExist})
359}
360
361func (k *httpKeysAPI) CreateInOrder(ctx context.Context, dir, val string, opts *CreateInOrderOptions) (*Response, error) {
362 act := &createInOrderAction{
363 Prefix: k.prefix,
364 Dir: dir,
365 Value: val,
366 }
367
368 if opts != nil {
369 act.TTL = opts.TTL
370 }
371
372 resp, body, err := k.client.Do(ctx, act)
373 if err != nil {
374 return nil, err
375 }
376
377 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body)
378}
379
380func (k *httpKeysAPI) Update(ctx context.Context, key, val string) (*Response, error) {
381 return k.Set(ctx, key, val, &SetOptions{PrevExist: PrevExist})
382}
383
384func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts *DeleteOptions) (*Response, error) {
385 act := &deleteAction{
386 Prefix: k.prefix,
387 Key: key,
388 }
389
390 if opts != nil {
391 act.PrevValue = opts.PrevValue
392 act.PrevIndex = opts.PrevIndex
393 act.Dir = opts.Dir
394 act.Recursive = opts.Recursive
395 }
396
397 doCtx := context.WithValue(ctx, &oneShotCtxValue, &oneShotCtxValue)
398 resp, body, err := k.client.Do(doCtx, act)
399 if err != nil {
400 return nil, err
401 }
402
403 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body)
404}
405
406func (k *httpKeysAPI) Get(ctx context.Context, key string, opts *GetOptions) (*Response, error) {
407 act := &getAction{
408 Prefix: k.prefix,
409 Key: key,
410 }
411
412 if opts != nil {
413 act.Recursive = opts.Recursive
414 act.Sorted = opts.Sort
415 act.Quorum = opts.Quorum
416 }
417
418 resp, body, err := k.client.Do(ctx, act)
419 if err != nil {
420 return nil, err
421 }
422
423 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body)
424}
425
426func (k *httpKeysAPI) Watcher(key string, opts *WatcherOptions) Watcher {
427 act := waitAction{
428 Prefix: k.prefix,
429 Key: key,
430 }
431
432 if opts != nil {
433 act.Recursive = opts.Recursive
434 if opts.AfterIndex > 0 {
435 act.WaitIndex = opts.AfterIndex + 1
436 }
437 }
438
439 return &httpWatcher{
440 client: k.client,
441 nextWait: act,
442 }
443}
444
445type httpWatcher struct {
446 client httpClient
447 nextWait waitAction
448}
449
450func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
451 for {
452 httpresp, body, err := hw.client.Do(ctx, &hw.nextWait)
453 if err != nil {
454 return nil, err
455 }
456
457 resp, err := unmarshalHTTPResponse(httpresp.StatusCode, httpresp.Header, body)
458 if err != nil {
459 if err == ErrEmptyBody {
460 continue
461 }
462 return nil, err
463 }
464
465 hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
466 return resp, nil
467 }
468}
469
470// v2KeysURL forms a URL representing the location of a key.
471// The endpoint argument represents the base URL of an etcd
472// server. The prefix is the path needed to route from the
473// provided endpoint's path to the root of the keys API
474// (typically "/v2/keys").
475func v2KeysURL(ep url.URL, prefix, key string) *url.URL {
476 // We concatenate all parts together manually. We cannot use
477 // path.Join because it does not reserve trailing slash.
478 // We call CanonicalURLPath to further cleanup the path.
479 if prefix != "" && prefix[0] != '/' {
480 prefix = "/" + prefix
481 }
482 if key != "" && key[0] != '/' {
483 key = "/" + key
484 }
485 ep.Path = pathutil.CanonicalURLPath(ep.Path + prefix + key)
486 return &ep
487}
488
489type getAction struct {
490 Prefix string
491 Key string
492 Recursive bool
493 Sorted bool
494 Quorum bool
495}
496
497func (g *getAction) HTTPRequest(ep url.URL) *http.Request {
498 u := v2KeysURL(ep, g.Prefix, g.Key)
499
500 params := u.Query()
501 params.Set("recursive", strconv.FormatBool(g.Recursive))
502 params.Set("sorted", strconv.FormatBool(g.Sorted))
503 params.Set("quorum", strconv.FormatBool(g.Quorum))
504 u.RawQuery = params.Encode()
505
506 req, _ := http.NewRequest("GET", u.String(), nil)
507 return req
508}
509
510type waitAction struct {
511 Prefix string
512 Key string
513 WaitIndex uint64
514 Recursive bool
515}
516
517func (w *waitAction) HTTPRequest(ep url.URL) *http.Request {
518 u := v2KeysURL(ep, w.Prefix, w.Key)
519
520 params := u.Query()
521 params.Set("wait", "true")
522 params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10))
523 params.Set("recursive", strconv.FormatBool(w.Recursive))
524 u.RawQuery = params.Encode()
525
526 req, _ := http.NewRequest("GET", u.String(), nil)
527 return req
528}
529
530type setAction struct {
531 Prefix string
532 Key string
533 Value string
534 PrevValue string
535 PrevIndex uint64
536 PrevExist PrevExistType
537 TTL time.Duration
538 Refresh bool
539 Dir bool
540 NoValueOnSuccess bool
541}
542
543func (a *setAction) HTTPRequest(ep url.URL) *http.Request {
544 u := v2KeysURL(ep, a.Prefix, a.Key)
545
546 params := u.Query()
547 form := url.Values{}
548
549 // we're either creating a directory or setting a key
550 if a.Dir {
551 params.Set("dir", strconv.FormatBool(a.Dir))
552 } else {
553 // These options are only valid for setting a key
554 if a.PrevValue != "" {
555 params.Set("prevValue", a.PrevValue)
556 }
557 form.Add("value", a.Value)
558 }
559
560 // Options which apply to both setting a key and creating a dir
561 if a.PrevIndex != 0 {
562 params.Set("prevIndex", strconv.FormatUint(a.PrevIndex, 10))
563 }
564 if a.PrevExist != PrevIgnore {
565 params.Set("prevExist", string(a.PrevExist))
566 }
567 if a.TTL > 0 {
568 form.Add("ttl", strconv.FormatUint(uint64(a.TTL.Seconds()), 10))
569 }
570
571 if a.Refresh {
572 form.Add("refresh", "true")
573 }
574 if a.NoValueOnSuccess {
575 params.Set("noValueOnSuccess", strconv.FormatBool(a.NoValueOnSuccess))
576 }
577
578 u.RawQuery = params.Encode()
579 body := strings.NewReader(form.Encode())
580
581 req, _ := http.NewRequest("PUT", u.String(), body)
582 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
583
584 return req
585}
586
587type deleteAction struct {
588 Prefix string
589 Key string
590 PrevValue string
591 PrevIndex uint64
592 Dir bool
593 Recursive bool
594}
595
596func (a *deleteAction) HTTPRequest(ep url.URL) *http.Request {
597 u := v2KeysURL(ep, a.Prefix, a.Key)
598
599 params := u.Query()
600 if a.PrevValue != "" {
601 params.Set("prevValue", a.PrevValue)
602 }
603 if a.PrevIndex != 0 {
604 params.Set("prevIndex", strconv.FormatUint(a.PrevIndex, 10))
605 }
606 if a.Dir {
607 params.Set("dir", "true")
608 }
609 if a.Recursive {
610 params.Set("recursive", "true")
611 }
612 u.RawQuery = params.Encode()
613
614 req, _ := http.NewRequest("DELETE", u.String(), nil)
615 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
616
617 return req
618}
619
620type createInOrderAction struct {
621 Prefix string
622 Dir string
623 Value string
624 TTL time.Duration
625}
626
627func (a *createInOrderAction) HTTPRequest(ep url.URL) *http.Request {
628 u := v2KeysURL(ep, a.Prefix, a.Dir)
629
630 form := url.Values{}
631 form.Add("value", a.Value)
632 if a.TTL > 0 {
633 form.Add("ttl", strconv.FormatUint(uint64(a.TTL.Seconds()), 10))
634 }
635 body := strings.NewReader(form.Encode())
636
637 req, _ := http.NewRequest("POST", u.String(), body)
638 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
639 return req
640}
641
642func unmarshalHTTPResponse(code int, header http.Header, body []byte) (res *Response, err error) {
643 switch code {
644 case http.StatusOK, http.StatusCreated:
645 if len(body) == 0 {
646 return nil, ErrEmptyBody
647 }
648 res, err = unmarshalSuccessfulKeysResponse(header, body)
649 default:
650 err = unmarshalFailedKeysResponse(body)
651 }
652 return res, err
653}
654
655var jsonIterator = caseSensitiveJsonIterator()
656
657func unmarshalSuccessfulKeysResponse(header http.Header, body []byte) (*Response, error) {
658 var res Response
659 err := jsonIterator.Unmarshal(body, &res)
660 if err != nil {
661 return nil, ErrInvalidJSON
662 }
663 if header.Get("X-Etcd-Index") != "" {
664 res.Index, err = strconv.ParseUint(header.Get("X-Etcd-Index"), 10, 64)
665 if err != nil {
666 return nil, err
667 }
668 }
669 res.ClusterID = header.Get("X-Etcd-Cluster-ID")
670 return &res, nil
671}
672
673func unmarshalFailedKeysResponse(body []byte) error {
674 var etcdErr Error
675 if err := json.Unmarshal(body, &etcdErr); err != nil {
676 return ErrInvalidJSON
677 }
678 return etcdErr
679}