blob: e5e6e57f26262dc54b574008f2995a2b1736325a [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package iradix
2
3import (
4 "bytes"
5 "strings"
6
7 "github.com/hashicorp/golang-lru/simplelru"
8)
9
// defaultModifiedCache is the default capacity of the per-transaction cache
// of modified nodes. Updates to nodes near the root benefit from being
// cached, whereas leaves do not need to be, so capping the cache keeps very
// large transactions from accumulating an enormous amount of state. The same
// limit bounds the set of mutation-notification channels a transaction
// tracks.
const defaultModifiedCache = 8192
20
21// Tree implements an immutable radix tree. This can be treated as a
22// Dictionary abstract data type. The main advantage over a standard
23// hash map is prefix-based lookups and ordered iteration. The immutability
24// means that it is safe to concurrently read from a Tree without any
25// coordination.
26type Tree struct {
27 root *Node
28 size int
29}
30
31// New returns an empty Tree
32func New() *Tree {
33 t := &Tree{
34 root: &Node{
35 mutateCh: make(chan struct{}),
36 },
37 }
38 return t
39}
40
41// Len is used to return the number of elements in the tree
42func (t *Tree) Len() int {
43 return t.size
44}
45
46// Txn is a transaction on the tree. This transaction is applied
47// atomically and returns a new tree when committed. A transaction
48// is not thread safe, and should only be used by a single goroutine.
49type Txn struct {
50 // root is the modified root for the transaction.
51 root *Node
52
53 // snap is a snapshot of the root node for use if we have to run the
54 // slow notify algorithm.
55 snap *Node
56
57 // size tracks the size of the tree as it is modified during the
58 // transaction.
59 size int
60
61 // writable is a cache of writable nodes that have been created during
62 // the course of the transaction. This allows us to re-use the same
63 // nodes for further writes and avoid unnecessary copies of nodes that
64 // have never been exposed outside the transaction. This will only hold
65 // up to defaultModifiedCache number of entries.
66 writable *simplelru.LRU
67
68 // trackChannels is used to hold channels that need to be notified to
69 // signal mutation of the tree. This will only hold up to
70 // defaultModifiedCache number of entries, after which we will set the
71 // trackOverflow flag, which will cause us to use a more expensive
72 // algorithm to perform the notifications. Mutation tracking is only
73 // performed if trackMutate is true.
74 trackChannels map[chan struct{}]struct{}
75 trackOverflow bool
76 trackMutate bool
77}
78
79// Txn starts a new transaction that can be used to mutate the tree
80func (t *Tree) Txn() *Txn {
81 txn := &Txn{
82 root: t.root,
83 snap: t.root,
84 size: t.size,
85 }
86 return txn
87}
88
89// TrackMutate can be used to toggle if mutations are tracked. If this is enabled
90// then notifications will be issued for affected internal nodes and leaves when
91// the transaction is committed.
92func (t *Txn) TrackMutate(track bool) {
93 t.trackMutate = track
94}
95
96// trackChannel safely attempts to track the given mutation channel, setting the
97// overflow flag if we can no longer track any more. This limits the amount of
98// state that will accumulate during a transaction and we have a slower algorithm
99// to switch to if we overflow.
100func (t *Txn) trackChannel(ch chan struct{}) {
101 // In overflow, make sure we don't store any more objects.
102 if t.trackOverflow {
103 return
104 }
105
106 // If this would overflow the state we reject it and set the flag (since
107 // we aren't tracking everything that's required any longer).
108 if len(t.trackChannels) >= defaultModifiedCache {
109 // Mark that we are in the overflow state
110 t.trackOverflow = true
111
112 // Clear the map so that the channels can be garbage collected. It is
113 // safe to do this since we have already overflowed and will be using
114 // the slow notify algorithm.
115 t.trackChannels = nil
116 return
117 }
118
119 // Create the map on the fly when we need it.
120 if t.trackChannels == nil {
121 t.trackChannels = make(map[chan struct{}]struct{})
122 }
123
124 // Otherwise we are good to track it.
125 t.trackChannels[ch] = struct{}{}
126}
127
128// writeNode returns a node to be modified, if the current node has already been
129// modified during the course of the transaction, it is used in-place. Set
130// forLeafUpdate to true if you are getting a write node to update the leaf,
131// which will set leaf mutation tracking appropriately as well.
132func (t *Txn) writeNode(n *Node, forLeafUpdate bool) *Node {
133 // Ensure the writable set exists.
134 if t.writable == nil {
135 lru, err := simplelru.NewLRU(defaultModifiedCache, nil)
136 if err != nil {
137 panic(err)
138 }
139 t.writable = lru
140 }
141
142 // If this node has already been modified, we can continue to use it
143 // during this transaction. We know that we don't need to track it for
144 // a node update since the node is writable, but if this is for a leaf
145 // update we track it, in case the initial write to this node didn't
146 // update the leaf.
147 if _, ok := t.writable.Get(n); ok {
148 if t.trackMutate && forLeafUpdate && n.leaf != nil {
149 t.trackChannel(n.leaf.mutateCh)
150 }
151 return n
152 }
153
154 // Mark this node as being mutated.
155 if t.trackMutate {
156 t.trackChannel(n.mutateCh)
157 }
158
159 // Mark its leaf as being mutated, if appropriate.
160 if t.trackMutate && forLeafUpdate && n.leaf != nil {
161 t.trackChannel(n.leaf.mutateCh)
162 }
163
164 // Copy the existing node. If you have set forLeafUpdate it will be
165 // safe to replace this leaf with another after you get your node for
166 // writing. You MUST replace it, because the channel associated with
167 // this leaf will be closed when this transaction is committed.
168 nc := &Node{
169 mutateCh: make(chan struct{}),
170 leaf: n.leaf,
171 }
172 if n.prefix != nil {
173 nc.prefix = make([]byte, len(n.prefix))
174 copy(nc.prefix, n.prefix)
175 }
176 if len(n.edges) != 0 {
177 nc.edges = make([]edge, len(n.edges))
178 copy(nc.edges, n.edges)
179 }
180
181 // Mark this node as writable.
182 t.writable.Add(nc, nil)
183 return nc
184}
185
186// Visit all the nodes in the tree under n, and add their mutateChannels to the transaction
187// Returns the size of the subtree visited
188func (t *Txn) trackChannelsAndCount(n *Node) int {
189 // Count only leaf nodes
190 leaves := 0
191 if n.leaf != nil {
192 leaves = 1
193 }
194 // Mark this node as being mutated.
195 if t.trackMutate {
196 t.trackChannel(n.mutateCh)
197 }
198
199 // Mark its leaf as being mutated, if appropriate.
200 if t.trackMutate && n.leaf != nil {
201 t.trackChannel(n.leaf.mutateCh)
202 }
203
204 // Recurse on the children
205 for _, e := range n.edges {
206 leaves += t.trackChannelsAndCount(e.node)
207 }
208 return leaves
209}
210
211// mergeChild is called to collapse the given node with its child. This is only
212// called when the given node is not a leaf and has a single edge.
213func (t *Txn) mergeChild(n *Node) {
214 // Mark the child node as being mutated since we are about to abandon
215 // it. We don't need to mark the leaf since we are retaining it if it
216 // is there.
217 e := n.edges[0]
218 child := e.node
219 if t.trackMutate {
220 t.trackChannel(child.mutateCh)
221 }
222
223 // Merge the nodes.
224 n.prefix = concat(n.prefix, child.prefix)
225 n.leaf = child.leaf
226 if len(child.edges) != 0 {
227 n.edges = make([]edge, len(child.edges))
228 copy(n.edges, child.edges)
229 } else {
230 n.edges = nil
231 }
232}
233
234// insert does a recursive insertion
235func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface{}, bool) {
236 // Handle key exhaustion
237 if len(search) == 0 {
238 var oldVal interface{}
239 didUpdate := false
240 if n.isLeaf() {
241 oldVal = n.leaf.val
242 didUpdate = true
243 }
244
245 nc := t.writeNode(n, true)
246 nc.leaf = &leafNode{
247 mutateCh: make(chan struct{}),
248 key: k,
249 val: v,
250 }
251 return nc, oldVal, didUpdate
252 }
253
254 // Look for the edge
255 idx, child := n.getEdge(search[0])
256
257 // No edge, create one
258 if child == nil {
259 e := edge{
260 label: search[0],
261 node: &Node{
262 mutateCh: make(chan struct{}),
263 leaf: &leafNode{
264 mutateCh: make(chan struct{}),
265 key: k,
266 val: v,
267 },
268 prefix: search,
269 },
270 }
271 nc := t.writeNode(n, false)
272 nc.addEdge(e)
273 return nc, nil, false
274 }
275
276 // Determine longest prefix of the search key on match
277 commonPrefix := longestPrefix(search, child.prefix)
278 if commonPrefix == len(child.prefix) {
279 search = search[commonPrefix:]
280 newChild, oldVal, didUpdate := t.insert(child, k, search, v)
281 if newChild != nil {
282 nc := t.writeNode(n, false)
283 nc.edges[idx].node = newChild
284 return nc, oldVal, didUpdate
285 }
286 return nil, oldVal, didUpdate
287 }
288
289 // Split the node
290 nc := t.writeNode(n, false)
291 splitNode := &Node{
292 mutateCh: make(chan struct{}),
293 prefix: search[:commonPrefix],
294 }
295 nc.replaceEdge(edge{
296 label: search[0],
297 node: splitNode,
298 })
299
300 // Restore the existing child node
301 modChild := t.writeNode(child, false)
302 splitNode.addEdge(edge{
303 label: modChild.prefix[commonPrefix],
304 node: modChild,
305 })
306 modChild.prefix = modChild.prefix[commonPrefix:]
307
308 // Create a new leaf node
309 leaf := &leafNode{
310 mutateCh: make(chan struct{}),
311 key: k,
312 val: v,
313 }
314
315 // If the new key is a subset, add to to this node
316 search = search[commonPrefix:]
317 if len(search) == 0 {
318 splitNode.leaf = leaf
319 return nc, nil, false
320 }
321
322 // Create a new edge for the node
323 splitNode.addEdge(edge{
324 label: search[0],
325 node: &Node{
326 mutateCh: make(chan struct{}),
327 leaf: leaf,
328 prefix: search,
329 },
330 })
331 return nc, nil, false
332}
333
334// delete does a recursive deletion
335func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) {
336 // Check for key exhaustion
337 if len(search) == 0 {
338 if !n.isLeaf() {
339 return nil, nil
340 }
341 // Copy the pointer in case we are in a transaction that already
342 // modified this node since the node will be reused. Any changes
343 // made to the node will not affect returning the original leaf
344 // value.
345 oldLeaf := n.leaf
346
347 // Remove the leaf node
348 nc := t.writeNode(n, true)
349 nc.leaf = nil
350
351 // Check if this node should be merged
352 if n != t.root && len(nc.edges) == 1 {
353 t.mergeChild(nc)
354 }
355 return nc, oldLeaf
356 }
357
358 // Look for an edge
359 label := search[0]
360 idx, child := n.getEdge(label)
361 if child == nil || !bytes.HasPrefix(search, child.prefix) {
362 return nil, nil
363 }
364
365 // Consume the search prefix
366 search = search[len(child.prefix):]
367 newChild, leaf := t.delete(n, child, search)
368 if newChild == nil {
369 return nil, nil
370 }
371
372 // Copy this node. WATCH OUT - it's safe to pass "false" here because we
373 // will only ADD a leaf via nc.mergeChild() if there isn't one due to
374 // the !nc.isLeaf() check in the logic just below. This is pretty subtle,
375 // so be careful if you change any of the logic here.
376 nc := t.writeNode(n, false)
377
378 // Delete the edge if the node has no edges
379 if newChild.leaf == nil && len(newChild.edges) == 0 {
380 nc.delEdge(label)
381 if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() {
382 t.mergeChild(nc)
383 }
384 } else {
385 nc.edges[idx].node = newChild
386 }
387 return nc, leaf
388}
389
390// delete does a recursive deletion
391func (t *Txn) deletePrefix(parent, n *Node, search []byte) (*Node, int) {
392 // Check for key exhaustion
393 if len(search) == 0 {
394 nc := t.writeNode(n, true)
395 if n.isLeaf() {
396 nc.leaf = nil
397 }
398 nc.edges = nil
399 return nc, t.trackChannelsAndCount(n)
400 }
401
402 // Look for an edge
403 label := search[0]
404 idx, child := n.getEdge(label)
405 // We make sure that either the child node's prefix starts with the search term, or the search term starts with the child node's prefix
406 // Need to do both so that we can delete prefixes that don't correspond to any node in the tree
407 if child == nil || (!bytes.HasPrefix(child.prefix, search) && !bytes.HasPrefix(search, child.prefix)) {
408 return nil, 0
409 }
410
411 // Consume the search prefix
412 if len(child.prefix) > len(search) {
413 search = []byte("")
414 } else {
415 search = search[len(child.prefix):]
416 }
417 newChild, numDeletions := t.deletePrefix(n, child, search)
418 if newChild == nil {
419 return nil, 0
420 }
421 // Copy this node. WATCH OUT - it's safe to pass "false" here because we
422 // will only ADD a leaf via nc.mergeChild() if there isn't one due to
423 // the !nc.isLeaf() check in the logic just below. This is pretty subtle,
424 // so be careful if you change any of the logic here.
425
426 nc := t.writeNode(n, false)
427
428 // Delete the edge if the node has no edges
429 if newChild.leaf == nil && len(newChild.edges) == 0 {
430 nc.delEdge(label)
431 if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() {
432 t.mergeChild(nc)
433 }
434 } else {
435 nc.edges[idx].node = newChild
436 }
437 return nc, numDeletions
438}
439
440// Insert is used to add or update a given key. The return provides
441// the previous value and a bool indicating if any was set.
442func (t *Txn) Insert(k []byte, v interface{}) (interface{}, bool) {
443 newRoot, oldVal, didUpdate := t.insert(t.root, k, k, v)
444 if newRoot != nil {
445 t.root = newRoot
446 }
447 if !didUpdate {
448 t.size++
449 }
450 return oldVal, didUpdate
451}
452
453// Delete is used to delete a given key. Returns the old value if any,
454// and a bool indicating if the key was set.
455func (t *Txn) Delete(k []byte) (interface{}, bool) {
456 newRoot, leaf := t.delete(nil, t.root, k)
457 if newRoot != nil {
458 t.root = newRoot
459 }
460 if leaf != nil {
461 t.size--
462 return leaf.val, true
463 }
464 return nil, false
465}
466
467// DeletePrefix is used to delete an entire subtree that matches the prefix
468// This will delete all nodes under that prefix
469func (t *Txn) DeletePrefix(prefix []byte) bool {
470 newRoot, numDeletions := t.deletePrefix(nil, t.root, prefix)
471 if newRoot != nil {
472 t.root = newRoot
473 t.size = t.size - numDeletions
474 return true
475 }
476 return false
477
478}
479
480// Root returns the current root of the radix tree within this
481// transaction. The root is not safe across insert and delete operations,
482// but can be used to read the current state during a transaction.
483func (t *Txn) Root() *Node {
484 return t.root
485}
486
487// Get is used to lookup a specific key, returning
488// the value and if it was found
489func (t *Txn) Get(k []byte) (interface{}, bool) {
490 return t.root.Get(k)
491}
492
493// GetWatch is used to lookup a specific key, returning
494// the watch channel, value and if it was found
495func (t *Txn) GetWatch(k []byte) (<-chan struct{}, interface{}, bool) {
496 return t.root.GetWatch(k)
497}
498
499// Commit is used to finalize the transaction and return a new tree. If mutation
500// tracking is turned on then notifications will also be issued.
501func (t *Txn) Commit() *Tree {
502 nt := t.CommitOnly()
503 if t.trackMutate {
504 t.Notify()
505 }
506 return nt
507}
508
509// CommitOnly is used to finalize the transaction and return a new tree, but
510// does not issue any notifications until Notify is called.
511func (t *Txn) CommitOnly() *Tree {
512 nt := &Tree{t.root, t.size}
513 t.writable = nil
514 return nt
515}
516
517// slowNotify does a complete comparison of the before and after trees in order
518// to trigger notifications. This doesn't require any additional state but it
519// is very expensive to compute.
520func (t *Txn) slowNotify() {
521 snapIter := t.snap.rawIterator()
522 rootIter := t.root.rawIterator()
523 for snapIter.Front() != nil || rootIter.Front() != nil {
524 // If we've exhausted the nodes in the old snapshot, we know
525 // there's nothing remaining to notify.
526 if snapIter.Front() == nil {
527 return
528 }
529 snapElem := snapIter.Front()
530
531 // If we've exhausted the nodes in the new root, we know we need
532 // to invalidate everything that remains in the old snapshot. We
533 // know from the loop condition there's something in the old
534 // snapshot.
535 if rootIter.Front() == nil {
536 close(snapElem.mutateCh)
537 if snapElem.isLeaf() {
538 close(snapElem.leaf.mutateCh)
539 }
540 snapIter.Next()
541 continue
542 }
543
544 // Do one string compare so we can check the various conditions
545 // below without repeating the compare.
546 cmp := strings.Compare(snapIter.Path(), rootIter.Path())
547
548 // If the snapshot is behind the root, then we must have deleted
549 // this node during the transaction.
550 if cmp < 0 {
551 close(snapElem.mutateCh)
552 if snapElem.isLeaf() {
553 close(snapElem.leaf.mutateCh)
554 }
555 snapIter.Next()
556 continue
557 }
558
559 // If the snapshot is ahead of the root, then we must have added
560 // this node during the transaction.
561 if cmp > 0 {
562 rootIter.Next()
563 continue
564 }
565
566 // If we have the same path, then we need to see if we mutated a
567 // node and possibly the leaf.
568 rootElem := rootIter.Front()
569 if snapElem != rootElem {
570 close(snapElem.mutateCh)
571 if snapElem.leaf != nil && (snapElem.leaf != rootElem.leaf) {
572 close(snapElem.leaf.mutateCh)
573 }
574 }
575 snapIter.Next()
576 rootIter.Next()
577 }
578}
579
580// Notify is used along with TrackMutate to trigger notifications. This must
581// only be done once a transaction is committed via CommitOnly, and it is called
582// automatically by Commit.
583func (t *Txn) Notify() {
584 if !t.trackMutate {
585 return
586 }
587
588 // If we've overflowed the tracking state we can't use it in any way and
589 // need to do a full tree compare.
590 if t.trackOverflow {
591 t.slowNotify()
592 } else {
593 for ch := range t.trackChannels {
594 close(ch)
595 }
596 }
597
598 // Clean up the tracking state so that a re-notify is safe (will trigger
599 // the else clause above which will be a no-op).
600 t.trackChannels = nil
601 t.trackOverflow = false
602}
603
604// Insert is used to add or update a given key. The return provides
605// the new tree, previous value and a bool indicating if any was set.
606func (t *Tree) Insert(k []byte, v interface{}) (*Tree, interface{}, bool) {
607 txn := t.Txn()
608 old, ok := txn.Insert(k, v)
609 return txn.Commit(), old, ok
610}
611
612// Delete is used to delete a given key. Returns the new tree,
613// old value if any, and a bool indicating if the key was set.
614func (t *Tree) Delete(k []byte) (*Tree, interface{}, bool) {
615 txn := t.Txn()
616 old, ok := txn.Delete(k)
617 return txn.Commit(), old, ok
618}
619
620// DeletePrefix is used to delete all nodes starting with a given prefix. Returns the new tree,
621// and a bool indicating if the prefix matched any nodes
622func (t *Tree) DeletePrefix(k []byte) (*Tree, bool) {
623 txn := t.Txn()
624 ok := txn.DeletePrefix(k)
625 return txn.Commit(), ok
626}
627
628// Root returns the root node of the tree which can be used for richer
629// query operations.
630func (t *Tree) Root() *Node {
631 return t.root
632}
633
634// Get is used to lookup a specific key, returning
635// the value and if it was found
636func (t *Tree) Get(k []byte) (interface{}, bool) {
637 return t.root.Get(k)
638}
639
// longestPrefix returns the length of the prefix shared by k1 and k2, that
// is, the index of the first byte at which they differ, or the length of the
// shorter slice when one is a prefix of the other.
func longestPrefix(k1, k2 []byte) int {
	limit := len(k1)
	if len(k2) < limit {
		limit = len(k2)
	}
	for i := 0; i < limit; i++ {
		if k1[i] != k2[i] {
			return i
		}
	}
	return limit
}
655
// concat returns a newly allocated slice holding the bytes of a followed by
// the bytes of b. The result never shares storage with either input.
func concat(a, b []byte) []byte {
	joined := make([]byte, 0, len(a)+len(b))
	joined = append(joined, a...)
	return append(joined, b...)
}
662}