blob: ec6221dc83d4279ed92644209ca9fbfc92a77ef0 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301package redis
2
3import (
4 "context"
5 "crypto/tls"
6 "errors"
7 "net"
8 "strings"
9 "sync"
10 "time"
11
12 "github.com/go-redis/redis/v8/internal"
13 "github.com/go-redis/redis/v8/internal/pool"
14 "github.com/go-redis/redis/v8/internal/rand"
15)
16
17//------------------------------------------------------------------------------
18
// FailoverOptions are used to configure a failover client and should
// be passed to NewFailoverClient.
//
// The struct is translated into the lower-level Options / ClusterOptions by
// clientOptions, sentinelOptions and clusterOptions.
type FailoverOptions struct {
	// The master name.
	MasterName string
	// A seed list of host:port addresses of sentinel nodes.
	SentinelAddrs []string

	// If specified with SentinelPassword, enables ACL-based authentication (via
	// AUTH <user> <pass>).
	SentinelUsername string
	// Sentinel password from "requirepass <password>" (if enabled) in Sentinel
	// configuration, or, if SentinelUsername is also supplied, used for ACL-based
	// authentication.
	SentinelPassword string

	// Allows routing read-only commands to the closest master or slave node.
	// This option only works with NewFailoverClusterClient; NewFailoverClient
	// panics when it is set.
	RouteByLatency bool
	// Allows routing read-only commands to the random master or slave node.
	// This option only works with NewFailoverClusterClient; NewFailoverClient
	// panics when it is set.
	RouteRandomly bool

	// Route all commands to slave read-only nodes.
	SlaveOnly bool

	// Fall back to slaves flagged as disconnected from the master when no
	// connected slave can be found.
	// Currently this option is only honored by RandomSlaveAddr.
	UseDisconnectedSlaves bool

	// Following options are copied from Options struct.

	// Dialer, when set, is called by the failover dialer with the resolved
	// node address instead of the built-in net/TLS dialer. It is also
	// forwarded to sentinel and cluster options.
	Dialer    func(ctx context.Context, network, addr string) (net.Conn, error)
	OnConnect func(ctx context.Context, cn *Conn) error

	// Username, Password and DB apply to connections to the data nodes.
	// Sentinel connections use SentinelUsername/SentinelPassword and DB 0.
	Username string
	Password string
	DB       int

	// MaxRetries is also used as ClusterOptions.MaxRedirects by clusterOptions.
	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
	PoolFIFO bool

	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration

	// TLSConfig, when non-nil, makes the built-in failover dialer connect
	// over TLS.
	TLSConfig *tls.Config
}
78
79func (opt *FailoverOptions) clientOptions() *Options {
80 return &Options{
81 Addr: "FailoverClient",
82
83 Dialer: opt.Dialer,
84 OnConnect: opt.OnConnect,
85
86 DB: opt.DB,
87 Username: opt.Username,
88 Password: opt.Password,
89
90 MaxRetries: opt.MaxRetries,
91 MinRetryBackoff: opt.MinRetryBackoff,
92 MaxRetryBackoff: opt.MaxRetryBackoff,
93
94 DialTimeout: opt.DialTimeout,
95 ReadTimeout: opt.ReadTimeout,
96 WriteTimeout: opt.WriteTimeout,
97
98 PoolFIFO: opt.PoolFIFO,
99 PoolSize: opt.PoolSize,
100 PoolTimeout: opt.PoolTimeout,
101 IdleTimeout: opt.IdleTimeout,
102 IdleCheckFrequency: opt.IdleCheckFrequency,
103 MinIdleConns: opt.MinIdleConns,
104 MaxConnAge: opt.MaxConnAge,
105
106 TLSConfig: opt.TLSConfig,
107 }
108}
109
110func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
111 return &Options{
112 Addr: addr,
113
114 Dialer: opt.Dialer,
115 OnConnect: opt.OnConnect,
116
117 DB: 0,
118 Username: opt.SentinelUsername,
119 Password: opt.SentinelPassword,
120
121 MaxRetries: opt.MaxRetries,
122 MinRetryBackoff: opt.MinRetryBackoff,
123 MaxRetryBackoff: opt.MaxRetryBackoff,
124
125 DialTimeout: opt.DialTimeout,
126 ReadTimeout: opt.ReadTimeout,
127 WriteTimeout: opt.WriteTimeout,
128
129 PoolFIFO: opt.PoolFIFO,
130 PoolSize: opt.PoolSize,
131 PoolTimeout: opt.PoolTimeout,
132 IdleTimeout: opt.IdleTimeout,
133 IdleCheckFrequency: opt.IdleCheckFrequency,
134 MinIdleConns: opt.MinIdleConns,
135 MaxConnAge: opt.MaxConnAge,
136
137 TLSConfig: opt.TLSConfig,
138 }
139}
140
141func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
142 return &ClusterOptions{
143 Dialer: opt.Dialer,
144 OnConnect: opt.OnConnect,
145
146 Username: opt.Username,
147 Password: opt.Password,
148
149 MaxRedirects: opt.MaxRetries,
150
151 RouteByLatency: opt.RouteByLatency,
152 RouteRandomly: opt.RouteRandomly,
153
154 MinRetryBackoff: opt.MinRetryBackoff,
155 MaxRetryBackoff: opt.MaxRetryBackoff,
156
157 DialTimeout: opt.DialTimeout,
158 ReadTimeout: opt.ReadTimeout,
159 WriteTimeout: opt.WriteTimeout,
160
161 PoolFIFO: opt.PoolFIFO,
162 PoolSize: opt.PoolSize,
163 PoolTimeout: opt.PoolTimeout,
164 IdleTimeout: opt.IdleTimeout,
165 IdleCheckFrequency: opt.IdleCheckFrequency,
166 MinIdleConns: opt.MinIdleConns,
167 MaxConnAge: opt.MaxConnAge,
168
169 TLSConfig: opt.TLSConfig,
170 }
171}
172
173// NewFailoverClient returns a Redis client that uses Redis Sentinel
174// for automatic failover. It's safe for concurrent use by multiple
175// goroutines.
176func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
177 if failoverOpt.RouteByLatency {
178 panic("to route commands by latency, use NewFailoverClusterClient")
179 }
180 if failoverOpt.RouteRandomly {
181 panic("to route commands randomly, use NewFailoverClusterClient")
182 }
183
184 sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
185 copy(sentinelAddrs, failoverOpt.SentinelAddrs)
186
187 rand.Shuffle(len(sentinelAddrs), func(i, j int) {
188 sentinelAddrs[i], sentinelAddrs[j] = sentinelAddrs[j], sentinelAddrs[i]
189 })
190
191 failover := &sentinelFailover{
192 opt: failoverOpt,
193 sentinelAddrs: sentinelAddrs,
194 }
195
196 opt := failoverOpt.clientOptions()
197 opt.Dialer = masterSlaveDialer(failover)
198 opt.init()
199
200 connPool := newConnPool(opt)
201
202 failover.mu.Lock()
203 failover.onFailover = func(ctx context.Context, addr string) {
204 _ = connPool.Filter(func(cn *pool.Conn) bool {
205 return cn.RemoteAddr().String() != addr
206 })
207 }
208 failover.mu.Unlock()
209
210 c := Client{
211 baseClient: newBaseClient(opt, connPool),
212 ctx: context.Background(),
213 }
214 c.cmdable = c.Process
215 c.onClose = failover.Close
216
217 return &c
218}
219
220func masterSlaveDialer(
221 failover *sentinelFailover,
222) func(ctx context.Context, network, addr string) (net.Conn, error) {
223 return func(ctx context.Context, network, _ string) (net.Conn, error) {
224 var addr string
225 var err error
226
227 if failover.opt.SlaveOnly {
228 addr, err = failover.RandomSlaveAddr(ctx)
229 } else {
230 addr, err = failover.MasterAddr(ctx)
231 if err == nil {
232 failover.trySwitchMaster(ctx, addr)
233 }
234 }
235 if err != nil {
236 return nil, err
237 }
238 if failover.opt.Dialer != nil {
239 return failover.opt.Dialer(ctx, network, addr)
240 }
241
242 netDialer := &net.Dialer{
243 Timeout: failover.opt.DialTimeout,
244 KeepAlive: 5 * time.Minute,
245 }
246 if failover.opt.TLSConfig == nil {
247 return netDialer.DialContext(ctx, network, addr)
248 }
249 return tls.DialWithDialer(netDialer, network, addr, failover.opt.TLSConfig)
250 }
251}
252
253//------------------------------------------------------------------------------
254
// SentinelClient is a client for a Redis Sentinel.
//
// It embeds *baseClient (connection pool and command processing) and hooks
// (applied around every command in Process).
type SentinelClient struct {
	*baseClient
	hooks
	// ctx is the context returned by Context; WithContext replaces it on a copy.
	ctx context.Context
}
261
262func NewSentinelClient(opt *Options) *SentinelClient {
263 opt.init()
264 c := &SentinelClient{
265 baseClient: &baseClient{
266 opt: opt,
267 connPool: newConnPool(opt),
268 },
269 ctx: context.Background(),
270 }
271 return c
272}
273
274func (c *SentinelClient) Context() context.Context {
275 return c.ctx
276}
277
278func (c *SentinelClient) WithContext(ctx context.Context) *SentinelClient {
279 if ctx == nil {
280 panic("nil context")
281 }
282 clone := *c
283 clone.ctx = ctx
284 return &clone
285}
286
287func (c *SentinelClient) Process(ctx context.Context, cmd Cmder) error {
288 return c.hooks.process(ctx, cmd, c.baseClient.process)
289}
290
291func (c *SentinelClient) pubSub() *PubSub {
292 pubsub := &PubSub{
293 opt: c.opt,
294
295 newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
296 return c.newConn(ctx)
297 },
298 closeConn: c.connPool.CloseConn,
299 }
300 pubsub.init()
301 return pubsub
302}
303
304// Ping is used to test if a connection is still alive, or to
305// measure latency.
306func (c *SentinelClient) Ping(ctx context.Context) *StringCmd {
307 cmd := NewStringCmd(ctx, "ping")
308 _ = c.Process(ctx, cmd)
309 return cmd
310}
311
312// Subscribe subscribes the client to the specified channels.
313// Channels can be omitted to create empty subscription.
314func (c *SentinelClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
315 pubsub := c.pubSub()
316 if len(channels) > 0 {
317 _ = pubsub.Subscribe(ctx, channels...)
318 }
319 return pubsub
320}
321
322// PSubscribe subscribes the client to the given patterns.
323// Patterns can be omitted to create empty subscription.
324func (c *SentinelClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
325 pubsub := c.pubSub()
326 if len(channels) > 0 {
327 _ = pubsub.PSubscribe(ctx, channels...)
328 }
329 return pubsub
330}
331
332func (c *SentinelClient) GetMasterAddrByName(ctx context.Context, name string) *StringSliceCmd {
333 cmd := NewStringSliceCmd(ctx, "sentinel", "get-master-addr-by-name", name)
334 _ = c.Process(ctx, cmd)
335 return cmd
336}
337
338func (c *SentinelClient) Sentinels(ctx context.Context, name string) *SliceCmd {
339 cmd := NewSliceCmd(ctx, "sentinel", "sentinels", name)
340 _ = c.Process(ctx, cmd)
341 return cmd
342}
343
344// Failover forces a failover as if the master was not reachable, and without
345// asking for agreement to other Sentinels.
346func (c *SentinelClient) Failover(ctx context.Context, name string) *StatusCmd {
347 cmd := NewStatusCmd(ctx, "sentinel", "failover", name)
348 _ = c.Process(ctx, cmd)
349 return cmd
350}
351
352// Reset resets all the masters with matching name. The pattern argument is a
353// glob-style pattern. The reset process clears any previous state in a master
354// (including a failover in progress), and removes every slave and sentinel
355// already discovered and associated with the master.
356func (c *SentinelClient) Reset(ctx context.Context, pattern string) *IntCmd {
357 cmd := NewIntCmd(ctx, "sentinel", "reset", pattern)
358 _ = c.Process(ctx, cmd)
359 return cmd
360}
361
362// FlushConfig forces Sentinel to rewrite its configuration on disk, including
363// the current Sentinel state.
364func (c *SentinelClient) FlushConfig(ctx context.Context) *StatusCmd {
365 cmd := NewStatusCmd(ctx, "sentinel", "flushconfig")
366 _ = c.Process(ctx, cmd)
367 return cmd
368}
369
370// Master shows the state and info of the specified master.
371func (c *SentinelClient) Master(ctx context.Context, name string) *StringStringMapCmd {
372 cmd := NewStringStringMapCmd(ctx, "sentinel", "master", name)
373 _ = c.Process(ctx, cmd)
374 return cmd
375}
376
377// Masters shows a list of monitored masters and their state.
378func (c *SentinelClient) Masters(ctx context.Context) *SliceCmd {
379 cmd := NewSliceCmd(ctx, "sentinel", "masters")
380 _ = c.Process(ctx, cmd)
381 return cmd
382}
383
384// Slaves shows a list of slaves for the specified master and their state.
385func (c *SentinelClient) Slaves(ctx context.Context, name string) *SliceCmd {
386 cmd := NewSliceCmd(ctx, "sentinel", "slaves", name)
387 _ = c.Process(ctx, cmd)
388 return cmd
389}
390
391// CkQuorum checks if the current Sentinel configuration is able to reach the
392// quorum needed to failover a master, and the majority needed to authorize the
393// failover. This command should be used in monitoring systems to check if a
394// Sentinel deployment is ok.
395func (c *SentinelClient) CkQuorum(ctx context.Context, name string) *StringCmd {
396 cmd := NewStringCmd(ctx, "sentinel", "ckquorum", name)
397 _ = c.Process(ctx, cmd)
398 return cmd
399}
400
401// Monitor tells the Sentinel to start monitoring a new master with the specified
402// name, ip, port, and quorum.
403func (c *SentinelClient) Monitor(ctx context.Context, name, ip, port, quorum string) *StringCmd {
404 cmd := NewStringCmd(ctx, "sentinel", "monitor", name, ip, port, quorum)
405 _ = c.Process(ctx, cmd)
406 return cmd
407}
408
409// Set is used in order to change configuration parameters of a specific master.
410func (c *SentinelClient) Set(ctx context.Context, name, option, value string) *StringCmd {
411 cmd := NewStringCmd(ctx, "sentinel", "set", name, option, value)
412 _ = c.Process(ctx, cmd)
413 return cmd
414}
415
416// Remove is used in order to remove the specified master: the master will no
417// longer be monitored, and will totally be removed from the internal state of
418// the Sentinel.
419func (c *SentinelClient) Remove(ctx context.Context, name string) *StringCmd {
420 cmd := NewStringCmd(ctx, "sentinel", "remove", name)
421 _ = c.Process(ctx, cmd)
422 return cmd
423}
424
425//------------------------------------------------------------------------------
426
// sentinelFailover tracks the sentinels for one master and resolves the
// current master/slave addresses through them.
type sentinelFailover struct {
	opt *FailoverOptions

	// sentinelAddrs is the list of known sentinel addresses. The address of
	// the sentinel that answered last is swapped to index 0, and addresses
	// found by discoverSentinels are appended.
	sentinelAddrs []string

	// onFailover, if set, is called by trySwitchMaster with the new master
	// address (while mu is held for writing).
	onFailover func(ctx context.Context, addr string)
	// onUpdate, if set, is called by listen when it starts and after each
	// processed pub/sub message.
	onUpdate func(ctx context.Context)

	// mu is taken by the methods of this type around accesses to
	// _masterAddr, sentinel and pubsub.
	mu sync.RWMutex
	// _masterAddr is the last master address recorded by trySwitchMaster.
	_masterAddr string
	// sentinel is the currently connected sentinel client, or nil.
	sentinel *SentinelClient
	// pubsub is the subscription opened on sentinel by setSentinel.
	pubsub *PubSub
}
440
441func (c *sentinelFailover) Close() error {
442 c.mu.Lock()
443 defer c.mu.Unlock()
444 if c.sentinel != nil {
445 return c.closeSentinel()
446 }
447 return nil
448}
449
450func (c *sentinelFailover) closeSentinel() error {
451 firstErr := c.pubsub.Close()
452 c.pubsub = nil
453
454 err := c.sentinel.Close()
455 if err != nil && firstErr == nil {
456 firstErr = err
457 }
458 c.sentinel = nil
459
460 return firstErr
461}
462
463func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) {
464 if c.opt == nil {
465 return "", errors.New("opt is nil")
466 }
467
468 addresses, err := c.slaveAddrs(ctx, false)
469 if err != nil {
470 return "", err
471 }
472
473 if len(addresses) == 0 && c.opt.UseDisconnectedSlaves {
474 addresses, err = c.slaveAddrs(ctx, true)
475 if err != nil {
476 return "", err
477 }
478 }
479
480 if len(addresses) == 0 {
481 return c.MasterAddr(ctx)
482 }
483 return addresses[rand.Intn(len(addresses))], nil
484}
485
486func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
487 c.mu.RLock()
488 sentinel := c.sentinel
489 c.mu.RUnlock()
490
491 if sentinel != nil {
492 addr := c.getMasterAddr(ctx, sentinel)
493 if addr != "" {
494 return addr, nil
495 }
496 }
497
498 c.mu.Lock()
499 defer c.mu.Unlock()
500
501 if c.sentinel != nil {
502 addr := c.getMasterAddr(ctx, c.sentinel)
503 if addr != "" {
504 return addr, nil
505 }
506 _ = c.closeSentinel()
507 }
508
509 for i, sentinelAddr := range c.sentinelAddrs {
510 sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
511
512 masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
513 if err != nil {
514 internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
515 c.opt.MasterName, err)
516 _ = sentinel.Close()
517 continue
518 }
519
520 // Push working sentinel to the top.
521 c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
522 c.setSentinel(ctx, sentinel)
523
524 addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
525 return addr, nil
526 }
527
528 return "", errors.New("redis: all sentinels specified in configuration are unreachable")
529}
530
531func (c *sentinelFailover) slaveAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
532 c.mu.RLock()
533 sentinel := c.sentinel
534 c.mu.RUnlock()
535
536 if sentinel != nil {
537 addrs := c.getSlaveAddrs(ctx, sentinel)
538 if len(addrs) > 0 {
539 return addrs, nil
540 }
541 }
542
543 c.mu.Lock()
544 defer c.mu.Unlock()
545
546 if c.sentinel != nil {
547 addrs := c.getSlaveAddrs(ctx, c.sentinel)
548 if len(addrs) > 0 {
549 return addrs, nil
550 }
551 _ = c.closeSentinel()
552 }
553
554 var sentinelReachable bool
555
556 for i, sentinelAddr := range c.sentinelAddrs {
557 sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
558
559 slaves, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
560 if err != nil {
561 internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s",
562 c.opt.MasterName, err)
563 _ = sentinel.Close()
564 continue
565 }
566 sentinelReachable = true
567 addrs := parseSlaveAddrs(slaves, useDisconnected)
568 if len(addrs) == 0 {
569 continue
570 }
571 // Push working sentinel to the top.
572 c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
573 c.setSentinel(ctx, sentinel)
574
575 return addrs, nil
576 }
577
578 if sentinelReachable {
579 return []string{}, nil
580 }
581 return []string{}, errors.New("redis: all sentinels specified in configuration are unreachable")
582}
583
584func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string {
585 addr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
586 if err != nil {
587 internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
588 c.opt.MasterName, err)
589 return ""
590 }
591 return net.JoinHostPort(addr[0], addr[1])
592}
593
594func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string {
595 addrs, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
596 if err != nil {
597 internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
598 c.opt.MasterName, err)
599 return []string{}
600 }
601 return parseSlaveAddrs(addrs, false)
602}
603
// parseSlaveAddrs extracts host:port addresses from a "SENTINEL slaves"
// reply. Each element of addrs is expected to be a flat list of alternating
// keys and values; only the "ip", "port" and "flags" keys are used.
//
// A node is skipped when its flags (comma-separated) contain "s_down" or
// "o_down", or contain "disconnected" while keepDisconnected is false.
//
// Malformed input is tolerated rather than causing a panic: elements that
// are not lists, values that are not strings, a trailing key without a
// value, and nodes lacking an ip or port are all ignored. Keys and values are
// read strictly in pairs, so a value that happens to equal "ip", "port" or
// "flags" is never mistaken for a key.
func parseSlaveAddrs(addrs []interface{}, keepDisconnected bool) []string {
	nodes := make([]string, 0, len(addrs))
	for _, node := range addrs {
		fields, ok := node.([]interface{})
		if !ok {
			continue
		}

		var ip, port, flags string
		for i := 0; i+1 < len(fields); i += 2 {
			key, _ := fields[i].(string)
			val, isString := fields[i+1].(string)
			if !isString {
				continue
			}
			switch key {
			case "ip":
				ip = val
			case "port":
				port = val
			case "flags":
				flags = val
			}
		}
		if ip == "" || port == "" {
			// Without both parts there is no address to dial.
			continue
		}

		isDown := false
		for _, flag := range strings.Split(flags, ",") {
			switch flag {
			case "s_down", "o_down":
				isDown = true
			case "disconnected":
				if !keepDisconnected {
					isDown = true
				}
			}
		}

		if !isDown {
			nodes = append(nodes, net.JoinHostPort(ip, port))
		}
	}

	return nodes
}
643
644func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) {
645 c.mu.RLock()
646 currentAddr := c._masterAddr //nolint:ifshort
647 c.mu.RUnlock()
648
649 if addr == currentAddr {
650 return
651 }
652
653 c.mu.Lock()
654 defer c.mu.Unlock()
655
656 if addr == c._masterAddr {
657 return
658 }
659 c._masterAddr = addr
660
661 internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
662 c.opt.MasterName, addr)
663 if c.onFailover != nil {
664 c.onFailover(ctx, addr)
665 }
666}
667
668func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelClient) {
669 if c.sentinel != nil {
670 panic("not reached")
671 }
672 c.sentinel = sentinel
673 c.discoverSentinels(ctx)
674
675 c.pubsub = sentinel.Subscribe(ctx, "+switch-master", "+slave-reconf-done")
676 go c.listen(c.pubsub)
677}
678
679func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
680 sentinels, err := c.sentinel.Sentinels(ctx, c.opt.MasterName).Result()
681 if err != nil {
682 internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.opt.MasterName, err)
683 return
684 }
685 for _, sentinel := range sentinels {
686 vals := sentinel.([]interface{})
687 var ip, port string
688 for i := 0; i < len(vals); i += 2 {
689 key := vals[i].(string)
690 switch key {
691 case "ip":
692 ip = vals[i+1].(string)
693 case "port":
694 port = vals[i+1].(string)
695 }
696 }
697 if ip != "" && port != "" {
698 sentinelAddr := net.JoinHostPort(ip, port)
699 if !contains(c.sentinelAddrs, sentinelAddr) {
700 internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q",
701 sentinelAddr, c.opt.MasterName)
702 c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
703 }
704 }
705 }
706}
707
708func (c *sentinelFailover) listen(pubsub *PubSub) {
709 ctx := context.TODO()
710
711 if c.onUpdate != nil {
712 c.onUpdate(ctx)
713 }
714
715 ch := pubsub.Channel()
716 for msg := range ch {
717 if msg.Channel == "+switch-master" {
718 parts := strings.Split(msg.Payload, " ")
719 if parts[0] != c.opt.MasterName {
720 internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0])
721 continue
722 }
723 addr := net.JoinHostPort(parts[3], parts[4])
724 c.trySwitchMaster(pubsub.getContext(), addr)
725 }
726
727 if c.onUpdate != nil {
728 c.onUpdate(ctx)
729 }
730 }
731}
732
// contains reports whether str is an element of slice.
func contains(slice []string, str string) bool {
	for i := range slice {
		if slice[i] == str {
			return true
		}
	}
	return false
}
741
742//------------------------------------------------------------------------------
743
744// NewFailoverClusterClient returns a client that supports routing read-only commands
745// to a slave node.
746func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
747 sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
748 copy(sentinelAddrs, failoverOpt.SentinelAddrs)
749
750 failover := &sentinelFailover{
751 opt: failoverOpt,
752 sentinelAddrs: sentinelAddrs,
753 }
754
755 opt := failoverOpt.clusterOptions()
756 opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
757 masterAddr, err := failover.MasterAddr(ctx)
758 if err != nil {
759 return nil, err
760 }
761
762 nodes := []ClusterNode{{
763 Addr: masterAddr,
764 }}
765
766 slaveAddrs, err := failover.slaveAddrs(ctx, false)
767 if err != nil {
768 return nil, err
769 }
770
771 for _, slaveAddr := range slaveAddrs {
772 nodes = append(nodes, ClusterNode{
773 Addr: slaveAddr,
774 })
775 }
776
777 slots := []ClusterSlot{
778 {
779 Start: 0,
780 End: 16383,
781 Nodes: nodes,
782 },
783 }
784 return slots, nil
785 }
786
787 c := NewClusterClient(opt)
788
789 failover.mu.Lock()
790 failover.onUpdate = func(ctx context.Context) {
791 c.ReloadState(ctx)
792 }
793 failover.mu.Unlock()
794
795 return c
796}