package redis

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"net"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/v8/internal"
	"github.com/go-redis/redis/v8/internal/hashtag"
	"github.com/go-redis/redis/v8/internal/pool"
	"github.com/go-redis/redis/v8/internal/proto"
	"github.com/go-redis/redis/v8/internal/rand"
)

var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")

// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// NewClient creates a cluster node client with the provided name and options.
	NewClient func(opt *Options) *Client

	// The maximum number of retries before giving up. Commands are retried
	// on network errors and MOVED/ASK redirects.
	// Default is 3 retries.
	MaxRedirects int

	// Enables read-only commands on slave nodes.
	ReadOnly bool
	// Allows routing read-only commands to the closest master or slave node.
	// It automatically enables ReadOnly.
	RouteByLatency bool
	// Allows routing read-only commands to a random master or slave node.
	// It automatically enables ReadOnly.
	RouteRandomly bool

	// Optional function that returns cluster slots information.
	// It is useful for manually creating a cluster of standalone Redis servers
	// and load-balancing read/write operations between masters and slaves.
	// It can use a service like ZooKeeper to maintain configuration information
	// and Cluster.ReloadState to manually trigger state reloading.
	ClusterSlots func(context.Context) ([]ClusterSlot, error)

	// Following options are copied from the Options struct.

	Dialer func(ctx context.Context, network, addr string) (net.Conn, error)

	OnConnect func(ctx context.Context, cn *Conn) error

	Username string
	Password string

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
	PoolFIFO bool

	// PoolSize applies per cluster node and not for the whole cluster.
	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration

	TLSConfig *tls.Config
}
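// Illustrative sketch (not part of the original file): a typical ClusterOptions
// value passed to NewClusterClient. The addresses, pool sizing and error
// handling below are assumptions, not values taken from this package.
//
//	rdb := NewClusterClient(&ClusterOptions{
//		Addrs:    []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
//		ReadOnly: true, // allow read-only commands on slaves
//		PoolSize: 10,   // per node, not for the whole cluster
//	})
//
//	if err := rdb.Ping(context.Background()).Err(); err != nil {
//		// handle connectivity error
//	}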

func (opt *ClusterOptions) init() {
	if opt.MaxRedirects == -1 {
		opt.MaxRedirects = 0
	} else if opt.MaxRedirects == 0 {
		opt.MaxRedirects = 3
	}

	if opt.RouteByLatency || opt.RouteRandomly {
		opt.ReadOnly = true
	}

	if opt.PoolSize == 0 {
		opt.PoolSize = 5 * runtime.GOMAXPROCS(0)
	}

	switch opt.ReadTimeout {
	case -1:
		opt.ReadTimeout = 0
	case 0:
		opt.ReadTimeout = 3 * time.Second
	}
	switch opt.WriteTimeout {
	case -1:
		opt.WriteTimeout = 0
	case 0:
		opt.WriteTimeout = opt.ReadTimeout
	}

	if opt.MaxRetries == 0 {
		opt.MaxRetries = -1
	}
	switch opt.MinRetryBackoff {
	case -1:
		opt.MinRetryBackoff = 0
	case 0:
		opt.MinRetryBackoff = 8 * time.Millisecond
	}
	switch opt.MaxRetryBackoff {
	case -1:
		opt.MaxRetryBackoff = 0
	case 0:
		opt.MaxRetryBackoff = 512 * time.Millisecond
	}

	if opt.NewClient == nil {
		opt.NewClient = NewClient
	}
}

func (opt *ClusterOptions) clientOptions() *Options {
	const disableIdleCheck = -1

	return &Options{
		Dialer:    opt.Dialer,
		OnConnect: opt.OnConnect,

		Username: opt.Username,
		Password: opt.Password,

		MaxRetries:      opt.MaxRetries,
		MinRetryBackoff: opt.MinRetryBackoff,
		MaxRetryBackoff: opt.MaxRetryBackoff,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolFIFO:           opt.PoolFIFO,
		PoolSize:           opt.PoolSize,
		MinIdleConns:       opt.MinIdleConns,
		MaxConnAge:         opt.MaxConnAge,
		PoolTimeout:        opt.PoolTimeout,
		IdleTimeout:        opt.IdleTimeout,
		IdleCheckFrequency: disableIdleCheck,

		TLSConfig: opt.TLSConfig,
		// If ClusterSlots is populated, then we probably have an artificial
		// cluster whose nodes are not in clustering mode (otherwise there isn't
		// much use for ClusterSlots config). This means we cannot execute the
		// READONLY command against that node -- setting readOnly to false in such
		// situations in the options below will prevent that from happening.
		readOnly: opt.ReadOnly && opt.ClusterSlots == nil,
	}
}

//------------------------------------------------------------------------------

type clusterNode struct {
	Client *Client

	latency    uint32 // atomic
	generation uint32 // atomic
	failing    uint32 // atomic
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
	opt := clOpt.clientOptions()
	opt.Addr = addr
	node := clusterNode{
		Client: clOpt.NewClient(opt),
	}

	node.latency = math.MaxUint32
	if clOpt.RouteByLatency {
		go node.updateLatency()
	}

	return &node
}

func (n *clusterNode) String() string {
	return n.Client.String()
}

func (n *clusterNode) Close() error {
	return n.Client.Close()
}

func (n *clusterNode) updateLatency() {
	const numProbe = 10
	var dur uint64

	for i := 0; i < numProbe; i++ {
		time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond)

		start := time.Now()
		n.Client.Ping(context.TODO())
		dur += uint64(time.Since(start) / time.Microsecond)
	}

	latency := float64(dur) / float64(numProbe)
	atomic.StoreUint32(&n.latency, uint32(latency+0.5))
}

func (n *clusterNode) Latency() time.Duration {
	latency := atomic.LoadUint32(&n.latency)
	return time.Duration(latency) * time.Microsecond
}

func (n *clusterNode) MarkAsFailing() {
	atomic.StoreUint32(&n.failing, uint32(time.Now().Unix()))
}

func (n *clusterNode) Failing() bool {
	const timeout = 15 // 15 seconds

	failing := atomic.LoadUint32(&n.failing)
	if failing == 0 {
		return false
	}
	if time.Now().Unix()-int64(failing) < timeout {
		return true
	}
	atomic.StoreUint32(&n.failing, 0)
	return false
}

func (n *clusterNode) Generation() uint32 {
	return atomic.LoadUint32(&n.generation)
}

func (n *clusterNode) SetGeneration(gen uint32) {
	for {
		v := atomic.LoadUint32(&n.generation)
		if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
			break
		}
	}
}

//------------------------------------------------------------------------------

type clusterNodes struct {
	opt *ClusterOptions

	mu          sync.RWMutex
	addrs       []string
	nodes       map[string]*clusterNode
	activeAddrs []string
	closed      bool

	_generation uint32 // atomic
}

func newClusterNodes(opt *ClusterOptions) *clusterNodes {
	return &clusterNodes{
		opt: opt,

		addrs: opt.Addrs,
		nodes: make(map[string]*clusterNode),
	}
}

func (c *clusterNodes) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error
	for _, node := range c.nodes {
		if err := node.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	c.nodes = nil
	c.activeAddrs = nil

	return firstErr
}

func (c *clusterNodes) Addrs() ([]string, error) {
	var addrs []string

	c.mu.RLock()
	closed := c.closed //nolint:ifshort
	if !closed {
		if len(c.activeAddrs) > 0 {
			addrs = c.activeAddrs
		} else {
			addrs = c.addrs
		}
	}
	c.mu.RUnlock()

	if closed {
		return nil, pool.ErrClosed
	}
	if len(addrs) == 0 {
		return nil, errClusterNoNodes
	}
	return addrs, nil
}

func (c *clusterNodes) NextGeneration() uint32 {
	return atomic.AddUint32(&c._generation, 1)
}

// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) {
	//nolint:prealloc
	var collected []*clusterNode

	c.mu.Lock()

	c.activeAddrs = c.activeAddrs[:0]
	for addr, node := range c.nodes {
		if node.Generation() >= generation {
			c.activeAddrs = append(c.activeAddrs, addr)
			if c.opt.RouteByLatency {
				go node.updateLatency()
			}
			continue
		}

		delete(c.nodes, addr)
		collected = append(collected, node)
	}

	c.mu.Unlock()

	for _, node := range collected {
		_ = node.Client.Close()
	}
}

func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
	node, err := c.get(addr)
	if err != nil {
		return nil, err
	}
	if node != nil {
		return node, nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	node, ok := c.nodes[addr]
	if ok {
		return node, nil
	}

	node = newClusterNode(c.opt, addr)

	c.addrs = appendIfNotExists(c.addrs, addr)
	c.nodes[addr] = node

	return node, nil
}

func (c *clusterNodes) get(addr string) (*clusterNode, error) {
	var node *clusterNode
	var err error
	c.mu.RLock()
	if c.closed {
		err = pool.ErrClosed
	} else {
		node = c.nodes[addr]
	}
	c.mu.RUnlock()
	return node, err
}

func (c *clusterNodes) All() ([]*clusterNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	cp := make([]*clusterNode, 0, len(c.nodes))
	for _, node := range c.nodes {
		cp = append(cp, node)
	}
	return cp, nil
}

func (c *clusterNodes) Random() (*clusterNode, error) {
	addrs, err := c.Addrs()
	if err != nil {
		return nil, err
	}

	n := rand.Intn(len(addrs))
	return c.GetOrCreate(addrs[n])
}

//------------------------------------------------------------------------------

type clusterSlot struct {
	start, end int
	nodes      []*clusterNode
}

type clusterSlotSlice []*clusterSlot

func (p clusterSlotSlice) Len() int {
	return len(p)
}

func (p clusterSlotSlice) Less(i, j int) bool {
	return p[i].start < p[j].start
}

func (p clusterSlotSlice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}

type clusterState struct {
	nodes   *clusterNodes
	Masters []*clusterNode
	Slaves  []*clusterNode

	slots []*clusterSlot

	generation uint32
	createdAt  time.Time
}

func newClusterState(
	nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {
	c := clusterState{
		nodes: nodes,

		slots: make([]*clusterSlot, 0, len(slots)),

		generation: nodes.NextGeneration(),
		createdAt:  time.Now(),
	}

	originHost, _, _ := net.SplitHostPort(origin)
	isLoopbackOrigin := isLoopback(originHost)

	for _, slot := range slots {
		var nodes []*clusterNode
		for i, slotNode := range slot.Nodes {
			addr := slotNode.Addr
			if !isLoopbackOrigin {
				addr = replaceLoopbackHost(addr, originHost)
			}

			node, err := c.nodes.GetOrCreate(addr)
			if err != nil {
				return nil, err
			}

			node.SetGeneration(c.generation)
			nodes = append(nodes, node)

			if i == 0 {
				c.Masters = appendUniqueNode(c.Masters, node)
			} else {
				c.Slaves = appendUniqueNode(c.Slaves, node)
			}
		}

		c.slots = append(c.slots, &clusterSlot{
			start: slot.Start,
			end:   slot.End,
			nodes: nodes,
		})
	}

	sort.Sort(clusterSlotSlice(c.slots))

	time.AfterFunc(time.Minute, func() {
		nodes.GC(c.generation)
	})

	return &c, nil
}

func replaceLoopbackHost(nodeAddr, originHost string) string {
	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
	if err != nil {
		return nodeAddr
	}

	nodeIP := net.ParseIP(nodeHost)
	if nodeIP == nil {
		return nodeAddr
	}

	if !nodeIP.IsLoopback() {
		return nodeAddr
	}

	// Use the non-loopback origin host together with the node's port.
	return net.JoinHostPort(originHost, nodePort)
}

func isLoopback(host string) bool {
	ip := net.ParseIP(host)
	if ip == nil {
		return true
	}
	return ip.IsLoopback()
}

func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	switch len(nodes) {
	case 0:
		return c.nodes.Random()
	case 1:
		return nodes[0], nil
	case 2:
		if slave := nodes[1]; !slave.Failing() {
			return slave, nil
		}
		return nodes[0], nil
	default:
		var slave *clusterNode
		for i := 0; i < 10; i++ {
			n := rand.Intn(len(nodes)-1) + 1
			slave = nodes[n]
			if !slave.Failing() {
				return slave, nil
			}
		}

		// All slaves are loading - use master.
		return nodes[0], nil
	}
}

func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}

	var node *clusterNode
	for _, n := range nodes {
		if n.Failing() {
			continue
		}
		if node == nil || n.Latency() < node.Latency() {
			node = n
		}
	}
	if node != nil {
		return node, nil
	}

	// If all nodes are failing - return a random node.
	return c.nodes.Random()
}

func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}
	if len(nodes) == 1 {
		return nodes[0], nil
	}
	randomNodes := rand.Perm(len(nodes))
	for _, idx := range randomNodes {
		if node := nodes[idx]; !node.Failing() {
			return node, nil
		}
	}
	return nodes[randomNodes[0]], nil
}

func (c *clusterState) slotNodes(slot int) []*clusterNode {
	i := sort.Search(len(c.slots), func(i int) bool {
		return c.slots[i].end >= slot
	})
	if i >= len(c.slots) {
		return nil
	}
	x := c.slots[i]
	if slot >= x.start && slot <= x.end {
		return x.nodes
	}
	return nil
}

//------------------------------------------------------------------------------

type clusterStateHolder struct {
	load func(ctx context.Context) (*clusterState, error)

	state     atomic.Value
	reloading uint32 // atomic
}

func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder {
	return &clusterStateHolder{
		load: fn,
	}
}

func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error) {
	state, err := c.load(ctx)
	if err != nil {
		return nil, err
	}
	c.state.Store(state)
	return state, nil
}

func (c *clusterStateHolder) LazyReload() {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
		return
	}
	go func() {
		defer atomic.StoreUint32(&c.reloading, 0)

		_, err := c.Reload(context.Background())
		if err != nil {
			return
		}
		time.Sleep(200 * time.Millisecond)
	}()
}

func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {
	v := c.state.Load()
	if v == nil {
		return c.Reload(ctx)
	}

	state := v.(*clusterState)
	if time.Since(state.createdAt) > 10*time.Second {
		c.LazyReload()
	}
	return state, nil
}

func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, error) {
	state, err := c.Reload(ctx)
	if err == nil {
		return state, nil
	}
	return c.Get(ctx)
}

//------------------------------------------------------------------------------

type clusterClient struct {
	opt           *ClusterOptions
	nodes         *clusterNodes
	state         *clusterStateHolder //nolint:structcheck
	cmdsInfoCache *cmdsInfoCache      //nolint:structcheck
}

// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
	*clusterClient
	cmdable
	hooks
	ctx context.Context
}

// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
	opt.init()

	c := &ClusterClient{
		clusterClient: &clusterClient{
			opt:   opt,
			nodes: newClusterNodes(opt),
		},
		ctx: context.Background(),
	}
	c.state = newClusterStateHolder(c.loadState)
	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
	c.cmdable = c.Process

	if opt.IdleCheckFrequency > 0 {
		go c.reaper(opt.IdleCheckFrequency)
	}

	return c
}

func (c *ClusterClient) Context() context.Context {
	return c.ctx
}

func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
	if ctx == nil {
		panic("nil context")
	}
	clone := *c
	clone.cmdable = clone.Process
	clone.hooks.lock()
	clone.ctx = ctx
	return &clone
}

// Options returns read-only Options that were used to create the client.
func (c *ClusterClient) Options() *ClusterOptions {
	return c.opt
}

// ReloadState reloads cluster state. If available, it calls the ClusterSlots
// func to get cluster slots information.
func (c *ClusterClient) ReloadState(ctx context.Context) {
	c.state.LazyReload()
}
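// Illustrative sketch (not part of the original file): wiring a manually
// managed cluster through the ClusterSlots callback and refreshing it with
// ReloadState, as the ClusterOptions docs above describe. The addresses and
// the slot split are assumptions.
//
//	rdb := NewClusterClient(&ClusterOptions{
//		ClusterSlots: func(ctx context.Context) ([]ClusterSlot, error) {
//			// The first node of each slot range is treated as the master,
//			// the remaining ones as slaves.
//			return []ClusterSlot{
//				{Start: 0, End: 8191, Nodes: []ClusterNode{{Addr: ":7000"}, {Addr: ":8000"}}},
//				{Start: 8192, End: 16383, Nodes: []ClusterNode{{Addr: ":7001"}, {Addr: ":8001"}}},
//			}, nil
//		},
//	})
//
//	// When the topology stored elsewhere (e.g. in ZooKeeper) changes,
//	// trigger a lazy state reload.
//	rdb.ReloadState(context.Background())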

// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
	return c.nodes.Close()
}

// Do creates a Cmd from the args and processes the cmd.
func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
	cmd := NewCmd(ctx, args...)
	_ = c.Process(ctx, cmd)
	return cmd
}
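// Illustrative sketch (not part of the original file): Do is handy for
// commands that have no typed helper. The key name is an assumption.
//
//	val, err := rdb.Do(ctx, "GET", "key").Result()
//	if err == Nil {
//		// the key does not exist
//	}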

func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
	return c.hooks.process(ctx, cmd, c.process)
}

func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
	cmdInfo := c.cmdInfo(cmd.Name())
	slot := c.cmdSlot(cmd)

	var node *clusterNode
	var ask bool
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		if node == nil {
			var err error
			node, err = c.cmdNode(ctx, cmdInfo, slot)
			if err != nil {
				return err
			}
		}

		if ask {
			pipe := node.Client.Pipeline()
			_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
			_ = pipe.Process(ctx, cmd)
			_, lastErr = pipe.Exec(ctx)
			_ = pipe.Close()
			ask = false
		} else {
			lastErr = node.Client.Process(ctx, cmd)
		}

		// If there is no error - we are done.
		if lastErr == nil {
			return nil
		}
		if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload()
			}
			node = nil
			continue
		}

		// If the slave is loading - pick another node.
		if c.opt.ReadOnly && isLoadingError(lastErr) {
			node.MarkAsFailing()
			node = nil
			continue
		}

		var moved bool
		var addr string
		moved, ask, addr = isMovedError(lastErr)
		if moved || ask {
			c.state.LazyReload()

			var err error
			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(lastErr, cmd.readTimeout() == nil) {
			// First, retry the same node.
			if attempt == 0 {
				continue
			}

			// Then try another node.
			node.MarkAsFailing()
			node = nil
			continue
		}

		return lastErr
	}
	return lastErr
}

// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, master := range state.Masters {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(ctx, node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(master)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}
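// Illustrative sketch (not part of the original file): ForEachMaster is
// commonly used for maintenance commands that must run on every master, e.g.
// flushing a test database. The FlushDB call is an assumption about intent.
//
//	err := rdb.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
//		return master.FlushDB(ctx).Err()
//	})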

// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, slave := range state.Slaves {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(ctx, node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(slave)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachShard concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachShard(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	worker := func(node *clusterNode) {
		defer wg.Done()
		err := fn(ctx, node.Client)
		if err != nil {
			select {
			case errCh <- err:
			default:
			}
		}
	}

	for _, node := range state.Masters {
		wg.Add(1)
		go worker(node)
	}
	for _, node := range state.Slaves {
		wg.Add(1)
		go worker(node)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}
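// Illustrative sketch (not part of the original file): ForEachShard reaches
// both masters and slaves, so it suits health checks. The PING below is an
// assumption about what the caller wants to run.
//
//	err := rdb.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
//		return shard.Ping(ctx).Err()
//	})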

// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
	var acc PoolStats

	state, _ := c.state.Get(context.TODO())
	if state == nil {
		return &acc
	}

	for _, node := range state.Masters {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	for _, node := range state.Slaves {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	return &acc
}

func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
	if c.opt.ClusterSlots != nil {
		slots, err := c.opt.ClusterSlots(ctx)
		if err != nil {
			return nil, err
		}
		return newClusterState(c.nodes, slots, "")
	}

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error

	for _, idx := range rand.Perm(len(addrs)) {
		addr := addrs[idx]

		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		slots, err := node.Client.ClusterSlots(ctx).Result()
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		return newClusterState(c.nodes, slots, node.Client.opt.Addr)
	}

	/*
	 * No node is reachable. It's possible that the IP addresses of all nodes
	 * have changed. Clear activeAddrs so the client can reconnect using the
	 * initially configured addresses (e.g. [redis-cluster-0:6379,
	 * redis-cluster-1:6379]), which may resolve to the updated IP addresses.
	 */
	c.nodes.mu.Lock()
	c.nodes.activeAddrs = nil
	c.nodes.mu.Unlock()

	return nil, firstErr
}

// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
	ticker := time.NewTicker(idleCheckFrequency)
	defer ticker.Stop()

	for range ticker.C {
		nodes, err := c.nodes.All()
		if err != nil {
			break
		}

		for _, node := range nodes {
			_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
			if err != nil {
				internal.Logger.Printf(c.Context(), "ReapStaleConns failed: %s", err)
			}
		}
	}
}

func (c *ClusterClient) Pipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(ctx, fn)
}
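// Illustrative sketch (not part of the original file): commands queued in one
// Pipelined call are grouped per node and sent in as few round trips as the
// slot layout allows. The key name and TTL are assumptions.
//
//	cmds, err := rdb.Pipelined(ctx, func(pipe Pipeliner) error {
//		pipe.Incr(ctx, "counter:a")
//		pipe.Expire(ctx, "counter:a", time.Hour)
//		return nil
//	})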

func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processPipeline(ctx, cmds, c._processPipeline)
}

func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
	cmdsMap := newCmdsMap()
	err := c.mapCmdsByNode(ctx, cmdsMap, cmds)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				setCmdsErr(cmds, err)
				return err
			}
		}

		failedCmds := newCmdsMap()
		var wg sync.WaitGroup

		for node, cmds := range cmdsMap.m {
			wg.Add(1)
			go func(node *clusterNode, cmds []Cmder) {
				defer wg.Done()

				err := c._processPipelineNode(ctx, node, cmds, failedCmds)
				if err == nil {
					return
				}
				if attempt < c.opt.MaxRedirects {
					if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
						setCmdsErr(cmds, err)
					}
				} else {
					setCmdsErr(cmds, err)
				}
			}(node, cmds)
		}

		wg.Wait()
		if len(failedCmds.m) == 0 {
			break
		}
		cmdsMap = failedCmds
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error {
	state, err := c.state.Get(ctx)
	if err != nil {
		return err
	}

	if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) {
		for _, cmd := range cmds {
			slot := c.cmdSlot(cmd)
			node, err := c.slotReadOnlyNode(state, slot)
			if err != nil {
				return err
			}
			cmdsMap.Add(node, cmd)
		}
		return nil
	}

	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		node, err := state.slotMasterNode(slot)
		if err != nil {
			return err
		}
		cmdsMap.Add(node, cmd)
	}
	return nil
}

func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(cmd.Name())
		if cmdInfo == nil || !cmdInfo.ReadOnly {
			return false
		}
	}
	return true
}

func (c *ClusterClient) _processPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) error {
	return node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmds(wr, cmds)
			})
			if err != nil {
				return err
			}

			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
				return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
			})
		})
	})
}

func (c *ClusterClient) pipelineReadCmds(
	ctx context.Context,
	node *clusterNode,
	rd *proto.Reader,
	cmds []Cmder,
	failedCmds *cmdsMap,
) error {
	for _, cmd := range cmds {
		err := cmd.readReply(rd)
		cmd.SetErr(err)

		if err == nil {
			continue
		}

		if c.checkMovedErr(ctx, cmd, err, failedCmds) {
			continue
		}

		if c.opt.ReadOnly && isLoadingError(err) {
			node.MarkAsFailing()
			return err
		}
		if isRedisError(err) {
			continue
		}
		return err
	}
	return nil
}

func (c *ClusterClient) checkMovedErr(
	ctx context.Context, cmd Cmder, err error, failedCmds *cmdsMap,
) bool {
	moved, ask, addr := isMovedError(err)
	if !moved && !ask {
		return false
	}

	node, err := c.nodes.GetOrCreate(addr)
	if err != nil {
		return false
	}

	if moved {
		c.state.LazyReload()
		failedCmds.Add(node, cmd)
		return true
	}

	if ask {
		failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
		return true
	}

	panic("not reached")
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processTxPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(ctx, fn)
}
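// Illustrative sketch (not part of the original file): in cluster mode a
// TxPipeline is split by slot and each group is wrapped in its own MULTI/EXEC,
// so atomicity only holds per slot. The hash tag below is an assumption used
// to keep both keys in the same slot.
//
//	_, err := rdb.TxPipelined(ctx, func(pipe Pipeliner) error {
//		pipe.Incr(ctx, "{user:1}:visits")
//		pipe.Incr(ctx, "{user:1}:clicks")
//		return nil
//	})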

func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processTxPipeline(ctx, cmds, c._processTxPipeline)
}

func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
	// Trim multi .. exec.
	cmds = cmds[1 : len(cmds)-1]

	state, err := c.state.Get(ctx)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}

		cmdsMap := map[*clusterNode][]Cmder{node: cmds}
		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
					setCmdsErr(cmds, err)
					return err
				}
			}

			failedCmds := newCmdsMap()
			var wg sync.WaitGroup

			for node, cmds := range cmdsMap {
				wg.Add(1)
				go func(node *clusterNode, cmds []Cmder) {
					defer wg.Done()

					err := c._processTxPipelineNode(ctx, node, cmds, failedCmds)
					if err == nil {
						return
					}

					if attempt < c.opt.MaxRedirects {
						if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
							setCmdsErr(cmds, err)
						}
					} else {
						setCmdsErr(cmds, err)
					}
				}(node, cmds)
			}

			wg.Wait()
			if len(failedCmds.m) == 0 {
				break
			}
			cmdsMap = failedCmds.m
		}
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
	cmdsMap := make(map[int][]Cmder)
	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		cmdsMap[slot] = append(cmdsMap[slot], cmd)
	}
	return cmdsMap
}

func (c *ClusterClient) _processTxPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) error {
	return node.Client.hooks.processTxPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmds(wr, cmds)
			})
			if err != nil {
				return err
			}

			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
				statusCmd := cmds[0].(*StatusCmd)
				// Trim multi and exec.
				cmds = cmds[1 : len(cmds)-1]

				err := c.txPipelineReadQueued(ctx, rd, statusCmd, cmds, failedCmds)
				if err != nil {
					moved, ask, addr := isMovedError(err)
					if moved || ask {
						return c.cmdsMoved(ctx, cmds, moved, ask, addr, failedCmds)
					}
					return err
				}

				return pipelineReadCmds(rd, cmds)
			})
		})
	})
}

func (c *ClusterClient) txPipelineReadQueued(
	ctx context.Context,
	rd *proto.Reader,
	statusCmd *StatusCmd,
	cmds []Cmder,
	failedCmds *cmdsMap,
) error {
	// Parse queued replies.
	if err := statusCmd.readReply(rd); err != nil {
		return err
	}

	for _, cmd := range cmds {
		err := statusCmd.readReply(rd)
		if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) {
			continue
		}
		return err
	}

	// Parse number of replies.
	line, err := rd.ReadLine()
	if err != nil {
		if err == Nil {
			err = TxFailedErr
		}
		return err
	}

	switch line[0] {
	case proto.ErrorReply:
		return proto.ParseErrorReply(line)
	case proto.ArrayReply:
		// ok
	default:
		return fmt.Errorf("redis: expected '*', but got line %q", line)
	}

	return nil
}

func (c *ClusterClient) cmdsMoved(
	ctx context.Context, cmds []Cmder,
	moved, ask bool,
	addr string,
	failedCmds *cmdsMap,
) error {
	node, err := c.nodes.GetOrCreate(addr)
	if err != nil {
		return err
	}

	if moved {
		c.state.LazyReload()
		for _, cmd := range cmds {
			failedCmds.Add(node, cmd)
		}
		return nil
	}

	if ask {
		for _, cmd := range cmds {
			failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
		}
		return nil
	}

	return nil
}

func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
	if len(keys) == 0 {
		return fmt.Errorf("redis: Watch requires at least one key")
	}

	slot := hashtag.Slot(keys[0])
	for _, key := range keys[1:] {
		if hashtag.Slot(key) != slot {
			err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
			return err
		}
	}

	node, err := c.slotMasterNode(ctx, slot)
	if err != nil {
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		err = node.Client.Watch(ctx, fn, keys...)
		if err == nil {
			break
		}

		moved, ask, addr := isMovedError(err)
		if moved || ask {
			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				return err
			}
			continue
		}

		if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload()
			}
			node, err = c.slotMasterNode(ctx, slot)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(err, true) {
			continue
		}

		return err
	}

	return err
}
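// Illustrative sketch (not part of the original file): Watch runs an
// optimistic transaction against the master that owns the watched keys' slot;
// all keys must hash to the same slot, which the hash tag below guarantees.
// The key name and increment logic are assumptions.
//
//	err := rdb.Watch(ctx, func(tx *Tx) error {
//		n, err := tx.Get(ctx, "{acct:1}:balance").Int()
//		if err != nil && err != Nil {
//			return err
//		}
//		_, err = tx.TxPipelined(ctx, func(pipe Pipeliner) error {
//			pipe.Set(ctx, "{acct:1}:balance", n+1, 0)
//			return nil
//		})
//		return err
//	}, "{acct:1}:balance")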

func (c *ClusterClient) pubSub() *PubSub {
	var node *clusterNode
	pubsub := &PubSub{
		opt: c.opt.clientOptions(),

		newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
			if node != nil {
				panic("node != nil")
			}

			var err error
			if len(channels) > 0 {
				slot := hashtag.Slot(channels[0])
				node, err = c.slotMasterNode(ctx, slot)
			} else {
				node, err = c.nodes.Random()
			}
			if err != nil {
				return nil, err
			}

			cn, err := node.Client.newConn(context.TODO())
			if err != nil {
				node = nil

				return nil, err
			}

			return cn, nil
		},
		closeConn: func(cn *pool.Conn) error {
			err := node.Client.connPool.CloseConn(cn)
			node = nil
			return err
		},
	}
	pubsub.init()

	return pubsub
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create an empty subscription.
func (c *ClusterClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.Subscribe(ctx, channels...)
	}
	return pubsub
}

// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create an empty subscription.
func (c *ClusterClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.PSubscribe(ctx, channels...)
	}
	return pubsub
}
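// Illustrative sketch (not part of the original file): receiving messages
// from a cluster subscription. The channel name is an assumption.
//
//	pubsub := rdb.Subscribe(ctx, "events")
//	defer pubsub.Close()
//
//	for msg := range pubsub.Channel() {
//		fmt.Println(msg.Channel, msg.Payload)
//	}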

func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
	// Try 3 random nodes.
	const nodeLimit = 3

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error

	perm := rand.Perm(len(addrs))
	if len(perm) > nodeLimit {
		perm = perm[:nodeLimit]
	}

	for _, idx := range perm {
		addr := addrs[idx]

		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		info, err := node.Client.Command(ctx).Result()
		if err == nil {
			return info, nil
		}
		if firstErr == nil {
			firstErr = err
		}
	}

	if firstErr == nil {
		panic("not reached")
	}
	return nil, firstErr
}

func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
	cmdsInfo, err := c.cmdsInfoCache.Get(c.ctx)
	if err != nil {
		return nil
	}

	info := cmdsInfo[name]
	if info == nil {
		internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name)
	}
	return info
}

func (c *ClusterClient) cmdSlot(cmd Cmder) int {
	args := cmd.Args()
	if args[0] == "cluster" && args[1] == "getkeysinslot" {
		return args[2].(int)
	}

	cmdInfo := c.cmdInfo(cmd.Name())
	return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}

func cmdSlot(cmd Cmder, pos int) int {
	if pos == 0 {
		return hashtag.RandomSlot()
	}
	firstKey := cmd.stringArg(pos)
	return hashtag.Slot(firstKey)
}

func (c *ClusterClient) cmdNode(
	ctx context.Context,
	cmdInfo *CommandInfo,
	slot int,
) (*clusterNode, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}

	if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
		return c.slotReadOnlyNode(state, slot)
	}
	return state.slotMasterNode(slot)
}

func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
	if c.opt.RouteByLatency {
		return state.slotClosestNode(slot)
	}
	if c.opt.RouteRandomly {
		return state.slotRandomNode(slot)
	}
	return state.slotSlaveNode(slot)
}

func (c *ClusterClient) slotMasterNode(ctx context.Context, slot int) (*clusterNode, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}
	return state.slotMasterNode(slot)
}

// SlaveForKey gets a client for a replica node to run any command on it.
// This is especially useful for running a Lua script that contains only
// read-only commands on a replica. Regular Redis commands carry a flag that
// marks them as read-only, so they are routed to replica nodes automatically
// when the ClusterOptions.ReadOnly flag is set to true; Lua scripts are not.
func (c *ClusterClient) SlaveForKey(ctx context.Context, key string) (*Client, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}
	slot := hashtag.Slot(key)
	node, err := c.slotReadOnlyNode(state, slot)
	if err != nil {
		return nil, err
	}
	return node.Client, err
}

// MasterForKey returns a client to the master node for a particular key.
func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client, error) {
	slot := hashtag.Slot(key)
	node, err := c.slotMasterNode(ctx, slot)
	if err != nil {
		return nil, err
	}
	return node.Client, err
}
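// Illustrative sketch (not part of the original file): obtaining a node-scoped
// client for a key, e.g. to EVAL a read-only Lua script on a replica. The key
// and script are assumptions.
//
//	replica, err := rdb.SlaveForKey(ctx, "user:42")
//	if err != nil {
//		return err
//	}
//	val, err := replica.Eval(ctx, "return redis.call('GET', KEYS[1])", []string{"user:42"}).Result()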

func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
	for _, n := range nodes {
		if n == node {
			return nodes
		}
	}
	return append(nodes, node)
}

func appendIfNotExists(ss []string, es ...string) []string {
loop:
	for _, e := range es {
		for _, s := range ss {
			if s == e {
				continue loop
			}
		}
		ss = append(ss, e)
	}
	return ss
}

//------------------------------------------------------------------------------

type cmdsMap struct {
	mu sync.Mutex
	m  map[*clusterNode][]Cmder
}

func newCmdsMap() *cmdsMap {
	return &cmdsMap{
		m: make(map[*clusterNode][]Cmder),
	}
}

func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
	m.mu.Lock()
	m.m[node] = append(m.m[node], cmds...)
	m.mu.Unlock()
}