blob: edf7f21942b9a1ee626b101ead8b1a1a2da9dfbb [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package store
16
17import (
18 "encoding/json"
19 "fmt"
20 "path"
21 "strconv"
22 "strings"
23 "sync"
24 "time"
25
26 etcdErr "github.com/coreos/etcd/error"
27 "github.com/coreos/etcd/pkg/types"
28 "github.com/jonboulle/clockwork"
29)
30
31// The default version to set when the store is first initialized.
32const defaultVersion = 2
33
34var minExpireTime time.Time
35
36func init() {
37 minExpireTime, _ = time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
38}
39
40type Store interface {
41 Version() int
42 Index() uint64
43
44 Get(nodePath string, recursive, sorted bool) (*Event, error)
45 Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error)
46 Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error)
47 Create(nodePath string, dir bool, value string, unique bool,
48 expireOpts TTLOptionSet) (*Event, error)
49 CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
50 value string, expireOpts TTLOptionSet) (*Event, error)
51 Delete(nodePath string, dir, recursive bool) (*Event, error)
52 CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
53
54 Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error)
55
56 Save() ([]byte, error)
57 Recovery(state []byte) error
58
59 Clone() Store
60 SaveNoCopy() ([]byte, error)
61
62 JsonStats() []byte
63 DeleteExpiredKeys(cutoff time.Time)
64
65 HasTTLKeys() bool
66}
67
68type TTLOptionSet struct {
69 ExpireTime time.Time
70 Refresh bool
71}
72
73type store struct {
74 Root *node
75 WatcherHub *watcherHub
76 CurrentIndex uint64
77 Stats *Stats
78 CurrentVersion int
79 ttlKeyHeap *ttlKeyHeap // need to recovery manually
80 worldLock sync.RWMutex // stop the world lock
81 clock clockwork.Clock
82 readonlySet types.Set
83}
84
85// New creates a store where the given namespaces will be created as initial directories.
86func New(namespaces ...string) Store {
87 s := newStore(namespaces...)
88 s.clock = clockwork.NewRealClock()
89 return s
90}
91
92func newStore(namespaces ...string) *store {
93 s := new(store)
94 s.CurrentVersion = defaultVersion
95 s.Root = newDir(s, "/", s.CurrentIndex, nil, Permanent)
96 for _, namespace := range namespaces {
97 s.Root.Add(newDir(s, namespace, s.CurrentIndex, s.Root, Permanent))
98 }
99 s.Stats = newStats()
100 s.WatcherHub = newWatchHub(1000)
101 s.ttlKeyHeap = newTtlKeyHeap()
102 s.readonlySet = types.NewUnsafeSet(append(namespaces, "/")...)
103 return s
104}
105
106// Version retrieves current version of the store.
107func (s *store) Version() int {
108 return s.CurrentVersion
109}
110
111// Index retrieves the current index of the store.
112func (s *store) Index() uint64 {
113 s.worldLock.RLock()
114 defer s.worldLock.RUnlock()
115 return s.CurrentIndex
116}
117
118// Get returns a get event.
119// If recursive is true, it will return all the content under the node path.
120// If sorted is true, it will sort the content by keys.
121func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
122 var err *etcdErr.Error
123
124 s.worldLock.RLock()
125 defer s.worldLock.RUnlock()
126
127 defer func() {
128 if err == nil {
129 s.Stats.Inc(GetSuccess)
130 if recursive {
131 reportReadSuccess(GetRecursive)
132 } else {
133 reportReadSuccess(Get)
134 }
135 return
136 }
137
138 s.Stats.Inc(GetFail)
139 if recursive {
140 reportReadFailure(GetRecursive)
141 } else {
142 reportReadFailure(Get)
143 }
144 }()
145
146 n, err := s.internalGet(nodePath)
147 if err != nil {
148 return nil, err
149 }
150
151 e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
152 e.EtcdIndex = s.CurrentIndex
153 e.Node.loadInternalNode(n, recursive, sorted, s.clock)
154
155 return e, nil
156}
157
158// Create creates the node at nodePath. Create will help to create intermediate directories with no ttl.
159// If the node has already existed, create will fail.
160// If any node on the path is a file, create will fail.
161func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) {
162 var err *etcdErr.Error
163
164 s.worldLock.Lock()
165 defer s.worldLock.Unlock()
166
167 defer func() {
168 if err == nil {
169 s.Stats.Inc(CreateSuccess)
170 reportWriteSuccess(Create)
171 return
172 }
173
174 s.Stats.Inc(CreateFail)
175 reportWriteFailure(Create)
176 }()
177
178 e, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create)
179 if err != nil {
180 return nil, err
181 }
182
183 e.EtcdIndex = s.CurrentIndex
184 s.WatcherHub.notify(e)
185
186 return e, nil
187}
188
189// Set creates or replace the node at nodePath.
190func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) {
191 var err *etcdErr.Error
192
193 s.worldLock.Lock()
194 defer s.worldLock.Unlock()
195
196 defer func() {
197 if err == nil {
198 s.Stats.Inc(SetSuccess)
199 reportWriteSuccess(Set)
200 return
201 }
202
203 s.Stats.Inc(SetFail)
204 reportWriteFailure(Set)
205 }()
206
207 // Get prevNode value
208 n, getErr := s.internalGet(nodePath)
209 if getErr != nil && getErr.ErrorCode != etcdErr.EcodeKeyNotFound {
210 err = getErr
211 return nil, err
212 }
213
214 if expireOpts.Refresh {
215 if getErr != nil {
216 err = getErr
217 return nil, err
218 } else {
219 value = n.Value
220 }
221 }
222
223 // Set new value
224 e, err := s.internalCreate(nodePath, dir, value, false, true, expireOpts.ExpireTime, Set)
225 if err != nil {
226 return nil, err
227 }
228 e.EtcdIndex = s.CurrentIndex
229
230 // Put prevNode into event
231 if getErr == nil {
232 prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
233 prev.Node.loadInternalNode(n, false, false, s.clock)
234 e.PrevNode = prev.Node
235 }
236
237 if !expireOpts.Refresh {
238 s.WatcherHub.notify(e)
239 } else {
240 e.SetRefresh()
241 s.WatcherHub.add(e)
242 }
243
244 return e, nil
245}
246
247// returns user-readable cause of failed comparison
248func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) string {
249 switch which {
250 case CompareIndexNotMatch:
251 return fmt.Sprintf("[%v != %v]", prevIndex, n.ModifiedIndex)
252 case CompareValueNotMatch:
253 return fmt.Sprintf("[%v != %v]", prevValue, n.Value)
254 default:
255 return fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
256 }
257}
258
259func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
260 value string, expireOpts TTLOptionSet) (*Event, error) {
261
262 var err *etcdErr.Error
263
264 s.worldLock.Lock()
265 defer s.worldLock.Unlock()
266
267 defer func() {
268 if err == nil {
269 s.Stats.Inc(CompareAndSwapSuccess)
270 reportWriteSuccess(CompareAndSwap)
271 return
272 }
273
274 s.Stats.Inc(CompareAndSwapFail)
275 reportWriteFailure(CompareAndSwap)
276 }()
277
278 nodePath = path.Clean(path.Join("/", nodePath))
279 // we do not allow the user to change "/"
280 if s.readonlySet.Contains(nodePath) {
281 return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
282 }
283
284 n, err := s.internalGet(nodePath)
285 if err != nil {
286 return nil, err
287 }
288 if n.IsDir() { // can only compare and swap file
289 err = etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
290 return nil, err
291 }
292
293 // If both of the prevValue and prevIndex are given, we will test both of them.
294 // Command will be executed, only if both of the tests are successful.
295 if ok, which := n.Compare(prevValue, prevIndex); !ok {
296 cause := getCompareFailCause(n, which, prevValue, prevIndex)
297 err = etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
298 return nil, err
299 }
300
301 if expireOpts.Refresh {
302 value = n.Value
303 }
304
305 // update etcd index
306 s.CurrentIndex++
307
308 e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
309 e.EtcdIndex = s.CurrentIndex
310 e.PrevNode = n.Repr(false, false, s.clock)
311 eNode := e.Node
312
313 // if test succeed, write the value
314 n.Write(value, s.CurrentIndex)
315 n.UpdateTTL(expireOpts.ExpireTime)
316
317 // copy the value for safety
318 valueCopy := value
319 eNode.Value = &valueCopy
320 eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
321
322 if !expireOpts.Refresh {
323 s.WatcherHub.notify(e)
324 } else {
325 e.SetRefresh()
326 s.WatcherHub.add(e)
327 }
328
329 return e, nil
330}
331
332// Delete deletes the node at the given path.
333// If the node is a directory, recursive must be true to delete it.
334func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
335 var err *etcdErr.Error
336
337 s.worldLock.Lock()
338 defer s.worldLock.Unlock()
339
340 defer func() {
341 if err == nil {
342 s.Stats.Inc(DeleteSuccess)
343 reportWriteSuccess(Delete)
344 return
345 }
346
347 s.Stats.Inc(DeleteFail)
348 reportWriteFailure(Delete)
349 }()
350
351 nodePath = path.Clean(path.Join("/", nodePath))
352 // we do not allow the user to change "/"
353 if s.readonlySet.Contains(nodePath) {
354 return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
355 }
356
357 // recursive implies dir
358 if recursive {
359 dir = true
360 }
361
362 n, err := s.internalGet(nodePath)
363 if err != nil { // if the node does not exist, return error
364 return nil, err
365 }
366
367 nextIndex := s.CurrentIndex + 1
368 e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
369 e.EtcdIndex = nextIndex
370 e.PrevNode = n.Repr(false, false, s.clock)
371 eNode := e.Node
372
373 if n.IsDir() {
374 eNode.Dir = true
375 }
376
377 callback := func(path string) { // notify function
378 // notify the watchers with deleted set true
379 s.WatcherHub.notifyWatchers(e, path, true)
380 }
381
382 err = n.Remove(dir, recursive, callback)
383 if err != nil {
384 return nil, err
385 }
386
387 // update etcd index
388 s.CurrentIndex++
389
390 s.WatcherHub.notify(e)
391
392 return e, nil
393}
394
395func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
396 var err *etcdErr.Error
397
398 s.worldLock.Lock()
399 defer s.worldLock.Unlock()
400
401 defer func() {
402 if err == nil {
403 s.Stats.Inc(CompareAndDeleteSuccess)
404 reportWriteSuccess(CompareAndDelete)
405 return
406 }
407
408 s.Stats.Inc(CompareAndDeleteFail)
409 reportWriteFailure(CompareAndDelete)
410 }()
411
412 nodePath = path.Clean(path.Join("/", nodePath))
413
414 n, err := s.internalGet(nodePath)
415 if err != nil { // if the node does not exist, return error
416 return nil, err
417 }
418 if n.IsDir() { // can only compare and delete file
419 return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
420 }
421
422 // If both of the prevValue and prevIndex are given, we will test both of them.
423 // Command will be executed, only if both of the tests are successful.
424 if ok, which := n.Compare(prevValue, prevIndex); !ok {
425 cause := getCompareFailCause(n, which, prevValue, prevIndex)
426 return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
427 }
428
429 // update etcd index
430 s.CurrentIndex++
431
432 e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
433 e.EtcdIndex = s.CurrentIndex
434 e.PrevNode = n.Repr(false, false, s.clock)
435
436 callback := func(path string) { // notify function
437 // notify the watchers with deleted set true
438 s.WatcherHub.notifyWatchers(e, path, true)
439 }
440
441 err = n.Remove(false, false, callback)
442 if err != nil {
443 return nil, err
444 }
445
446 s.WatcherHub.notify(e)
447
448 return e, nil
449}
450
451func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) {
452 s.worldLock.RLock()
453 defer s.worldLock.RUnlock()
454
455 key = path.Clean(path.Join("/", key))
456 if sinceIndex == 0 {
457 sinceIndex = s.CurrentIndex + 1
458 }
459 // WatcherHub does not know about the current index, so we need to pass it in
460 w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex)
461 if err != nil {
462 return nil, err
463 }
464
465 return w, nil
466}
467
468// walk walks all the nodePath and apply the walkFunc on each directory
469func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *etcdErr.Error)) (*node, *etcdErr.Error) {
470 components := strings.Split(nodePath, "/")
471
472 curr := s.Root
473 var err *etcdErr.Error
474
475 for i := 1; i < len(components); i++ {
476 if len(components[i]) == 0 { // ignore empty string
477 return curr, nil
478 }
479
480 curr, err = walkFunc(curr, components[i])
481 if err != nil {
482 return nil, err
483 }
484 }
485
486 return curr, nil
487}
488
489// Update updates the value/ttl of the node.
490// If the node is a file, the value and the ttl can be updated.
491// If the node is a directory, only the ttl can be updated.
492func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) {
493 var err *etcdErr.Error
494
495 s.worldLock.Lock()
496 defer s.worldLock.Unlock()
497
498 defer func() {
499 if err == nil {
500 s.Stats.Inc(UpdateSuccess)
501 reportWriteSuccess(Update)
502 return
503 }
504
505 s.Stats.Inc(UpdateFail)
506 reportWriteFailure(Update)
507 }()
508
509 nodePath = path.Clean(path.Join("/", nodePath))
510 // we do not allow the user to change "/"
511 if s.readonlySet.Contains(nodePath) {
512 return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
513 }
514
515 currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
516
517 n, err := s.internalGet(nodePath)
518 if err != nil { // if the node does not exist, return error
519 return nil, err
520 }
521 if n.IsDir() && len(newValue) != 0 {
522 // if the node is a directory, we cannot update value to non-empty
523 return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
524 }
525
526 if expireOpts.Refresh {
527 newValue = n.Value
528 }
529
530 e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
531 e.EtcdIndex = nextIndex
532 e.PrevNode = n.Repr(false, false, s.clock)
533 eNode := e.Node
534
535 n.Write(newValue, nextIndex)
536
537 if n.IsDir() {
538 eNode.Dir = true
539 } else {
540 // copy the value for safety
541 newValueCopy := newValue
542 eNode.Value = &newValueCopy
543 }
544
545 // update ttl
546 n.UpdateTTL(expireOpts.ExpireTime)
547
548 eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
549
550 if !expireOpts.Refresh {
551 s.WatcherHub.notify(e)
552 } else {
553 e.SetRefresh()
554 s.WatcherHub.add(e)
555 }
556
557 s.CurrentIndex = nextIndex
558
559 return e, nil
560}
561
562func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
563 expireTime time.Time, action string) (*Event, *etcdErr.Error) {
564
565 currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
566
567 if unique { // append unique item under the node path
568 nodePath += "/" + fmt.Sprintf("%020s", strconv.FormatUint(nextIndex, 10))
569 }
570
571 nodePath = path.Clean(path.Join("/", nodePath))
572
573 // we do not allow the user to change "/"
574 if s.readonlySet.Contains(nodePath) {
575 return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
576 }
577
578 // Assume expire times that are way in the past are
579 // This can occur when the time is serialized to JS
580 if expireTime.Before(minExpireTime) {
581 expireTime = Permanent
582 }
583
584 dirName, nodeName := path.Split(nodePath)
585
586 // walk through the nodePath, create dirs and get the last directory node
587 d, err := s.walk(dirName, s.checkDir)
588
589 if err != nil {
590 s.Stats.Inc(SetFail)
591 reportWriteFailure(action)
592 err.Index = currIndex
593 return nil, err
594 }
595
596 e := newEvent(action, nodePath, nextIndex, nextIndex)
597 eNode := e.Node
598
599 n, _ := d.GetChild(nodeName)
600
601 // force will try to replace an existing file
602 if n != nil {
603 if replace {
604 if n.IsDir() {
605 return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
606 }
607 e.PrevNode = n.Repr(false, false, s.clock)
608
609 n.Remove(false, false, nil)
610 } else {
611 return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
612 }
613 }
614
615 if !dir { // create file
616 // copy the value for safety
617 valueCopy := value
618 eNode.Value = &valueCopy
619
620 n = newKV(s, nodePath, value, nextIndex, d, expireTime)
621
622 } else { // create directory
623 eNode.Dir = true
624
625 n = newDir(s, nodePath, nextIndex, d, expireTime)
626 }
627
628 // we are sure d is a directory and does not have the children with name n.Name
629 d.Add(n)
630
631 // node with TTL
632 if !n.IsPermanent() {
633 s.ttlKeyHeap.push(n)
634
635 eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
636 }
637
638 s.CurrentIndex = nextIndex
639
640 return e, nil
641}
642
643// InternalGet gets the node of the given nodePath.
644func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {
645 nodePath = path.Clean(path.Join("/", nodePath))
646
647 walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) {
648
649 if !parent.IsDir() {
650 err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
651 return nil, err
652 }
653
654 child, ok := parent.Children[name]
655 if ok {
656 return child, nil
657 }
658
659 return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
660 }
661
662 f, err := s.walk(nodePath, walkFunc)
663
664 if err != nil {
665 return nil, err
666 }
667 return f, nil
668}
669
670// DeleteExpiredKeys will delete all expired keys
671func (s *store) DeleteExpiredKeys(cutoff time.Time) {
672 s.worldLock.Lock()
673 defer s.worldLock.Unlock()
674
675 for {
676 node := s.ttlKeyHeap.top()
677 if node == nil || node.ExpireTime.After(cutoff) {
678 break
679 }
680
681 s.CurrentIndex++
682 e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
683 e.EtcdIndex = s.CurrentIndex
684 e.PrevNode = node.Repr(false, false, s.clock)
685 if node.IsDir() {
686 e.Node.Dir = true
687 }
688
689 callback := func(path string) { // notify function
690 // notify the watchers with deleted set true
691 s.WatcherHub.notifyWatchers(e, path, true)
692 }
693
694 s.ttlKeyHeap.pop()
695 node.Remove(true, true, callback)
696
697 reportExpiredKey()
698 s.Stats.Inc(ExpireCount)
699
700 s.WatcherHub.notify(e)
701 }
702
703}
704
705// checkDir will check whether the component is a directory under parent node.
706// If it is a directory, this function will return the pointer to that node.
707// If it does not exist, this function will create a new directory and return the pointer to that node.
708// If it is a file, this function will return error.
709func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) {
710 node, ok := parent.Children[dirName]
711
712 if ok {
713 if node.IsDir() {
714 return node, nil
715 }
716
717 return nil, etcdErr.NewError(etcdErr.EcodeNotDir, node.Path, s.CurrentIndex)
718 }
719
720 n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, Permanent)
721
722 parent.Children[dirName] = n
723
724 return n, nil
725}
726
727// Save saves the static state of the store system.
728// It will not be able to save the state of watchers.
729// It will not save the parent field of the node. Or there will
730// be cyclic dependencies issue for the json package.
731func (s *store) Save() ([]byte, error) {
732 b, err := json.Marshal(s.Clone())
733 if err != nil {
734 return nil, err
735 }
736
737 return b, nil
738}
739
740func (s *store) SaveNoCopy() ([]byte, error) {
741 b, err := json.Marshal(s)
742 if err != nil {
743 return nil, err
744 }
745
746 return b, nil
747}
748
749func (s *store) Clone() Store {
750 s.worldLock.Lock()
751
752 clonedStore := newStore()
753 clonedStore.CurrentIndex = s.CurrentIndex
754 clonedStore.Root = s.Root.Clone()
755 clonedStore.WatcherHub = s.WatcherHub.clone()
756 clonedStore.Stats = s.Stats.clone()
757 clonedStore.CurrentVersion = s.CurrentVersion
758
759 s.worldLock.Unlock()
760 return clonedStore
761}
762
763// Recovery recovers the store system from a static state
764// It needs to recover the parent field of the nodes.
765// It needs to delete the expired nodes since the saved time and also
766// needs to create monitoring go routines.
767func (s *store) Recovery(state []byte) error {
768 s.worldLock.Lock()
769 defer s.worldLock.Unlock()
770 err := json.Unmarshal(state, s)
771
772 if err != nil {
773 return err
774 }
775
776 s.ttlKeyHeap = newTtlKeyHeap()
777
778 s.Root.recoverAndclean()
779 return nil
780}
781
782func (s *store) JsonStats() []byte {
783 s.Stats.Watchers = uint64(s.WatcherHub.count)
784 return s.Stats.toJson()
785}
786
787func (s *store) HasTTLKeys() bool {
788 s.worldLock.RLock()
789 defer s.worldLock.RUnlock()
790 return s.ttlKeyHeap.Len() != 0
791}