blob: 444f93f3a8e7d03a4949ba96c93543b327353c60 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package v2v3
16
17import (
18 "context"
19 "fmt"
20 "path"
21 "strings"
22 "time"
23
24 "github.com/coreos/etcd/clientv3"
25 "github.com/coreos/etcd/clientv3/concurrency"
26 etcdErr "github.com/coreos/etcd/error"
27 "github.com/coreos/etcd/mvcc/mvccpb"
28 "github.com/coreos/etcd/store"
29)
30
// v2v3Store implements the Store interface for V2 using
// a v3 client.
type v2v3Store struct {
	// c is the v3 client every read and write goes through.
	c *clientv3.Client
	// pfx is the v3 prefix where keys should be stored.
	pfx string
	// ctx is passed to the Get/Txn requests the store issues directly;
	// newStore initializes it to c.Ctx().
	ctx context.Context
}
39
// maxPathDepth bounds the recursive Delete: it issues one prefix delete per
// level below the target directory, for levels 1 through maxPathDepth-1.
const maxPathDepth = 63
41
// errUnsupported is returned by the mutating operations when TTL-related
// options (Refresh or a non-zero ExpireTime) are requested; this store does
// not implement TTLs.
var errUnsupported = fmt.Errorf("TTLs are unsupported")
43
44func NewStore(c *clientv3.Client, pfx string) store.Store { return newStore(c, pfx) }
45
46func newStore(c *clientv3.Client, pfx string) *v2v3Store { return &v2v3Store{c, pfx, c.Ctx()} }
47
48func (s *v2v3Store) Index() uint64 { panic("STUB") }
49
50func (s *v2v3Store) Get(nodePath string, recursive, sorted bool) (*store.Event, error) {
51 key := s.mkPath(nodePath)
52 resp, err := s.c.Txn(s.ctx).Then(
53 clientv3.OpGet(key+"/"),
54 clientv3.OpGet(key),
55 ).Commit()
56 if err != nil {
57 return nil, err
58 }
59
60 if kvs := resp.Responses[0].GetResponseRange().Kvs; len(kvs) != 0 || isRoot(nodePath) {
61 nodes, err := s.getDir(nodePath, recursive, sorted, resp.Header.Revision)
62 if err != nil {
63 return nil, err
64 }
65 cidx, midx := uint64(0), uint64(0)
66 if len(kvs) > 0 {
67 cidx, midx = mkV2Rev(kvs[0].CreateRevision), mkV2Rev(kvs[0].ModRevision)
68 }
69 return &store.Event{
70 Action: store.Get,
71 Node: &store.NodeExtern{
72 Key: nodePath,
73 Dir: true,
74 Nodes: nodes,
75 CreatedIndex: cidx,
76 ModifiedIndex: midx,
77 },
78 EtcdIndex: mkV2Rev(resp.Header.Revision),
79 }, nil
80 }
81
82 kvs := resp.Responses[1].GetResponseRange().Kvs
83 if len(kvs) == 0 {
84 return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
85 }
86
87 return &store.Event{
88 Action: store.Get,
89 Node: s.mkV2Node(kvs[0]),
90 EtcdIndex: mkV2Rev(resp.Header.Revision),
91 }, nil
92}
93
94func (s *v2v3Store) getDir(nodePath string, recursive, sorted bool, rev int64) ([]*store.NodeExtern, error) {
95 rootNodes, err := s.getDirDepth(nodePath, 1, rev)
96 if err != nil || !recursive {
97 return rootNodes, err
98 }
99 nextNodes := rootNodes
100 nodes := make(map[string]*store.NodeExtern)
101 // Breadth walk the subdirectories
102 for i := 2; len(nextNodes) > 0; i++ {
103 for _, n := range nextNodes {
104 nodes[n.Key] = n
105 if parent := nodes[path.Dir(n.Key)]; parent != nil {
106 parent.Nodes = append(parent.Nodes, n)
107 }
108 }
109 if nextNodes, err = s.getDirDepth(nodePath, i, rev); err != nil {
110 return nil, err
111 }
112 }
113 return rootNodes, nil
114}
115
116func (s *v2v3Store) getDirDepth(nodePath string, depth int, rev int64) ([]*store.NodeExtern, error) {
117 pd := s.mkPathDepth(nodePath, depth)
118 resp, err := s.c.Get(s.ctx, pd, clientv3.WithPrefix(), clientv3.WithRev(rev))
119 if err != nil {
120 return nil, err
121 }
122
123 nodes := make([]*store.NodeExtern, len(resp.Kvs))
124 for i, kv := range resp.Kvs {
125 nodes[i] = s.mkV2Node(kv)
126 }
127 return nodes, nil
128}
129
130func (s *v2v3Store) Set(
131 nodePath string,
132 dir bool,
133 value string,
134 expireOpts store.TTLOptionSet,
135) (*store.Event, error) {
136 if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
137 return nil, errUnsupported
138 }
139
140 if isRoot(nodePath) {
141 return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, nodePath, 0)
142 }
143
144 ecode := 0
145 applyf := func(stm concurrency.STM) error {
146 parent := path.Dir(nodePath)
147 if !isRoot(parent) && stm.Rev(s.mkPath(parent)+"/") == 0 {
148 ecode = etcdErr.EcodeKeyNotFound
149 return nil
150 }
151
152 key := s.mkPath(nodePath)
153 if dir {
154 if stm.Rev(key) != 0 {
155 // exists as non-dir
156 ecode = etcdErr.EcodeNotDir
157 return nil
158 }
159 key = key + "/"
160 } else if stm.Rev(key+"/") != 0 {
161 ecode = etcdErr.EcodeNotFile
162 return nil
163 }
164 stm.Put(key, value, clientv3.WithPrevKV())
165 stm.Put(s.mkActionKey(), store.Set)
166 return nil
167 }
168
169 resp, err := s.newSTM(applyf)
170 if err != nil {
171 return nil, err
172 }
173 if ecode != 0 {
174 return nil, etcdErr.NewError(ecode, nodePath, mkV2Rev(resp.Header.Revision))
175 }
176
177 createRev := resp.Header.Revision
178 var pn *store.NodeExtern
179 if pkv := prevKeyFromPuts(resp); pkv != nil {
180 pn = s.mkV2Node(pkv)
181 createRev = pkv.CreateRevision
182 }
183
184 vp := &value
185 if dir {
186 vp = nil
187 }
188 return &store.Event{
189 Action: store.Set,
190 Node: &store.NodeExtern{
191 Key: nodePath,
192 Value: vp,
193 Dir: dir,
194 ModifiedIndex: mkV2Rev(resp.Header.Revision),
195 CreatedIndex: mkV2Rev(createRev),
196 },
197 PrevNode: pn,
198 EtcdIndex: mkV2Rev(resp.Header.Revision),
199 }, nil
200}
201
202func (s *v2v3Store) Update(nodePath, newValue string, expireOpts store.TTLOptionSet) (*store.Event, error) {
203 if isRoot(nodePath) {
204 return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, nodePath, 0)
205 }
206
207 if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
208 return nil, errUnsupported
209 }
210
211 key := s.mkPath(nodePath)
212 ecode := 0
213 applyf := func(stm concurrency.STM) error {
214 if rev := stm.Rev(key + "/"); rev != 0 {
215 ecode = etcdErr.EcodeNotFile
216 return nil
217 }
218 if rev := stm.Rev(key); rev == 0 {
219 ecode = etcdErr.EcodeKeyNotFound
220 return nil
221 }
222 stm.Put(key, newValue, clientv3.WithPrevKV())
223 stm.Put(s.mkActionKey(), store.Update)
224 return nil
225 }
226
227 resp, err := s.newSTM(applyf)
228 if err != nil {
229 return nil, err
230 }
231 if ecode != 0 {
232 return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
233 }
234
235 pkv := prevKeyFromPuts(resp)
236 return &store.Event{
237 Action: store.Update,
238 Node: &store.NodeExtern{
239 Key: nodePath,
240 Value: &newValue,
241 ModifiedIndex: mkV2Rev(resp.Header.Revision),
242 CreatedIndex: mkV2Rev(pkv.CreateRevision),
243 },
244 PrevNode: s.mkV2Node(pkv),
245 EtcdIndex: mkV2Rev(resp.Header.Revision),
246 }, nil
247}
248
249func (s *v2v3Store) Create(
250 nodePath string,
251 dir bool,
252 value string,
253 unique bool,
254 expireOpts store.TTLOptionSet,
255) (*store.Event, error) {
256 if isRoot(nodePath) {
257 return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, nodePath, 0)
258 }
259 if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
260 return nil, errUnsupported
261 }
262 ecode := 0
263 applyf := func(stm concurrency.STM) error {
264 ecode = 0
265 key := s.mkPath(nodePath)
266 if unique {
267 // append unique item under the node path
268 for {
269 key = nodePath + "/" + fmt.Sprintf("%020s", time.Now())
270 key = path.Clean(path.Join("/", key))
271 key = s.mkPath(key)
272 if stm.Rev(key) == 0 {
273 break
274 }
275 }
276 }
277 if stm.Rev(key) > 0 || stm.Rev(key+"/") > 0 {
278 ecode = etcdErr.EcodeNodeExist
279 return nil
280 }
281 // build path if any directories in path do not exist
282 dirs := []string{}
283 for p := path.Dir(nodePath); !isRoot(p); p = path.Dir(p) {
284 pp := s.mkPath(p)
285 if stm.Rev(pp) > 0 {
286 ecode = etcdErr.EcodeNotDir
287 return nil
288 }
289 if stm.Rev(pp+"/") == 0 {
290 dirs = append(dirs, pp+"/")
291 }
292 }
293 for _, d := range dirs {
294 stm.Put(d, "")
295 }
296
297 if dir {
298 // directories marked with extra slash in key name
299 key += "/"
300 }
301 stm.Put(key, value)
302 stm.Put(s.mkActionKey(), store.Create)
303 return nil
304 }
305
306 resp, err := s.newSTM(applyf)
307 if err != nil {
308 return nil, err
309 }
310 if ecode != 0 {
311 return nil, etcdErr.NewError(ecode, nodePath, mkV2Rev(resp.Header.Revision))
312 }
313
314 var v *string
315 if !dir {
316 v = &value
317 }
318
319 return &store.Event{
320 Action: store.Create,
321 Node: &store.NodeExtern{
322 Key: nodePath,
323 Value: v,
324 Dir: dir,
325 ModifiedIndex: mkV2Rev(resp.Header.Revision),
326 CreatedIndex: mkV2Rev(resp.Header.Revision),
327 },
328 EtcdIndex: mkV2Rev(resp.Header.Revision),
329 }, nil
330}
331
332func (s *v2v3Store) CompareAndSwap(
333 nodePath string,
334 prevValue string,
335 prevIndex uint64,
336 value string,
337 expireOpts store.TTLOptionSet,
338) (*store.Event, error) {
339 if isRoot(nodePath) {
340 return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, nodePath, 0)
341 }
342 if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
343 return nil, errUnsupported
344 }
345
346 key := s.mkPath(nodePath)
347 resp, err := s.c.Txn(s.ctx).If(
348 s.mkCompare(nodePath, prevValue, prevIndex)...,
349 ).Then(
350 clientv3.OpPut(key, value, clientv3.WithPrevKV()),
351 clientv3.OpPut(s.mkActionKey(), store.CompareAndSwap),
352 ).Else(
353 clientv3.OpGet(key),
354 clientv3.OpGet(key+"/"),
355 ).Commit()
356
357 if err != nil {
358 return nil, err
359 }
360 if !resp.Succeeded {
361 return nil, compareFail(nodePath, prevValue, prevIndex, resp)
362 }
363
364 pkv := resp.Responses[0].GetResponsePut().PrevKv
365 return &store.Event{
366 Action: store.CompareAndSwap,
367 Node: &store.NodeExtern{
368 Key: nodePath,
369 Value: &value,
370 CreatedIndex: mkV2Rev(pkv.CreateRevision),
371 ModifiedIndex: mkV2Rev(resp.Header.Revision),
372 },
373 PrevNode: s.mkV2Node(pkv),
374 EtcdIndex: mkV2Rev(resp.Header.Revision),
375 }, nil
376}
377
378func (s *v2v3Store) Delete(nodePath string, dir, recursive bool) (*store.Event, error) {
379 if isRoot(nodePath) {
380 return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, nodePath, 0)
381 }
382 if !dir && !recursive {
383 return s.deleteNode(nodePath)
384 }
385 if !recursive {
386 return s.deleteEmptyDir(nodePath)
387 }
388
389 dels := make([]clientv3.Op, maxPathDepth+1)
390 dels[0] = clientv3.OpDelete(s.mkPath(nodePath)+"/", clientv3.WithPrevKV())
391 for i := 1; i < maxPathDepth; i++ {
392 dels[i] = clientv3.OpDelete(s.mkPathDepth(nodePath, i), clientv3.WithPrefix())
393 }
394 dels[maxPathDepth] = clientv3.OpPut(s.mkActionKey(), store.Delete)
395
396 resp, err := s.c.Txn(s.ctx).If(
397 clientv3.Compare(clientv3.Version(s.mkPath(nodePath)+"/"), ">", 0),
398 clientv3.Compare(clientv3.Version(s.mkPathDepth(nodePath, maxPathDepth)+"/"), "=", 0),
399 ).Then(
400 dels...,
401 ).Commit()
402 if err != nil {
403 return nil, err
404 }
405 if !resp.Succeeded {
406 return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, mkV2Rev(resp.Header.Revision))
407 }
408 dresp := resp.Responses[0].GetResponseDeleteRange()
409 return &store.Event{
410 Action: store.Delete,
411 PrevNode: s.mkV2Node(dresp.PrevKvs[0]),
412 EtcdIndex: mkV2Rev(resp.Header.Revision),
413 }, nil
414}
415
416func (s *v2v3Store) deleteEmptyDir(nodePath string) (*store.Event, error) {
417 resp, err := s.c.Txn(s.ctx).If(
418 clientv3.Compare(clientv3.Version(s.mkPathDepth(nodePath, 1)), "=", 0).WithPrefix(),
419 ).Then(
420 clientv3.OpDelete(s.mkPath(nodePath)+"/", clientv3.WithPrevKV()),
421 clientv3.OpPut(s.mkActionKey(), store.Delete),
422 ).Commit()
423 if err != nil {
424 return nil, err
425 }
426 if !resp.Succeeded {
427 return nil, etcdErr.NewError(etcdErr.EcodeDirNotEmpty, nodePath, mkV2Rev(resp.Header.Revision))
428 }
429 dresp := resp.Responses[0].GetResponseDeleteRange()
430 if len(dresp.PrevKvs) == 0 {
431 return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, mkV2Rev(resp.Header.Revision))
432 }
433 return &store.Event{
434 Action: store.Delete,
435 PrevNode: s.mkV2Node(dresp.PrevKvs[0]),
436 EtcdIndex: mkV2Rev(resp.Header.Revision),
437 }, nil
438}
439
440func (s *v2v3Store) deleteNode(nodePath string) (*store.Event, error) {
441 resp, err := s.c.Txn(s.ctx).If(
442 clientv3.Compare(clientv3.Version(s.mkPath(nodePath)+"/"), "=", 0),
443 ).Then(
444 clientv3.OpDelete(s.mkPath(nodePath), clientv3.WithPrevKV()),
445 clientv3.OpPut(s.mkActionKey(), store.Delete),
446 ).Commit()
447 if err != nil {
448 return nil, err
449 }
450 if !resp.Succeeded {
451 return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
452 }
453 pkvs := resp.Responses[0].GetResponseDeleteRange().PrevKvs
454 if len(pkvs) == 0 {
455 return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
456 }
457 pkv := pkvs[0]
458 return &store.Event{
459 Action: store.Delete,
460 Node: &store.NodeExtern{
461 Key: nodePath,
462 CreatedIndex: mkV2Rev(pkv.CreateRevision),
463 ModifiedIndex: mkV2Rev(resp.Header.Revision),
464 },
465 PrevNode: s.mkV2Node(pkv),
466 EtcdIndex: mkV2Rev(resp.Header.Revision),
467 }, nil
468}
469
470func (s *v2v3Store) CompareAndDelete(nodePath, prevValue string, prevIndex uint64) (*store.Event, error) {
471 if isRoot(nodePath) {
472 return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, nodePath, 0)
473 }
474
475 key := s.mkPath(nodePath)
476 resp, err := s.c.Txn(s.ctx).If(
477 s.mkCompare(nodePath, prevValue, prevIndex)...,
478 ).Then(
479 clientv3.OpDelete(key, clientv3.WithPrevKV()),
480 clientv3.OpPut(s.mkActionKey(), store.CompareAndDelete),
481 ).Else(
482 clientv3.OpGet(key),
483 clientv3.OpGet(key+"/"),
484 ).Commit()
485
486 if err != nil {
487 return nil, err
488 }
489 if !resp.Succeeded {
490 return nil, compareFail(nodePath, prevValue, prevIndex, resp)
491 }
492
493 // len(pkvs) > 1 since txn only succeeds when key exists
494 pkv := resp.Responses[0].GetResponseDeleteRange().PrevKvs[0]
495 return &store.Event{
496 Action: store.CompareAndDelete,
497 Node: &store.NodeExtern{
498 Key: nodePath,
499 CreatedIndex: mkV2Rev(pkv.CreateRevision),
500 ModifiedIndex: mkV2Rev(resp.Header.Revision),
501 },
502 PrevNode: s.mkV2Node(pkv),
503 EtcdIndex: mkV2Rev(resp.Header.Revision),
504 }, nil
505}
506
507func compareFail(nodePath, prevValue string, prevIndex uint64, resp *clientv3.TxnResponse) error {
508 if dkvs := resp.Responses[1].GetResponseRange().Kvs; len(dkvs) > 0 {
509 return etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
510 }
511 kvs := resp.Responses[0].GetResponseRange().Kvs
512 if len(kvs) == 0 {
513 return etcdErr.NewError(etcdErr.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
514 }
515 kv := kvs[0]
516 indexMatch := (prevIndex == 0 || kv.ModRevision == int64(prevIndex))
517 valueMatch := (prevValue == "" || string(kv.Value) == prevValue)
518 var cause string
519 switch {
520 case indexMatch && !valueMatch:
521 cause = fmt.Sprintf("[%v != %v]", prevValue, string(kv.Value))
522 case valueMatch && !indexMatch:
523 cause = fmt.Sprintf("[%v != %v]", prevIndex, kv.ModRevision)
524 default:
525 cause = fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, string(kv.Value), prevIndex, kv.ModRevision)
526 }
527 return etcdErr.NewError(etcdErr.EcodeTestFailed, cause, mkV2Rev(resp.Header.Revision))
528}
529
530func (s *v2v3Store) mkCompare(nodePath, prevValue string, prevIndex uint64) []clientv3.Cmp {
531 key := s.mkPath(nodePath)
532 cmps := []clientv3.Cmp{clientv3.Compare(clientv3.Version(key), ">", 0)}
533 if prevIndex != 0 {
534 cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(key), "=", mkV3Rev(prevIndex)))
535 }
536 if prevValue != "" {
537 cmps = append(cmps, clientv3.Compare(clientv3.Value(key), "=", prevValue))
538 }
539 return cmps
540}
541
542func (s *v2v3Store) JsonStats() []byte { panic("STUB") }
543func (s *v2v3Store) DeleteExpiredKeys(cutoff time.Time) { panic("STUB") }
544
545func (s *v2v3Store) Version() int { return 2 }
546
547// TODO: move this out of the Store interface?
548
549func (s *v2v3Store) Save() ([]byte, error) { panic("STUB") }
550func (s *v2v3Store) Recovery(state []byte) error { panic("STUB") }
551func (s *v2v3Store) Clone() store.Store { panic("STUB") }
552func (s *v2v3Store) SaveNoCopy() ([]byte, error) { panic("STUB") }
553func (s *v2v3Store) HasTTLKeys() bool { panic("STUB") }
554
555func (s *v2v3Store) mkPath(nodePath string) string { return s.mkPathDepth(nodePath, 0) }
556
557func (s *v2v3Store) mkNodePath(p string) string {
558 return path.Clean(p[len(s.pfx)+len("/k/000/"):])
559}
560
561// mkPathDepth makes a path to a key that encodes its directory depth
562// for fast directory listing. If a depth is provided, it is added
563// to the computed depth.
564func (s *v2v3Store) mkPathDepth(nodePath string, depth int) string {
565 normalForm := path.Clean(path.Join("/", nodePath))
566 n := strings.Count(normalForm, "/") + depth
567 return fmt.Sprintf("%s/%03d/k/%s", s.pfx, n, normalForm)
568}
569
570func (s *v2v3Store) mkActionKey() string { return s.pfx + "/act" }
571
// isRoot reports whether s names a root that cannot be modified. The empty
// string and "/" qualify, as do "/0" and "/1" (presumably the v2 store's
// reserved top-level directories; verify against callers).
func isRoot(s string) bool {
	switch s {
	case "", "/", "/0", "/1":
		return true
	}
	return false
}
573
// mkV2Rev converts a v3 revision to a v2 index, which is one lower. Revision
// 0 (meaning "none") maps to index 0.
func mkV2Rev(v3Rev int64) uint64 {
	if v3Rev != 0 {
		return uint64(v3Rev - 1)
	}
	return 0
}
580
// mkV3Rev converts a v2 index to a v3 revision, which is one higher. Index 0
// (meaning "none") maps to revision 0.
func mkV3Rev(v2Rev uint64) int64 {
	if v2Rev != 0 {
		return int64(v2Rev + 1)
	}
	return 0
}
587
588// mkV2Node creates a V2 NodeExtern from a V3 KeyValue
589func (s *v2v3Store) mkV2Node(kv *mvccpb.KeyValue) *store.NodeExtern {
590 if kv == nil {
591 return nil
592 }
593 n := &store.NodeExtern{
594 Key: string(s.mkNodePath(string(kv.Key))),
595 Dir: kv.Key[len(kv.Key)-1] == '/',
596 CreatedIndex: mkV2Rev(kv.CreateRevision),
597 ModifiedIndex: mkV2Rev(kv.ModRevision),
598 }
599 if !n.Dir {
600 v := string(kv.Value)
601 n.Value = &v
602 }
603 return n
604}
605
606// prevKeyFromPuts gets the prev key that is being put; ignores
607// the put action response.
608func prevKeyFromPuts(resp *clientv3.TxnResponse) *mvccpb.KeyValue {
609 for _, r := range resp.Responses {
610 pkv := r.GetResponsePut().PrevKv
611 if pkv != nil && pkv.CreateRevision > 0 {
612 return pkv
613 }
614 }
615 return nil
616}
617
618func (s *v2v3Store) newSTM(applyf func(concurrency.STM) error) (*clientv3.TxnResponse, error) {
619 return concurrency.NewSTM(s.c, applyf, concurrency.WithIsolation(concurrency.Serializable))
620}