blob: a6ce5c58462e8deb8b741b678db206b8ccc155f3 [file] [log] [blame]
Joey Armstronge8c091f2023-01-17 16:56:26 -05001package redis
2
3import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "math"
8 "net"
9 "runtime"
10 "sort"
11 "sync"
12 "sync/atomic"
13 "time"
14
15 "github.com/go-redis/redis/v8/internal"
16 "github.com/go-redis/redis/v8/internal/hashtag"
17 "github.com/go-redis/redis/v8/internal/pool"
18 "github.com/go-redis/redis/v8/internal/proto"
19 "github.com/go-redis/redis/v8/internal/rand"
20)
21
// errClusterNoNodes is returned when the cluster has no node address
// (neither seed nor active) that could be used to reach it.
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
23
// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// NewClient creates a cluster node client with the provided options.
	// When nil, NewClient from this package is used.
	NewClient func(opt *Options) *Client

	// The maximum number of retries before giving up. Command is retried
	// on network errors and MOVED/ASK redirects.
	// Default is 3 retries; -1 disables retries.
	MaxRedirects int

	// Enables read-only commands on slave nodes.
	ReadOnly bool
	// Allows routing read-only commands to the closest master or slave node.
	// It automatically enables ReadOnly unless a custom ClusterSlots
	// function is set.
	RouteByLatency bool
	// Allows routing read-only commands to the random master or slave node.
	// It automatically enables ReadOnly unless a custom ClusterSlots
	// function is set.
	RouteRandomly bool

	// Optional function that returns cluster slots information.
	// It is useful to manually create cluster of standalone Redis servers
	// and load-balance read/write operations between master and slaves.
	// It can use service like ZooKeeper to maintain configuration information
	// and Cluster.ReloadState to manually trigger state reloading.
	ClusterSlots func(context.Context) ([]ClusterSlot, error)

	// Following options are copied from Options struct.

	Dialer func(ctx context.Context, network, addr string) (net.Conn, error)

	OnConnect func(ctx context.Context, cn *Conn) error

	Username string
	Password string

	// MaxRetries is passed to each node client. A zero value is turned
	// into -1 by init, so per-node retries are off unless set explicitly.
	MaxRetries int
	// Retry backoff bounds; zero selects the defaults (8ms / 512ms) and
	// -1 is normalized to 0.
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout time.Duration
	// ReadTimeout defaults to 3 seconds; -1 is normalized to 0.
	ReadTimeout time.Duration
	// WriteTimeout defaults to ReadTimeout; -1 is normalized to 0.
	WriteTimeout time.Duration

	// PoolSize applies per cluster node and not for the whole cluster.
	// Default is 5 connections per CPU as reported by runtime.NumCPU.
	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration

	TLSConfig *tls.Config
}
81
82func (opt *ClusterOptions) init() {
83 if opt.MaxRedirects == -1 {
84 opt.MaxRedirects = 0
85 } else if opt.MaxRedirects == 0 {
86 opt.MaxRedirects = 3
87 }
88
89 if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
90 opt.ReadOnly = true
91 }
92
93 if opt.PoolSize == 0 {
94 opt.PoolSize = 5 * runtime.NumCPU()
95 }
96
97 switch opt.ReadTimeout {
98 case -1:
99 opt.ReadTimeout = 0
100 case 0:
101 opt.ReadTimeout = 3 * time.Second
102 }
103 switch opt.WriteTimeout {
104 case -1:
105 opt.WriteTimeout = 0
106 case 0:
107 opt.WriteTimeout = opt.ReadTimeout
108 }
109
110 if opt.MaxRetries == 0 {
111 opt.MaxRetries = -1
112 }
113 switch opt.MinRetryBackoff {
114 case -1:
115 opt.MinRetryBackoff = 0
116 case 0:
117 opt.MinRetryBackoff = 8 * time.Millisecond
118 }
119 switch opt.MaxRetryBackoff {
120 case -1:
121 opt.MaxRetryBackoff = 0
122 case 0:
123 opt.MaxRetryBackoff = 512 * time.Millisecond
124 }
125
126 if opt.NewClient == nil {
127 opt.NewClient = NewClient
128 }
129}
130
131func (opt *ClusterOptions) clientOptions() *Options {
132 const disableIdleCheck = -1
133
134 return &Options{
135 Dialer: opt.Dialer,
136 OnConnect: opt.OnConnect,
137
138 Username: opt.Username,
139 Password: opt.Password,
140
141 MaxRetries: opt.MaxRetries,
142 MinRetryBackoff: opt.MinRetryBackoff,
143 MaxRetryBackoff: opt.MaxRetryBackoff,
144
145 DialTimeout: opt.DialTimeout,
146 ReadTimeout: opt.ReadTimeout,
147 WriteTimeout: opt.WriteTimeout,
148
149 PoolSize: opt.PoolSize,
150 MinIdleConns: opt.MinIdleConns,
151 MaxConnAge: opt.MaxConnAge,
152 PoolTimeout: opt.PoolTimeout,
153 IdleTimeout: opt.IdleTimeout,
154 IdleCheckFrequency: disableIdleCheck,
155
156 readOnly: opt.ReadOnly,
157
158 TLSConfig: opt.TLSConfig,
159 }
160}
161
162//------------------------------------------------------------------------------
163
// clusterNode pairs a node client with routing metadata. The numeric
// fields are only accessed through sync/atomic.
type clusterNode struct {
	Client *Client

	latency    uint32 // atomic; average ping latency in microseconds
	generation uint32 // atomic; highest state generation that referenced this node
	failing    uint32 // atomic; unix seconds when marked failing, 0 if healthy
}
171
172func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
173 opt := clOpt.clientOptions()
174 opt.Addr = addr
175 node := clusterNode{
176 Client: clOpt.NewClient(opt),
177 }
178
179 node.latency = math.MaxUint32
180 if clOpt.RouteByLatency {
181 go node.updateLatency()
182 }
183
184 return &node
185}
186
// String returns the string form of the underlying node client.
func (n *clusterNode) String() string {
	return n.Client.String()
}
190
// Close closes the underlying node client and returns its error.
func (n *clusterNode) Close() error {
	return n.Client.Close()
}
194
195func (n *clusterNode) updateLatency() {
196 const numProbe = 10
197 var dur uint64
198
199 for i := 0; i < numProbe; i++ {
200 time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond)
201
202 start := time.Now()
203 n.Client.Ping(context.TODO())
204 dur += uint64(time.Since(start) / time.Microsecond)
205 }
206
207 latency := float64(dur) / float64(numProbe)
208 atomic.StoreUint32(&n.latency, uint32(latency+0.5))
209}
210
211func (n *clusterNode) Latency() time.Duration {
212 latency := atomic.LoadUint32(&n.latency)
213 return time.Duration(latency) * time.Microsecond
214}
215
216func (n *clusterNode) MarkAsFailing() {
217 atomic.StoreUint32(&n.failing, uint32(time.Now().Unix()))
218}
219
220func (n *clusterNode) Failing() bool {
221 const timeout = 15 // 15 seconds
222
223 failing := atomic.LoadUint32(&n.failing)
224 if failing == 0 {
225 return false
226 }
227 if time.Now().Unix()-int64(failing) < timeout {
228 return true
229 }
230 atomic.StoreUint32(&n.failing, 0)
231 return false
232}
233
// Generation atomically reads the generation currently assigned to the node.
func (n *clusterNode) Generation() uint32 {
	return atomic.LoadUint32(&n.generation)
}
237
238func (n *clusterNode) SetGeneration(gen uint32) {
239 for {
240 v := atomic.LoadUint32(&n.generation)
241 if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
242 break
243 }
244 }
245}
246
247//------------------------------------------------------------------------------
248
// clusterNodes is the registry of node clients known to a cluster client.
// All fields except _generation are guarded by mu.
type clusterNodes struct {
	opt *ClusterOptions

	mu          sync.RWMutex
	addrs       []string                // seed addresses plus every address ever added via Get
	nodes       map[string]*clusterNode // node clients keyed by address
	activeAddrs []string                // addresses that survived the last GC
	closed      bool

	_generation uint32 // atomic
}
260
261func newClusterNodes(opt *ClusterOptions) *clusterNodes {
262 return &clusterNodes{
263 opt: opt,
264
265 addrs: opt.Addrs,
266 nodes: make(map[string]*clusterNode),
267 }
268}
269
270func (c *clusterNodes) Close() error {
271 c.mu.Lock()
272 defer c.mu.Unlock()
273
274 if c.closed {
275 return nil
276 }
277 c.closed = true
278
279 var firstErr error
280 for _, node := range c.nodes {
281 if err := node.Client.Close(); err != nil && firstErr == nil {
282 firstErr = err
283 }
284 }
285
286 c.nodes = nil
287 c.activeAddrs = nil
288
289 return firstErr
290}
291
292func (c *clusterNodes) Addrs() ([]string, error) {
293 var addrs []string
294 c.mu.RLock()
295 closed := c.closed
296 if !closed {
297 if len(c.activeAddrs) > 0 {
298 addrs = c.activeAddrs
299 } else {
300 addrs = c.addrs
301 }
302 }
303 c.mu.RUnlock()
304
305 if closed {
306 return nil, pool.ErrClosed
307 }
308 if len(addrs) == 0 {
309 return nil, errClusterNoNodes
310 }
311 return addrs, nil
312}
313
// NextGeneration atomically increments the generation counter and
// returns the new value.
func (c *clusterNodes) NextGeneration() uint32 {
	return atomic.AddUint32(&c._generation, 1)
}
317
318// GC removes unused nodes.
319func (c *clusterNodes) GC(generation uint32) {
320 //nolint:prealloc
321 var collected []*clusterNode
322
323 c.mu.Lock()
324
325 c.activeAddrs = c.activeAddrs[:0]
326 for addr, node := range c.nodes {
327 if node.Generation() >= generation {
328 c.activeAddrs = append(c.activeAddrs, addr)
329 if c.opt.RouteByLatency {
330 go node.updateLatency()
331 }
332 continue
333 }
334
335 delete(c.nodes, addr)
336 collected = append(collected, node)
337 }
338
339 c.mu.Unlock()
340
341 for _, node := range collected {
342 _ = node.Client.Close()
343 }
344}
345
346func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
347 node, err := c.get(addr)
348 if err != nil {
349 return nil, err
350 }
351 if node != nil {
352 return node, nil
353 }
354
355 c.mu.Lock()
356 defer c.mu.Unlock()
357
358 if c.closed {
359 return nil, pool.ErrClosed
360 }
361
362 node, ok := c.nodes[addr]
363 if ok {
364 return node, nil
365 }
366
367 node = newClusterNode(c.opt, addr)
368
369 c.addrs = appendIfNotExists(c.addrs, addr)
370 c.nodes[addr] = node
371
372 return node, nil
373}
374
375func (c *clusterNodes) get(addr string) (*clusterNode, error) {
376 var node *clusterNode
377 var err error
378 c.mu.RLock()
379 if c.closed {
380 err = pool.ErrClosed
381 } else {
382 node = c.nodes[addr]
383 }
384 c.mu.RUnlock()
385 return node, err
386}
387
388func (c *clusterNodes) All() ([]*clusterNode, error) {
389 c.mu.RLock()
390 defer c.mu.RUnlock()
391
392 if c.closed {
393 return nil, pool.ErrClosed
394 }
395
396 cp := make([]*clusterNode, 0, len(c.nodes))
397 for _, node := range c.nodes {
398 cp = append(cp, node)
399 }
400 return cp, nil
401}
402
403func (c *clusterNodes) Random() (*clusterNode, error) {
404 addrs, err := c.Addrs()
405 if err != nil {
406 return nil, err
407 }
408
409 n := rand.Intn(len(addrs))
410 return c.Get(addrs[n])
411}
412
413//------------------------------------------------------------------------------
414
// clusterSlot is a contiguous, inclusive range of hash slots together
// with the nodes serving it. The first node is treated as the master.
type clusterSlot struct {
	start, end int
	nodes      []*clusterNode
}
419
// clusterSlotSlice implements sort.Interface, ordering slot ranges by
// their start slot.
type clusterSlotSlice []*clusterSlot

// Len returns the number of slot ranges.
func (p clusterSlotSlice) Len() int {
	return len(p)
}

// Less reports whether range i starts before range j.
func (p clusterSlotSlice) Less(i, j int) bool {
	return p[i].start < p[j].start
}

// Swap exchanges ranges i and j.
func (p clusterSlotSlice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}
433
// clusterState is a snapshot of the cluster topology built from one
// CLUSTER SLOTS result.
type clusterState struct {
	nodes   *clusterNodes
	Masters []*clusterNode // unique first nodes of every slot range
	Slaves  []*clusterNode // unique non-first nodes of every slot range

	slots []*clusterSlot // sorted by start slot

	generation uint32    // generation assigned to the nodes of this snapshot
	createdAt  time.Time // used to decide when the snapshot is stale
}
444
445func newClusterState(
446 nodes *clusterNodes, slots []ClusterSlot, origin string,
447) (*clusterState, error) {
448 c := clusterState{
449 nodes: nodes,
450
451 slots: make([]*clusterSlot, 0, len(slots)),
452
453 generation: nodes.NextGeneration(),
454 createdAt: time.Now(),
455 }
456
457 originHost, _, _ := net.SplitHostPort(origin)
458 isLoopbackOrigin := isLoopback(originHost)
459
460 for _, slot := range slots {
461 var nodes []*clusterNode
462 for i, slotNode := range slot.Nodes {
463 addr := slotNode.Addr
464 if !isLoopbackOrigin {
465 addr = replaceLoopbackHost(addr, originHost)
466 }
467
468 node, err := c.nodes.Get(addr)
469 if err != nil {
470 return nil, err
471 }
472
473 node.SetGeneration(c.generation)
474 nodes = append(nodes, node)
475
476 if i == 0 {
477 c.Masters = appendUniqueNode(c.Masters, node)
478 } else {
479 c.Slaves = appendUniqueNode(c.Slaves, node)
480 }
481 }
482
483 c.slots = append(c.slots, &clusterSlot{
484 start: slot.Start,
485 end: slot.End,
486 nodes: nodes,
487 })
488 }
489
490 sort.Sort(clusterSlotSlice(c.slots))
491
492 time.AfterFunc(time.Minute, func() {
493 nodes.GC(c.generation)
494 })
495
496 return &c, nil
497}
498
// replaceLoopbackHost returns nodeAddr with its host replaced by
// originHost when nodeAddr is a host:port pair whose host is a loopback
// IP. In every other case (unparsable address, hostname, non-loopback
// IP) nodeAddr is returned unchanged.
func replaceLoopbackHost(nodeAddr, originHost string) string {
	host, port, splitErr := net.SplitHostPort(nodeAddr)
	if splitErr != nil {
		return nodeAddr
	}

	if ip := net.ParseIP(host); ip != nil && ip.IsLoopback() {
		// Use origin host which is not loopback and node port.
		return net.JoinHostPort(originHost, port)
	}
	return nodeAddr
}
517
// isLoopback reports whether host is a loopback IP address. A host that
// does not parse as an IP (a hostname or an empty string) is also
// reported as loopback.
func isLoopback(host string) bool {
	if ip := net.ParseIP(host); ip != nil {
		return ip.IsLoopback()
	}
	return true
}
525
526func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
527 nodes := c.slotNodes(slot)
528 if len(nodes) > 0 {
529 return nodes[0], nil
530 }
531 return c.nodes.Random()
532}
533
534func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
535 nodes := c.slotNodes(slot)
536 switch len(nodes) {
537 case 0:
538 return c.nodes.Random()
539 case 1:
540 return nodes[0], nil
541 case 2:
542 if slave := nodes[1]; !slave.Failing() {
543 return slave, nil
544 }
545 return nodes[0], nil
546 default:
547 var slave *clusterNode
548 for i := 0; i < 10; i++ {
549 n := rand.Intn(len(nodes)-1) + 1
550 slave = nodes[n]
551 if !slave.Failing() {
552 return slave, nil
553 }
554 }
555
556 // All slaves are loading - use master.
557 return nodes[0], nil
558 }
559}
560
561func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
562 nodes := c.slotNodes(slot)
563 if len(nodes) == 0 {
564 return c.nodes.Random()
565 }
566
567 var node *clusterNode
568 for _, n := range nodes {
569 if n.Failing() {
570 continue
571 }
572 if node == nil || n.Latency() < node.Latency() {
573 node = n
574 }
575 }
576 if node != nil {
577 return node, nil
578 }
579
580 // If all nodes are failing - return random node
581 return c.nodes.Random()
582}
583
584func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
585 nodes := c.slotNodes(slot)
586 if len(nodes) == 0 {
587 return c.nodes.Random()
588 }
589 n := rand.Intn(len(nodes))
590 return nodes[n], nil
591}
592
593func (c *clusterState) slotNodes(slot int) []*clusterNode {
594 i := sort.Search(len(c.slots), func(i int) bool {
595 return c.slots[i].end >= slot
596 })
597 if i >= len(c.slots) {
598 return nil
599 }
600 x := c.slots[i]
601 if slot >= x.start && slot <= x.end {
602 return x.nodes
603 }
604 return nil
605}
606
607//------------------------------------------------------------------------------
608
// clusterStateHolder caches the most recent clusterState and coordinates
// reloading it.
type clusterStateHolder struct {
	// load fetches a fresh state.
	load func(ctx context.Context) (*clusterState, error)

	state     atomic.Value // holds *clusterState
	reloading uint32       // atomic; 1 while a lazy reload is in flight
}
615
616func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder {
617 return &clusterStateHolder{
618 load: fn,
619 }
620}
621
622func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error) {
623 state, err := c.load(ctx)
624 if err != nil {
625 return nil, err
626 }
627 c.state.Store(state)
628 return state, nil
629}
630
// LazyReload triggers a state reload in a background goroutine unless
// one is already in flight; the reloading flag makes concurrent calls
// collapse into a single reload.
//
// NOTE(review): ctx is used by the background goroutine after this
// method has returned. If callers pass a request-scoped context that is
// cancelled once the request finishes, the reload may be aborted —
// confirm whether a detached context should be used here.
func (c *clusterStateHolder) LazyReload(ctx context.Context) {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
		return
	}
	go func() {
		defer atomic.StoreUint32(&c.reloading, 0)

		_, err := c.Reload(ctx)
		if err != nil {
			return
		}
		// Keep the flag set a little longer after a successful reload so
		// a burst of triggers does not cause back-to-back reloads.
		time.Sleep(200 * time.Millisecond)
	}()
}
645
646func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {
647 v := c.state.Load()
648 if v != nil {
649 state := v.(*clusterState)
650 if time.Since(state.createdAt) > 10*time.Second {
651 c.LazyReload(ctx)
652 }
653 return state, nil
654 }
655 return c.Reload(ctx)
656}
657
658func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, error) {
659 state, err := c.Reload(ctx)
660 if err == nil {
661 return state, nil
662 }
663 return c.Get(ctx)
664}
665
666//------------------------------------------------------------------------------
667
// clusterClient holds the state embedded by pointer in ClusterClient, so
// copies of a ClusterClient (see WithContext) share the same instance.
type clusterClient struct {
	opt           *ClusterOptions
	nodes         *clusterNodes
	state         *clusterStateHolder //nolint:structcheck
	cmdsInfoCache *cmdsInfoCache      //nolint:structcheck
}
674
// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
	*clusterClient
	cmdable
	hooks
	// ctx is the context returned by Context.
	ctx context.Context
}
684
// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
//
// opt is normalized in place (defaults are filled in) and retained by
// the client. When opt.IdleCheckFrequency is positive, a background
// reaper goroutine is started for the node pools.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
	opt.init()

	c := &ClusterClient{
		clusterClient: &clusterClient{
			opt:   opt,
			nodes: newClusterNodes(opt),
		},
		ctx: context.Background(),
	}
	// These fields reference methods of c, so they can only be set after
	// c has been constructed.
	c.state = newClusterStateHolder(c.loadState)
	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
	c.cmdable = c.Process

	if opt.IdleCheckFrequency > 0 {
		go c.reaper(opt.IdleCheckFrequency)
	}

	return c
}
707
// Context returns the context associated with the client.
func (c *ClusterClient) Context() context.Context {
	return c.ctx
}
711
// WithContext returns a shallow copy of the client that uses ctx as its
// context. The copy shares the underlying cluster state with c.
// It panics if ctx is nil.
func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
	if ctx == nil {
		panic("nil context")
	}
	clone := *c
	// Rebind cmdable so commands issued on the clone go through the
	// clone's Process (and therefore its hooks).
	clone.cmdable = clone.Process
	// NOTE(review): presumably this prevents the clone and the original
	// from sharing later hook additions — confirm against hooks.lock.
	clone.hooks.lock()
	clone.ctx = ctx
	return &clone
}
722
// Options returns read-only Options that were used to create the client.
// The returned pointer is the client's own options value, not a copy.
func (c *ClusterClient) Options() *ClusterOptions {
	return c.opt
}
727
// ReloadState reloads cluster state. If available it calls ClusterSlots func
// to get cluster slots information.
//
// The reload runs in the background; this method does not wait for it
// and does not report its outcome.
func (c *ClusterClient) ReloadState(ctx context.Context) {
	c.state.LazyReload(ctx)
}
733
// Close closes the cluster client, releasing any open resources.
// It returns the first error reported while closing the node clients.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
	return c.nodes.Close()
}
741
// Do creates a Cmd from the args and processes the cmd.
// The error returned by Process is discarded here; the returned Cmd is
// what callers get back.
func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
	cmd := NewCmd(ctx, args...)
	_ = c.Process(ctx, cmd)
	return cmd
}
748
// Process runs cmd through the client's hooks and then executes it
// against the cluster.
func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
	return c.hooks.process(ctx, cmd, c.process)
}
752
// process executes cmd against the node responsible for its slot,
// retrying up to MaxRedirects times. Between attempts it sleeps for the
// retry backoff (aborting if ctx is done) and reacts to the last error:
//
//   - READONLY or closed pool: pick a node again (and reload state on
//     READONLY);
//   - LOADING while ReadOnly is enabled: mark the node failing and pick
//     another one;
//   - MOVED/ASK: switch to the node named in the redirect (ASK sends
//     ASKING first);
//   - other retryable errors: retry the same node once, then mark it
//     failing and pick another.
//
// Any other error is returned immediately. When attempts are exhausted
// the last error is returned.
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
	cmdInfo := c.cmdInfo(cmd.Name())
	slot := c.cmdSlot(cmd)

	// node is kept across iterations; setting it to nil forces a fresh
	// node selection on the next attempt.
	var node *clusterNode
	var ask bool
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		if node == nil {
			var err error
			node, err = c.cmdNode(ctx, cmdInfo, slot)
			if err != nil {
				return err
			}
		}

		if ask {
			// After an ASK redirect the command must be preceded by
			// ASKING on the same connection, hence the pipeline.
			pipe := node.Client.Pipeline()
			_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
			_ = pipe.Process(ctx, cmd)
			_, lastErr = pipe.Exec(ctx)
			_ = pipe.Close()
			ask = false
		} else {
			lastErr = node.Client.Process(ctx, cmd)
		}

		// If there is no error - we are done.
		if lastErr == nil {
			return nil
		}
		if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload(ctx)
			}
			node = nil
			continue
		}

		// If slave is loading - pick another node.
		if c.opt.ReadOnly && isLoadingError(lastErr) {
			node.MarkAsFailing()
			node = nil
			continue
		}

		var moved bool
		var addr string
		moved, ask, addr = isMovedError(lastErr)
		if moved || ask {
			var err error
			node, err = c.nodes.Get(addr)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(lastErr, cmd.readTimeout() == nil) {
			// First retry the same node.
			if attempt == 0 {
				continue
			}

			// Second try another node.
			node.MarkAsFailing()
			node = nil
			continue
		}

		return lastErr
	}
	return lastErr
}
833
834// ForEachMaster concurrently calls the fn on each master node in the cluster.
835// It returns the first error if any.
836func (c *ClusterClient) ForEachMaster(
837 ctx context.Context,
838 fn func(ctx context.Context, client *Client) error,
839) error {
840 state, err := c.state.ReloadOrGet(ctx)
841 if err != nil {
842 return err
843 }
844
845 var wg sync.WaitGroup
846 errCh := make(chan error, 1)
847
848 for _, master := range state.Masters {
849 wg.Add(1)
850 go func(node *clusterNode) {
851 defer wg.Done()
852 err := fn(ctx, node.Client)
853 if err != nil {
854 select {
855 case errCh <- err:
856 default:
857 }
858 }
859 }(master)
860 }
861
862 wg.Wait()
863
864 select {
865 case err := <-errCh:
866 return err
867 default:
868 return nil
869 }
870}
871
872// ForEachSlave concurrently calls the fn on each slave node in the cluster.
873// It returns the first error if any.
874func (c *ClusterClient) ForEachSlave(
875 ctx context.Context,
876 fn func(ctx context.Context, client *Client) error,
877) error {
878 state, err := c.state.ReloadOrGet(ctx)
879 if err != nil {
880 return err
881 }
882
883 var wg sync.WaitGroup
884 errCh := make(chan error, 1)
885
886 for _, slave := range state.Slaves {
887 wg.Add(1)
888 go func(node *clusterNode) {
889 defer wg.Done()
890 err := fn(ctx, node.Client)
891 if err != nil {
892 select {
893 case errCh <- err:
894 default:
895 }
896 }
897 }(slave)
898 }
899
900 wg.Wait()
901
902 select {
903 case err := <-errCh:
904 return err
905 default:
906 return nil
907 }
908}
909
910// ForEachShard concurrently calls the fn on each known node in the cluster.
911// It returns the first error if any.
912func (c *ClusterClient) ForEachShard(
913 ctx context.Context,
914 fn func(ctx context.Context, client *Client) error,
915) error {
916 state, err := c.state.ReloadOrGet(ctx)
917 if err != nil {
918 return err
919 }
920
921 var wg sync.WaitGroup
922 errCh := make(chan error, 1)
923
924 worker := func(node *clusterNode) {
925 defer wg.Done()
926 err := fn(ctx, node.Client)
927 if err != nil {
928 select {
929 case errCh <- err:
930 default:
931 }
932 }
933 }
934
935 for _, node := range state.Masters {
936 wg.Add(1)
937 go worker(node)
938 }
939 for _, node := range state.Slaves {
940 wg.Add(1)
941 go worker(node)
942 }
943
944 wg.Wait()
945
946 select {
947 case err := <-errCh:
948 return err
949 default:
950 return nil
951 }
952}
953
954// PoolStats returns accumulated connection pool stats.
955func (c *ClusterClient) PoolStats() *PoolStats {
956 var acc PoolStats
957
958 state, _ := c.state.Get(context.TODO())
959 if state == nil {
960 return &acc
961 }
962
963 for _, node := range state.Masters {
964 s := node.Client.connPool.Stats()
965 acc.Hits += s.Hits
966 acc.Misses += s.Misses
967 acc.Timeouts += s.Timeouts
968
969 acc.TotalConns += s.TotalConns
970 acc.IdleConns += s.IdleConns
971 acc.StaleConns += s.StaleConns
972 }
973
974 for _, node := range state.Slaves {
975 s := node.Client.connPool.Stats()
976 acc.Hits += s.Hits
977 acc.Misses += s.Misses
978 acc.Timeouts += s.Timeouts
979
980 acc.TotalConns += s.TotalConns
981 acc.IdleConns += s.IdleConns
982 acc.StaleConns += s.StaleConns
983 }
984
985 return &acc
986}
987
988func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
989 if c.opt.ClusterSlots != nil {
990 slots, err := c.opt.ClusterSlots(ctx)
991 if err != nil {
992 return nil, err
993 }
994 return newClusterState(c.nodes, slots, "")
995 }
996
997 addrs, err := c.nodes.Addrs()
998 if err != nil {
999 return nil, err
1000 }
1001
1002 var firstErr error
1003
1004 for _, idx := range rand.Perm(len(addrs)) {
1005 addr := addrs[idx]
1006
1007 node, err := c.nodes.Get(addr)
1008 if err != nil {
1009 if firstErr == nil {
1010 firstErr = err
1011 }
1012 continue
1013 }
1014
1015 slots, err := node.Client.ClusterSlots(ctx).Result()
1016 if err != nil {
1017 if firstErr == nil {
1018 firstErr = err
1019 }
1020 continue
1021 }
1022
1023 return newClusterState(c.nodes, slots, node.Client.opt.Addr)
1024 }
1025
1026 /*
1027 * No node is connectable. It's possible that all nodes' IP has changed.
1028 * Clear activeAddrs to let client be able to re-connect using the initial
1029 * setting of the addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]),
1030 * which might have chance to resolve domain name and get updated IP address.
1031 */
1032 c.nodes.mu.Lock()
1033 c.nodes.activeAddrs = nil
1034 c.nodes.mu.Unlock()
1035
1036 return nil, firstErr
1037}
1038
1039// reaper closes idle connections to the cluster.
1040func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
1041 ticker := time.NewTicker(idleCheckFrequency)
1042 defer ticker.Stop()
1043
1044 for range ticker.C {
1045 nodes, err := c.nodes.All()
1046 if err != nil {
1047 break
1048 }
1049
1050 for _, node := range nodes {
1051 _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
1052 if err != nil {
1053 internal.Logger.Printf(c.Context(), "ReapStaleConns failed: %s", err)
1054 }
1055 }
1056 }
1057}
1058
// Pipeline returns a new pipeline bound to the client's context whose
// queued commands are executed through processPipeline.
func (c *ClusterClient) Pipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processPipeline,
	}
	pipe.init()
	return &pipe
}
1067
// Pipelined creates a fresh pipeline and delegates to its Pipelined
// method with fn.
func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(ctx, fn)
}
1071
// processPipeline runs cmds through the client's pipeline hooks and then
// executes them with _processPipeline.
func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processPipeline(ctx, cmds, c._processPipeline)
}
1075
// _processPipeline executes cmds as per-node pipelines.
//
// Commands are first grouped by target node. Each attempt sends every
// group concurrently; commands that must be retried are collected in
// failedCmds and become the input of the next attempt, up to
// MaxRedirects retries. The first command error, if any, is returned.
func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
	cmdsMap := newCmdsMap()
	err := c.mapCmdsByNode(ctx, cmdsMap, cmds)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				setCmdsErr(cmds, err)
				return err
			}
		}

		failedCmds := newCmdsMap()
		var wg sync.WaitGroup

		for node, cmds := range cmdsMap.m {
			wg.Add(1)
			go func(node *clusterNode, cmds []Cmder) {
				defer wg.Done()

				err := c._processPipelineNode(ctx, node, cmds, failedCmds)
				if err == nil {
					return
				}
				// The whole node pipeline failed: re-route its commands
				// for the next attempt, or give up on the last attempt.
				if attempt < c.opt.MaxRedirects {
					if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
						setCmdsErr(cmds, err)
					}
				} else {
					setCmdsErr(cmds, err)
				}
			}(node, cmds)
		}

		// All goroutines of this attempt finish before failedCmds is
		// inspected or attempt is advanced.
		wg.Wait()
		if len(failedCmds.m) == 0 {
			break
		}
		cmdsMap = failedCmds
	}

	return cmdsFirstErr(cmds)
}
1123
1124func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error {
1125 state, err := c.state.Get(ctx)
1126 if err != nil {
1127 return err
1128 }
1129
1130 if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) {
1131 for _, cmd := range cmds {
1132 slot := c.cmdSlot(cmd)
1133 node, err := c.slotReadOnlyNode(state, slot)
1134 if err != nil {
1135 return err
1136 }
1137 cmdsMap.Add(node, cmd)
1138 }
1139 return nil
1140 }
1141
1142 for _, cmd := range cmds {
1143 slot := c.cmdSlot(cmd)
1144 node, err := state.slotMasterNode(slot)
1145 if err != nil {
1146 return err
1147 }
1148 cmdsMap.Add(node, cmd)
1149 }
1150 return nil
1151}
1152
1153func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
1154 for _, cmd := range cmds {
1155 cmdInfo := c.cmdInfo(cmd.Name())
1156 if cmdInfo == nil || !cmdInfo.ReadOnly {
1157 return false
1158 }
1159 }
1160 return true
1161}
1162
// _processPipelineNode sends cmds to node over a single connection and
// reads the replies. It runs inside the node client's pipeline hooks.
// Commands that were redirected are recorded in failedCmds by
// pipelineReadCmds.
func (c *ClusterClient) _processPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) error {
	return node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			// Write all commands first, then read all replies.
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmds(wr, cmds)
			})
			if err != nil {
				return err
			}

			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
				return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
			})
		})
	})
}
1181
1182func (c *ClusterClient) pipelineReadCmds(
1183 ctx context.Context,
1184 node *clusterNode,
1185 rd *proto.Reader,
1186 cmds []Cmder,
1187 failedCmds *cmdsMap,
1188) error {
1189 for _, cmd := range cmds {
1190 err := cmd.readReply(rd)
1191 cmd.SetErr(err)
1192
1193 if err == nil {
1194 continue
1195 }
1196
1197 if c.checkMovedErr(ctx, cmd, err, failedCmds) {
1198 continue
1199 }
1200
1201 if c.opt.ReadOnly && isLoadingError(err) {
1202 node.MarkAsFailing()
1203 return err
1204 }
1205 if isRedisError(err) {
1206 continue
1207 }
1208 return err
1209 }
1210 return nil
1211}
1212
1213func (c *ClusterClient) checkMovedErr(
1214 ctx context.Context, cmd Cmder, err error, failedCmds *cmdsMap,
1215) bool {
1216 moved, ask, addr := isMovedError(err)
1217 if !moved && !ask {
1218 return false
1219 }
1220
1221 node, err := c.nodes.Get(addr)
1222 if err != nil {
1223 return false
1224 }
1225
1226 if moved {
1227 c.state.LazyReload(ctx)
1228 failedCmds.Add(node, cmd)
1229 return true
1230 }
1231
1232 if ask {
1233 failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
1234 return true
1235 }
1236
1237 panic("not reached")
1238}
1239
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
// The returned pipeline is bound to the client's context and executes
// through processTxPipeline.
func (c *ClusterClient) TxPipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processTxPipeline,
	}
	pipe.init()
	return &pipe
}
1249
// TxPipelined creates a fresh transactional pipeline and delegates to
// its Pipelined method with fn.
func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(ctx, fn)
}
1253
// processTxPipeline runs cmds through the client's pipeline hooks and
// then executes them with _processTxPipeline.
func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processPipeline(ctx, cmds, c._processTxPipeline)
}
1257
// _processTxPipeline executes cmds as transactions, one per hash slot.
//
// Commands are grouped by slot and each group is sent to the slot
// master. Slots are processed one after another; within a slot the same
// retry scheme as _processPipeline is used (redirected commands are
// collected in failedCmds and retried up to MaxRedirects times).
// The first command error, if any, is returned.
func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
	state, err := c.state.Get(ctx)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			// Only this slot's commands fail; other slots still run.
			setCmdsErr(cmds, err)
			continue
		}

		// From here on cmdsMap is keyed by node and scoped to this slot.
		cmdsMap := map[*clusterNode][]Cmder{node: cmds}
		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
					setCmdsErr(cmds, err)
					return err
				}
			}

			failedCmds := newCmdsMap()
			var wg sync.WaitGroup

			for node, cmds := range cmdsMap {
				wg.Add(1)
				go func(node *clusterNode, cmds []Cmder) {
					defer wg.Done()

					err := c._processTxPipelineNode(ctx, node, cmds, failedCmds)
					if err == nil {
						return
					}
					if attempt < c.opt.MaxRedirects {
						if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
							setCmdsErr(cmds, err)
						}
					} else {
						setCmdsErr(cmds, err)
					}
				}(node, cmds)
			}

			// All goroutines of this attempt finish before failedCmds is
			// inspected or attempt is advanced.
			wg.Wait()
			if len(failedCmds.m) == 0 {
				break
			}
			cmdsMap = failedCmds.m
		}
	}

	return cmdsFirstErr(cmds)
}
1314
1315func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
1316 cmdsMap := make(map[int][]Cmder)
1317 for _, cmd := range cmds {
1318 slot := c.cmdSlot(cmd)
1319 cmdsMap[slot] = append(cmdsMap[slot], cmd)
1320 }
1321 return cmdsMap
1322}
1323
// _processTxPipelineNode sends a transaction to node over a single
// connection and reads its replies. It runs inside the node client's
// transactional pipeline hooks.
//
// If the queuing phase fails with MOVED/ASK, all user commands are
// re-queued for the redirect target via cmdsMoved; other queuing errors
// are returned as is.
func (c *ClusterClient) _processTxPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) error {
	return node.Client.hooks.processTxPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmds(wr, cmds)
			})
			if err != nil {
				return err
			}

			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
				// The first command is expected to be the MULTI status
				// command; the type assertion panics otherwise.
				statusCmd := cmds[0].(*StatusCmd)
				// Trim multi and exec.
				cmds = cmds[1 : len(cmds)-1]

				err := c.txPipelineReadQueued(ctx, rd, statusCmd, cmds, failedCmds)
				if err != nil {
					moved, ask, addr := isMovedError(err)
					if moved || ask {
						return c.cmdsMoved(ctx, cmds, moved, ask, addr, failedCmds)
					}
					return err
				}

				return pipelineReadCmds(rd, cmds)
			})
		})
	})
}
1355
1356func (c *ClusterClient) txPipelineReadQueued(
1357 ctx context.Context,
1358 rd *proto.Reader,
1359 statusCmd *StatusCmd,
1360 cmds []Cmder,
1361 failedCmds *cmdsMap,
1362) error {
1363 // Parse queued replies.
1364 if err := statusCmd.readReply(rd); err != nil {
1365 return err
1366 }
1367
1368 for _, cmd := range cmds {
1369 err := statusCmd.readReply(rd)
1370 if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) {
1371 continue
1372 }
1373 return err
1374 }
1375
1376 // Parse number of replies.
1377 line, err := rd.ReadLine()
1378 if err != nil {
1379 if err == Nil {
1380 err = TxFailedErr
1381 }
1382 return err
1383 }
1384
1385 switch line[0] {
1386 case proto.ErrorReply:
1387 return proto.ParseErrorReply(line)
1388 case proto.ArrayReply:
1389 // ok
1390 default:
1391 return fmt.Errorf("redis: expected '*', but got line %q", line)
1392 }
1393
1394 return nil
1395}
1396
1397func (c *ClusterClient) cmdsMoved(
1398 ctx context.Context, cmds []Cmder,
1399 moved, ask bool,
1400 addr string,
1401 failedCmds *cmdsMap,
1402) error {
1403 node, err := c.nodes.Get(addr)
1404 if err != nil {
1405 return err
1406 }
1407
1408 if moved {
1409 c.state.LazyReload(ctx)
1410 for _, cmd := range cmds {
1411 failedCmds.Add(node, cmd)
1412 }
1413 return nil
1414 }
1415
1416 if ask {
1417 for _, cmd := range cmds {
1418 failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
1419 }
1420 return nil
1421 }
1422
1423 return nil
1424}
1425
1426func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
1427 if len(keys) == 0 {
1428 return fmt.Errorf("redis: Watch requires at least one key")
1429 }
1430
1431 slot := hashtag.Slot(keys[0])
1432 for _, key := range keys[1:] {
1433 if hashtag.Slot(key) != slot {
1434 err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
1435 return err
1436 }
1437 }
1438
1439 node, err := c.slotMasterNode(ctx, slot)
1440 if err != nil {
1441 return err
1442 }
1443
1444 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1445 if attempt > 0 {
1446 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1447 return err
1448 }
1449 }
1450
1451 err = node.Client.Watch(ctx, fn, keys...)
1452 if err == nil {
1453 break
1454 }
1455
1456 moved, ask, addr := isMovedError(err)
1457 if moved || ask {
1458 node, err = c.nodes.Get(addr)
1459 if err != nil {
1460 return err
1461 }
1462 continue
1463 }
1464
1465 if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed {
1466 if isReadOnly {
1467 c.state.LazyReload(ctx)
1468 }
1469 node, err = c.slotMasterNode(ctx, slot)
1470 if err != nil {
1471 return err
1472 }
1473 continue
1474 }
1475
1476 if shouldRetry(err, true) {
1477 continue
1478 }
1479
1480 return err
1481 }
1482
1483 return err
1484}
1485
1486func (c *ClusterClient) pubSub() *PubSub {
1487 var node *clusterNode
1488 pubsub := &PubSub{
1489 opt: c.opt.clientOptions(),
1490
1491 newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
1492 if node != nil {
1493 panic("node != nil")
1494 }
1495
1496 var err error
1497 if len(channels) > 0 {
1498 slot := hashtag.Slot(channels[0])
1499 node, err = c.slotMasterNode(ctx, slot)
1500 } else {
1501 node, err = c.nodes.Random()
1502 }
1503 if err != nil {
1504 return nil, err
1505 }
1506
1507 cn, err := node.Client.newConn(context.TODO())
1508 if err != nil {
1509 node = nil
1510
1511 return nil, err
1512 }
1513
1514 return cn, nil
1515 },
1516 closeConn: func(cn *pool.Conn) error {
1517 err := node.Client.connPool.CloseConn(cn)
1518 node = nil
1519 return err
1520 },
1521 }
1522 pubsub.init()
1523
1524 return pubsub
1525}
1526
1527// Subscribe subscribes the client to the specified channels.
1528// Channels can be omitted to create empty subscription.
1529func (c *ClusterClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
1530 pubsub := c.pubSub()
1531 if len(channels) > 0 {
1532 _ = pubsub.Subscribe(ctx, channels...)
1533 }
1534 return pubsub
1535}
1536
1537// PSubscribe subscribes the client to the given patterns.
1538// Patterns can be omitted to create empty subscription.
1539func (c *ClusterClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
1540 pubsub := c.pubSub()
1541 if len(channels) > 0 {
1542 _ = pubsub.PSubscribe(ctx, channels...)
1543 }
1544 return pubsub
1545}
1546
1547func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
1548 return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
1549}
1550
1551func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
1552 // Try 3 random nodes.
1553 const nodeLimit = 3
1554
1555 addrs, err := c.nodes.Addrs()
1556 if err != nil {
1557 return nil, err
1558 }
1559
1560 var firstErr error
1561
1562 perm := rand.Perm(len(addrs))
1563 if len(perm) > nodeLimit {
1564 perm = perm[:nodeLimit]
1565 }
1566
1567 for _, idx := range perm {
1568 addr := addrs[idx]
1569
1570 node, err := c.nodes.Get(addr)
1571 if err != nil {
1572 if firstErr == nil {
1573 firstErr = err
1574 }
1575 continue
1576 }
1577
1578 info, err := node.Client.Command(ctx).Result()
1579 if err == nil {
1580 return info, nil
1581 }
1582 if firstErr == nil {
1583 firstErr = err
1584 }
1585 }
1586
1587 if firstErr == nil {
1588 panic("not reached")
1589 }
1590 return nil, firstErr
1591}
1592
1593func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
1594 cmdsInfo, err := c.cmdsInfoCache.Get(c.ctx)
1595 if err != nil {
1596 return nil
1597 }
1598
1599 info := cmdsInfo[name]
1600 if info == nil {
1601 internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name)
1602 }
1603 return info
1604}
1605
1606func (c *ClusterClient) cmdSlot(cmd Cmder) int {
1607 args := cmd.Args()
1608 if args[0] == "cluster" && args[1] == "getkeysinslot" {
1609 return args[2].(int)
1610 }
1611
1612 cmdInfo := c.cmdInfo(cmd.Name())
1613 return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
1614}
1615
1616func cmdSlot(cmd Cmder, pos int) int {
1617 if pos == 0 {
1618 return hashtag.RandomSlot()
1619 }
1620 firstKey := cmd.stringArg(pos)
1621 return hashtag.Slot(firstKey)
1622}
1623
1624func (c *ClusterClient) cmdNode(
1625 ctx context.Context,
1626 cmdInfo *CommandInfo,
1627 slot int,
1628) (*clusterNode, error) {
1629 state, err := c.state.Get(ctx)
1630 if err != nil {
1631 return nil, err
1632 }
1633
1634 if (c.opt.RouteByLatency || c.opt.RouteRandomly) && cmdInfo != nil && cmdInfo.ReadOnly {
1635 return c.slotReadOnlyNode(state, slot)
1636 }
1637 return state.slotMasterNode(slot)
1638}
1639
1640func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
1641 if c.opt.RouteByLatency {
1642 return state.slotClosestNode(slot)
1643 }
1644 if c.opt.RouteRandomly {
1645 return state.slotRandomNode(slot)
1646 }
1647 return state.slotSlaveNode(slot)
1648}
1649
1650func (c *ClusterClient) slotMasterNode(ctx context.Context, slot int) (*clusterNode, error) {
1651 state, err := c.state.Get(ctx)
1652 if err != nil {
1653 return nil, err
1654 }
1655 return state.slotMasterNode(slot)
1656}
1657
1658func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
1659 for _, n := range nodes {
1660 if n == node {
1661 return nodes
1662 }
1663 }
1664 return append(nodes, node)
1665}
1666
// appendIfNotExists appends to ss every element of es that ss does not
// already contain, in order. Elements appended earlier in the same call count
// as present, so duplicates within es are added only once.
func appendIfNotExists(ss []string, es ...string) []string {
	for _, candidate := range es {
		present := false
		for i := range ss {
			if ss[i] == candidate {
				present = true
				break
			}
		}
		if !present {
			ss = append(ss, candidate)
		}
	}
	return ss
}
1679
1680//------------------------------------------------------------------------------
1681
1682type cmdsMap struct {
1683 mu sync.Mutex
1684 m map[*clusterNode][]Cmder
1685}
1686
1687func newCmdsMap() *cmdsMap {
1688 return &cmdsMap{
1689 m: make(map[*clusterNode][]Cmder),
1690 }
1691}
1692
1693func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
1694 m.mu.Lock()
1695 m.m[node] = append(m.m[node], cmds...)
1696 m.mu.Unlock()
1697}