blob: 97f51568559b692d339ce7f719475c9dd21a4e63 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package api
2
3import (
4 "bytes"
5 "fmt"
6 "io"
7 "net/http"
8 "strconv"
9 "strings"
10)
11
// KVPair is used to represent a single K/V entry.
type KVPair struct {
	// Key is the name of the key. It is also part of the URL path when accessed
	// via the API.
	Key string

	// CreateIndex holds the index corresponding to the creation of this KVPair.
	// This is a read-only field.
	CreateIndex uint64

	// ModifyIndex is used for the Check-And-Set operations and can also be fed
	// back into the WaitIndex of the QueryOptions in order to perform blocking
	// queries.
	ModifyIndex uint64

	// LockIndex holds the index corresponding to a lock on this key, if any. This
	// is a read-only field.
	LockIndex uint64

	// Flags are any user-defined flags on the key. It is up to the implementer
	// to check these values, since Consul does not treat them specially.
	Flags uint64

	// Value is the value for the key. This can be any value, but it will be
	// base64 encoded upon transport.
	Value []byte

	// Session is a string representing the ID of the session. Any other
	// interactions with this key over the same session must specify the same
	// session ID.
	Session string
}
44
// KVPairs is a list of KVPair objects. It is the result type of KV.List.
type KVPairs []*KVPair
47
// KVOp names an operation that can be placed in the Verb field of a KVTxnOp.
// The constants below give the possible operations available in a KV
// transaction.
type KVOp string

// Verbs usable in a KVTxnOp. Each constant's value is the literal string that
// the Verb field carries for that operation.
const (
	KVSet            KVOp = "set"
	KVDelete         KVOp = "delete"
	KVDeleteCAS      KVOp = "delete-cas"
	KVDeleteTree     KVOp = "delete-tree"
	KVCAS            KVOp = "cas"
	KVLock           KVOp = "lock"
	KVUnlock         KVOp = "unlock"
	KVGet            KVOp = "get"
	KVGetTree        KVOp = "get-tree"
	KVCheckSession   KVOp = "check-session"
	KVCheckIndex     KVOp = "check-index"
	KVCheckNotExists KVOp = "check-not-exists"
)
65
// KVTxnOp defines a single operation inside a transaction.
//
// Verb selects the operation and takes one of the KVOp constants; Key is the
// key it applies to. Value is a byte slice, so Go base64 encodes it
// automatically when the operation is serialized.
//
// NOTE(review): which of Value, Flags, Index and Session a given verb reads is
// decided by the server and is not visible in this file. Index is presumably
// the ModifyIndex compared by the CAS-style and check-index verbs, and Session
// the session ID used by the lock verbs; confirm against the Consul
// transaction API documentation.
type KVTxnOp struct {
	Verb    KVOp
	Key     string
	Value   []byte
	Flags   uint64
	Index   uint64
	Session string
}
75
// KVTxnOps defines a set of operations to be performed inside a single
// transaction. It is the input type of KV.Txn.
type KVTxnOps []*KVTxnOp
79
// KVTxnResponse has the outcome of a transaction, as returned by KV.Txn.
// Results holds the KVPair taken from each result the server sent back, and
// Errors is copied unchanged from the server's response.
type KVTxnResponse struct {
	Results []*KVPair
	Errors  TxnErrors
}
85
// KV is used to manipulate the K/V API. It holds the Client through which all
// of its requests are built and sent.
type KV struct {
	c *Client
}
90
91// KV is used to return a handle to the K/V apis
92func (c *Client) KV() *KV {
93 return &KV{c}
94}
95
96// Get is used to lookup a single key. The returned pointer
97// to the KVPair will be nil if the key does not exist.
98func (k *KV) Get(key string, q *QueryOptions) (*KVPair, *QueryMeta, error) {
99 resp, qm, err := k.getInternal(key, nil, q)
100 if err != nil {
101 return nil, nil, err
102 }
103 if resp == nil {
104 return nil, qm, nil
105 }
106 defer resp.Body.Close()
107
108 var entries []*KVPair
109 if err := decodeBody(resp, &entries); err != nil {
110 return nil, nil, err
111 }
112 if len(entries) > 0 {
113 return entries[0], qm, nil
114 }
115 return nil, qm, nil
116}
117
118// List is used to lookup all keys under a prefix
119func (k *KV) List(prefix string, q *QueryOptions) (KVPairs, *QueryMeta, error) {
120 resp, qm, err := k.getInternal(prefix, map[string]string{"recurse": ""}, q)
121 if err != nil {
122 return nil, nil, err
123 }
124 if resp == nil {
125 return nil, qm, nil
126 }
127 defer resp.Body.Close()
128
129 var entries []*KVPair
130 if err := decodeBody(resp, &entries); err != nil {
131 return nil, nil, err
132 }
133 return entries, qm, nil
134}
135
136// Keys is used to list all the keys under a prefix. Optionally,
137// a separator can be used to limit the responses.
138func (k *KV) Keys(prefix, separator string, q *QueryOptions) ([]string, *QueryMeta, error) {
139 params := map[string]string{"keys": ""}
140 if separator != "" {
141 params["separator"] = separator
142 }
143 resp, qm, err := k.getInternal(prefix, params, q)
144 if err != nil {
145 return nil, nil, err
146 }
147 if resp == nil {
148 return nil, qm, nil
149 }
150 defer resp.Body.Close()
151
152 var entries []string
153 if err := decodeBody(resp, &entries); err != nil {
154 return nil, nil, err
155 }
156 return entries, qm, nil
157}
158
159func (k *KV) getInternal(key string, params map[string]string, q *QueryOptions) (*http.Response, *QueryMeta, error) {
160 r := k.c.newRequest("GET", "/v1/kv/"+strings.TrimPrefix(key, "/"))
161 r.setQueryOptions(q)
162 for param, val := range params {
163 r.params.Set(param, val)
164 }
165 rtt, resp, err := k.c.doRequest(r)
166 if err != nil {
167 return nil, nil, err
168 }
169
170 qm := &QueryMeta{}
171 parseQueryMeta(resp, qm)
172 qm.RequestTime = rtt
173
174 if resp.StatusCode == 404 {
175 resp.Body.Close()
176 return nil, qm, nil
177 } else if resp.StatusCode != 200 {
178 resp.Body.Close()
179 return nil, nil, fmt.Errorf("Unexpected response code: %d", resp.StatusCode)
180 }
181 return resp, qm, nil
182}
183
184// Put is used to write a new value. Only the
185// Key, Flags and Value is respected.
186func (k *KV) Put(p *KVPair, q *WriteOptions) (*WriteMeta, error) {
187 params := make(map[string]string, 1)
188 if p.Flags != 0 {
189 params["flags"] = strconv.FormatUint(p.Flags, 10)
190 }
191 _, wm, err := k.put(p.Key, params, p.Value, q)
192 return wm, err
193}
194
195// CAS is used for a Check-And-Set operation. The Key,
196// ModifyIndex, Flags and Value are respected. Returns true
197// on success or false on failures.
198func (k *KV) CAS(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
199 params := make(map[string]string, 2)
200 if p.Flags != 0 {
201 params["flags"] = strconv.FormatUint(p.Flags, 10)
202 }
203 params["cas"] = strconv.FormatUint(p.ModifyIndex, 10)
204 return k.put(p.Key, params, p.Value, q)
205}
206
207// Acquire is used for a lock acquisition operation. The Key,
208// Flags, Value and Session are respected. Returns true
209// on success or false on failures.
210func (k *KV) Acquire(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
211 params := make(map[string]string, 2)
212 if p.Flags != 0 {
213 params["flags"] = strconv.FormatUint(p.Flags, 10)
214 }
215 params["acquire"] = p.Session
216 return k.put(p.Key, params, p.Value, q)
217}
218
219// Release is used for a lock release operation. The Key,
220// Flags, Value and Session are respected. Returns true
221// on success or false on failures.
222func (k *KV) Release(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
223 params := make(map[string]string, 2)
224 if p.Flags != 0 {
225 params["flags"] = strconv.FormatUint(p.Flags, 10)
226 }
227 params["release"] = p.Session
228 return k.put(p.Key, params, p.Value, q)
229}
230
231func (k *KV) put(key string, params map[string]string, body []byte, q *WriteOptions) (bool, *WriteMeta, error) {
232 if len(key) > 0 && key[0] == '/' {
233 return false, nil, fmt.Errorf("Invalid key. Key must not begin with a '/': %s", key)
234 }
235
236 r := k.c.newRequest("PUT", "/v1/kv/"+key)
237 r.setWriteOptions(q)
238 for param, val := range params {
239 r.params.Set(param, val)
240 }
241 r.body = bytes.NewReader(body)
242 rtt, resp, err := requireOK(k.c.doRequest(r))
243 if err != nil {
244 return false, nil, err
245 }
246 defer resp.Body.Close()
247
248 qm := &WriteMeta{}
249 qm.RequestTime = rtt
250
251 var buf bytes.Buffer
252 if _, err := io.Copy(&buf, resp.Body); err != nil {
253 return false, nil, fmt.Errorf("Failed to read response: %v", err)
254 }
255 res := strings.Contains(buf.String(), "true")
256 return res, qm, nil
257}
258
259// Delete is used to delete a single key
260func (k *KV) Delete(key string, w *WriteOptions) (*WriteMeta, error) {
261 _, qm, err := k.deleteInternal(key, nil, w)
262 return qm, err
263}
264
265// DeleteCAS is used for a Delete Check-And-Set operation. The Key
266// and ModifyIndex are respected. Returns true on success or false on failures.
267func (k *KV) DeleteCAS(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
268 params := map[string]string{
269 "cas": strconv.FormatUint(p.ModifyIndex, 10),
270 }
271 return k.deleteInternal(p.Key, params, q)
272}
273
274// DeleteTree is used to delete all keys under a prefix
275func (k *KV) DeleteTree(prefix string, w *WriteOptions) (*WriteMeta, error) {
276 _, qm, err := k.deleteInternal(prefix, map[string]string{"recurse": ""}, w)
277 return qm, err
278}
279
280func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOptions) (bool, *WriteMeta, error) {
281 r := k.c.newRequest("DELETE", "/v1/kv/"+strings.TrimPrefix(key, "/"))
282 r.setWriteOptions(q)
283 for param, val := range params {
284 r.params.Set(param, val)
285 }
286 rtt, resp, err := requireOK(k.c.doRequest(r))
287 if err != nil {
288 return false, nil, err
289 }
290 defer resp.Body.Close()
291
292 qm := &WriteMeta{}
293 qm.RequestTime = rtt
294
295 var buf bytes.Buffer
296 if _, err := io.Copy(&buf, resp.Body); err != nil {
297 return false, nil, fmt.Errorf("Failed to read response: %v", err)
298 }
299 res := strings.Contains(buf.String(), "true")
300 return res, qm, nil
301}
302
// TxnOp is the internal format we send to Consul. It's not specific to KV,
// though currently only KV operations are supported. KV.Txn wraps each
// caller-supplied KVTxnOp in one of these before sending the request.
type TxnOp struct {
	KV *KVTxnOp
}
308
// TxnOps is a list of transaction operations. KV.Txn attaches a value of this
// type to the request it sends.
type TxnOps []*TxnOp
311
// TxnResult is the internal format we receive from Consul. KV.Txn takes the
// KV member out of each result to build the KVTxnResponse it returns.
type TxnResult struct {
	KV *KVPair
}
316
// TxnResults is a list of TxnResult objects.
type TxnResults []*TxnResult
319
// TxnError is used to return information about an operation in a transaction.
// Per the KV.Txn documentation, an entry references the index of the operation
// that failed (OpIndex) along with an error message (What).
type TxnError struct {
	OpIndex int
	What    string
}
325
// TxnErrors is a list of TxnError objects.
type TxnErrors []*TxnError
328
// TxnResponse is the internal format we receive from Consul. KV.Txn decodes
// the response body into this type and then converts it to a KVTxnResponse.
type TxnResponse struct {
	Results TxnResults
	Errors  TxnErrors
}
334
335// Txn is used to apply multiple KV operations in a single, atomic transaction.
336//
337// Note that Go will perform the required base64 encoding on the values
338// automatically because the type is a byte slice. Transactions are defined as a
339// list of operations to perform, using the KVOp constants and KVTxnOp structure
340// to define operations. If any operation fails, none of the changes are applied
341// to the state store. Note that this hides the internal raw transaction interface
342// and munges the input and output types into KV-specific ones for ease of use.
343// If there are more non-KV operations in the future we may break out a new
344// transaction API client, but it will be easy to keep this KV-specific variant
345// supported.
346//
347// Even though this is generally a write operation, we take a QueryOptions input
348// and return a QueryMeta output. If the transaction contains only read ops, then
349// Consul will fast-path it to a different endpoint internally which supports
350// consistency controls, but not blocking. If there are write operations then
351// the request will always be routed through raft and any consistency settings
352// will be ignored.
353//
354// Here's an example:
355//
356// ops := KVTxnOps{
357// &KVTxnOp{
358// Verb: KVLock,
359// Key: "test/lock",
360// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
361// Value: []byte("hello"),
362// },
363// &KVTxnOp{
364// Verb: KVGet,
365// Key: "another/key",
366// },
367// }
368// ok, response, _, err := kv.Txn(&ops, nil)
369//
370// If there is a problem making the transaction request then an error will be
371// returned. Otherwise, the ok value will be true if the transaction succeeded
372// or false if it was rolled back. The response is a structured return value which
373// will have the outcome of the transaction. Its Results member will have entries
374// for each operation. Deleted keys will have a nil entry in the, and to save
375// space, the Value of each key in the Results will be nil unless the operation
376// is a KVGet. If the transaction was rolled back, the Errors member will have
377// entries referencing the index of the operation that failed along with an error
378// message.
379func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) {
380 r := k.c.newRequest("PUT", "/v1/txn")
381 r.setQueryOptions(q)
382
383 // Convert into the internal format since this is an all-KV txn.
384 ops := make(TxnOps, 0, len(txn))
385 for _, kvOp := range txn {
386 ops = append(ops, &TxnOp{KV: kvOp})
387 }
388 r.obj = ops
389 rtt, resp, err := k.c.doRequest(r)
390 if err != nil {
391 return false, nil, nil, err
392 }
393 defer resp.Body.Close()
394
395 qm := &QueryMeta{}
396 parseQueryMeta(resp, qm)
397 qm.RequestTime = rtt
398
399 if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
400 var txnResp TxnResponse
401 if err := decodeBody(resp, &txnResp); err != nil {
402 return false, nil, nil, err
403 }
404
405 // Convert from the internal format.
406 kvResp := KVTxnResponse{
407 Errors: txnResp.Errors,
408 }
409 for _, result := range txnResp.Results {
410 kvResp.Results = append(kvResp.Results, result.KV)
411 }
412 return resp.StatusCode == http.StatusOK, &kvResp, qm, nil
413 }
414
415 var buf bytes.Buffer
416 if _, err := io.Copy(&buf, resp.Body); err != nil {
417 return false, nil, nil, fmt.Errorf("Failed to read response: %v", err)
418 }
419 return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String())
420}