blob: bd45a067c90821f3b23aa0de170066ec3576b1c4 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package api
2
3import (
4 "bytes"
5 "fmt"
6 "io"
7 "net/http"
8 "strconv"
9 "strings"
10)
11
// KVPair is used to represent a single K/V entry.
type KVPair struct {
	// Key is the name of the key. It is also part of the URL path when
	// accessed via the API.
	Key string

	// CreateIndex holds the index corresponding to the creation of this
	// KVPair. This is a read-only field.
	CreateIndex uint64

	// ModifyIndex is used for the Check-And-Set operations and can also be
	// fed back into the WaitIndex of the QueryOptions in order to perform
	// blocking queries.
	ModifyIndex uint64

	// LockIndex holds the index corresponding to a lock on this key, if any.
	// This is a read-only field.
	LockIndex uint64

	// Flags are any user-defined flags on the key. It is up to the
	// implementer to check these values, since Consul does not treat them
	// specially.
	Flags uint64

	// Value is the value for the key. This can be any value, but it will be
	// base64 encoded upon transport.
	Value []byte

	// Session is a string representing the ID of the session. Any other
	// interactions with this key over the same session must specify the same
	// session ID.
	Session string
}
44
// KVPairs is a list of pointers to KVPair objects, as returned by List.
type KVPairs []*KVPair
47
// KV is used to manipulate the K/V API. Obtain one through Client.KV.
type KV struct {
	// c is the client every K/V request is issued through.
	c *Client
}
52
53// KV is used to return a handle to the K/V apis
54func (c *Client) KV() *KV {
55 return &KV{c}
56}
57
58// Get is used to lookup a single key. The returned pointer
59// to the KVPair will be nil if the key does not exist.
60func (k *KV) Get(key string, q *QueryOptions) (*KVPair, *QueryMeta, error) {
61 resp, qm, err := k.getInternal(key, nil, q)
62 if err != nil {
63 return nil, nil, err
64 }
65 if resp == nil {
66 return nil, qm, nil
67 }
68 defer resp.Body.Close()
69
70 var entries []*KVPair
71 if err := decodeBody(resp, &entries); err != nil {
72 return nil, nil, err
73 }
74 if len(entries) > 0 {
75 return entries[0], qm, nil
76 }
77 return nil, qm, nil
78}
79
80// List is used to lookup all keys under a prefix
81func (k *KV) List(prefix string, q *QueryOptions) (KVPairs, *QueryMeta, error) {
82 resp, qm, err := k.getInternal(prefix, map[string]string{"recurse": ""}, q)
83 if err != nil {
84 return nil, nil, err
85 }
86 if resp == nil {
87 return nil, qm, nil
88 }
89 defer resp.Body.Close()
90
91 var entries []*KVPair
92 if err := decodeBody(resp, &entries); err != nil {
93 return nil, nil, err
94 }
95 return entries, qm, nil
96}
97
98// Keys is used to list all the keys under a prefix. Optionally,
99// a separator can be used to limit the responses.
100func (k *KV) Keys(prefix, separator string, q *QueryOptions) ([]string, *QueryMeta, error) {
101 params := map[string]string{"keys": ""}
102 if separator != "" {
103 params["separator"] = separator
104 }
105 resp, qm, err := k.getInternal(prefix, params, q)
106 if err != nil {
107 return nil, nil, err
108 }
109 if resp == nil {
110 return nil, qm, nil
111 }
112 defer resp.Body.Close()
113
114 var entries []string
115 if err := decodeBody(resp, &entries); err != nil {
116 return nil, nil, err
117 }
118 return entries, qm, nil
119}
120
121func (k *KV) getInternal(key string, params map[string]string, q *QueryOptions) (*http.Response, *QueryMeta, error) {
122 r := k.c.newRequest("GET", "/v1/kv/"+strings.TrimPrefix(key, "/"))
123 r.setQueryOptions(q)
124 for param, val := range params {
125 r.params.Set(param, val)
126 }
127 rtt, resp, err := k.c.doRequest(r)
128 if err != nil {
129 return nil, nil, err
130 }
131
132 qm := &QueryMeta{}
133 parseQueryMeta(resp, qm)
134 qm.RequestTime = rtt
135
136 if resp.StatusCode == 404 {
137 resp.Body.Close()
138 return nil, qm, nil
139 } else if resp.StatusCode != 200 {
140 resp.Body.Close()
141 return nil, nil, fmt.Errorf("Unexpected response code: %d", resp.StatusCode)
142 }
143 return resp, qm, nil
144}
145
146// Put is used to write a new value. Only the
147// Key, Flags and Value is respected.
148func (k *KV) Put(p *KVPair, q *WriteOptions) (*WriteMeta, error) {
149 params := make(map[string]string, 1)
150 if p.Flags != 0 {
151 params["flags"] = strconv.FormatUint(p.Flags, 10)
152 }
153 _, wm, err := k.put(p.Key, params, p.Value, q)
154 return wm, err
155}
156
157// CAS is used for a Check-And-Set operation. The Key,
158// ModifyIndex, Flags and Value are respected. Returns true
159// on success or false on failures.
160func (k *KV) CAS(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
161 params := make(map[string]string, 2)
162 if p.Flags != 0 {
163 params["flags"] = strconv.FormatUint(p.Flags, 10)
164 }
165 params["cas"] = strconv.FormatUint(p.ModifyIndex, 10)
166 return k.put(p.Key, params, p.Value, q)
167}
168
169// Acquire is used for a lock acquisition operation. The Key,
170// Flags, Value and Session are respected. Returns true
171// on success or false on failures.
172func (k *KV) Acquire(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
173 params := make(map[string]string, 2)
174 if p.Flags != 0 {
175 params["flags"] = strconv.FormatUint(p.Flags, 10)
176 }
177 params["acquire"] = p.Session
178 return k.put(p.Key, params, p.Value, q)
179}
180
181// Release is used for a lock release operation. The Key,
182// Flags, Value and Session are respected. Returns true
183// on success or false on failures.
184func (k *KV) Release(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
185 params := make(map[string]string, 2)
186 if p.Flags != 0 {
187 params["flags"] = strconv.FormatUint(p.Flags, 10)
188 }
189 params["release"] = p.Session
190 return k.put(p.Key, params, p.Value, q)
191}
192
193func (k *KV) put(key string, params map[string]string, body []byte, q *WriteOptions) (bool, *WriteMeta, error) {
194 if len(key) > 0 && key[0] == '/' {
195 return false, nil, fmt.Errorf("Invalid key. Key must not begin with a '/': %s", key)
196 }
197
198 r := k.c.newRequest("PUT", "/v1/kv/"+key)
199 r.setWriteOptions(q)
200 for param, val := range params {
201 r.params.Set(param, val)
202 }
203 r.body = bytes.NewReader(body)
204 rtt, resp, err := requireOK(k.c.doRequest(r))
205 if err != nil {
206 return false, nil, err
207 }
208 defer resp.Body.Close()
209
210 qm := &WriteMeta{}
211 qm.RequestTime = rtt
212
213 var buf bytes.Buffer
214 if _, err := io.Copy(&buf, resp.Body); err != nil {
215 return false, nil, fmt.Errorf("Failed to read response: %v", err)
216 }
217 res := strings.Contains(buf.String(), "true")
218 return res, qm, nil
219}
220
221// Delete is used to delete a single key
222func (k *KV) Delete(key string, w *WriteOptions) (*WriteMeta, error) {
223 _, qm, err := k.deleteInternal(key, nil, w)
224 return qm, err
225}
226
227// DeleteCAS is used for a Delete Check-And-Set operation. The Key
228// and ModifyIndex are respected. Returns true on success or false on failures.
229func (k *KV) DeleteCAS(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
230 params := map[string]string{
231 "cas": strconv.FormatUint(p.ModifyIndex, 10),
232 }
233 return k.deleteInternal(p.Key, params, q)
234}
235
236// DeleteTree is used to delete all keys under a prefix
237func (k *KV) DeleteTree(prefix string, w *WriteOptions) (*WriteMeta, error) {
238 _, qm, err := k.deleteInternal(prefix, map[string]string{"recurse": ""}, w)
239 return qm, err
240}
241
242func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOptions) (bool, *WriteMeta, error) {
243 r := k.c.newRequest("DELETE", "/v1/kv/"+strings.TrimPrefix(key, "/"))
244 r.setWriteOptions(q)
245 for param, val := range params {
246 r.params.Set(param, val)
247 }
248 rtt, resp, err := requireOK(k.c.doRequest(r))
249 if err != nil {
250 return false, nil, err
251 }
252 defer resp.Body.Close()
253
254 qm := &WriteMeta{}
255 qm.RequestTime = rtt
256
257 var buf bytes.Buffer
258 if _, err := io.Copy(&buf, resp.Body); err != nil {
259 return false, nil, fmt.Errorf("Failed to read response: %v", err)
260 }
261 res := strings.Contains(buf.String(), "true")
262 return res, qm, nil
263}
264
265// The Txn function has been deprecated from the KV object; please see the Txn
266// object for more information about Transactions.
267func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) {
268 var ops TxnOps
269 for _, op := range txn {
270 ops = append(ops, &TxnOp{KV: op})
271 }
272
273 respOk, txnResp, qm, err := k.c.txn(ops, q)
274 if err != nil {
275 return false, nil, nil, err
276 }
277
278 // Convert from the internal format.
279 kvResp := KVTxnResponse{
280 Errors: txnResp.Errors,
281 }
282 for _, result := range txnResp.Results {
283 kvResp.Results = append(kvResp.Results, result.KV)
284 }
285 return respOk, &kvResp, qm, nil
286}