blob: 7db984373f939fedf13daf83c9edecc02e24a8c0 [file] [log] [blame]
serkant.uluderyae5afeff2021-02-23 18:00:23 +03001package redis
2
3import (
4 "context"
5 "crypto/tls"
6 "errors"
7 "net"
8 "strings"
9 "sync"
10 "time"
11
12 "github.com/go-redis/redis/v8/internal"
13 "github.com/go-redis/redis/v8/internal/pool"
14 "github.com/go-redis/redis/v8/internal/rand"
15)
16
17//------------------------------------------------------------------------------
18
19// FailoverOptions are used to configure a failover client and should
20// be passed to NewFailoverClient.
21type FailoverOptions struct {
22 // The master name.
23 MasterName string
24 // A seed list of host:port addresses of sentinel nodes.
25 SentinelAddrs []string
26 // Sentinel password from "requirepass <password>" (if enabled) in Sentinel configuration
27 SentinelPassword string
28
29 // Allows routing read-only commands to the closest master or slave node.
30 // This option only works with NewFailoverClusterClient.
31 RouteByLatency bool
32 // Allows routing read-only commands to the random master or slave node.
33 // This option only works with NewFailoverClusterClient.
34 RouteRandomly bool
35
36 // Route all commands to slave read-only nodes.
37 SlaveOnly bool
38
39 // Following options are copied from Options struct.
40
41 Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
42 OnConnect func(ctx context.Context, cn *Conn) error
43
44 Username string
45 Password string
46 DB int
47
48 MaxRetries int
49 MinRetryBackoff time.Duration
50 MaxRetryBackoff time.Duration
51
52 DialTimeout time.Duration
53 ReadTimeout time.Duration
54 WriteTimeout time.Duration
55
56 PoolSize int
57 MinIdleConns int
58 MaxConnAge time.Duration
59 PoolTimeout time.Duration
60 IdleTimeout time.Duration
61 IdleCheckFrequency time.Duration
62
63 TLSConfig *tls.Config
64}
65
66func (opt *FailoverOptions) clientOptions() *Options {
67 return &Options{
68 Addr: "FailoverClient",
69
70 Dialer: opt.Dialer,
71 OnConnect: opt.OnConnect,
72
73 DB: opt.DB,
74 Username: opt.Username,
75 Password: opt.Password,
76
77 MaxRetries: opt.MaxRetries,
78 MinRetryBackoff: opt.MinRetryBackoff,
79 MaxRetryBackoff: opt.MaxRetryBackoff,
80
81 DialTimeout: opt.DialTimeout,
82 ReadTimeout: opt.ReadTimeout,
83 WriteTimeout: opt.WriteTimeout,
84
85 PoolSize: opt.PoolSize,
86 PoolTimeout: opt.PoolTimeout,
87 IdleTimeout: opt.IdleTimeout,
88 IdleCheckFrequency: opt.IdleCheckFrequency,
89 MinIdleConns: opt.MinIdleConns,
90 MaxConnAge: opt.MaxConnAge,
91
92 TLSConfig: opt.TLSConfig,
93 }
94}
95
96func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
97 return &Options{
98 Addr: addr,
99
100 Dialer: opt.Dialer,
101 OnConnect: opt.OnConnect,
102
103 DB: 0,
104 Password: opt.SentinelPassword,
105
106 MaxRetries: opt.MaxRetries,
107 MinRetryBackoff: opt.MinRetryBackoff,
108 MaxRetryBackoff: opt.MaxRetryBackoff,
109
110 DialTimeout: opt.DialTimeout,
111 ReadTimeout: opt.ReadTimeout,
112 WriteTimeout: opt.WriteTimeout,
113
114 PoolSize: opt.PoolSize,
115 PoolTimeout: opt.PoolTimeout,
116 IdleTimeout: opt.IdleTimeout,
117 IdleCheckFrequency: opt.IdleCheckFrequency,
118 MinIdleConns: opt.MinIdleConns,
119 MaxConnAge: opt.MaxConnAge,
120
121 TLSConfig: opt.TLSConfig,
122 }
123}
124
125func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
126 return &ClusterOptions{
127 Dialer: opt.Dialer,
128 OnConnect: opt.OnConnect,
129
130 Username: opt.Username,
131 Password: opt.Password,
132
133 MaxRedirects: opt.MaxRetries,
134
135 RouteByLatency: opt.RouteByLatency,
136 RouteRandomly: opt.RouteRandomly,
137
138 MinRetryBackoff: opt.MinRetryBackoff,
139 MaxRetryBackoff: opt.MaxRetryBackoff,
140
141 DialTimeout: opt.DialTimeout,
142 ReadTimeout: opt.ReadTimeout,
143 WriteTimeout: opt.WriteTimeout,
144
145 PoolSize: opt.PoolSize,
146 PoolTimeout: opt.PoolTimeout,
147 IdleTimeout: opt.IdleTimeout,
148 IdleCheckFrequency: opt.IdleCheckFrequency,
149 MinIdleConns: opt.MinIdleConns,
150 MaxConnAge: opt.MaxConnAge,
151
152 TLSConfig: opt.TLSConfig,
153 }
154}
155
156// NewFailoverClient returns a Redis client that uses Redis Sentinel
157// for automatic failover. It's safe for concurrent use by multiple
158// goroutines.
159func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
160 if failoverOpt.RouteByLatency {
161 panic("to route commands by latency, use NewFailoverClusterClient")
162 }
163 if failoverOpt.RouteRandomly {
164 panic("to route commands randomly, use NewFailoverClusterClient")
165 }
166
167 sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
168 copy(sentinelAddrs, failoverOpt.SentinelAddrs)
169
170 failover := &sentinelFailover{
171 opt: failoverOpt,
172 sentinelAddrs: sentinelAddrs,
173 }
174
175 opt := failoverOpt.clientOptions()
176 opt.Dialer = masterSlaveDialer(failover)
177 opt.init()
178
179 connPool := newConnPool(opt)
180 failover.onFailover = func(ctx context.Context, addr string) {
181 _ = connPool.Filter(func(cn *pool.Conn) bool {
182 return cn.RemoteAddr().String() != addr
183 })
184 }
185
186 c := Client{
187 baseClient: newBaseClient(opt, connPool),
188 ctx: context.Background(),
189 }
190 c.cmdable = c.Process
191 c.onClose = failover.Close
192
193 return &c
194}
195
196func masterSlaveDialer(
197 failover *sentinelFailover,
198) func(ctx context.Context, network, addr string) (net.Conn, error) {
199 return func(ctx context.Context, network, _ string) (net.Conn, error) {
200 var addr string
201 var err error
202
203 if failover.opt.SlaveOnly {
204 addr, err = failover.RandomSlaveAddr(ctx)
205 } else {
206 addr, err = failover.MasterAddr(ctx)
207 if err == nil {
208 failover.trySwitchMaster(ctx, addr)
209 }
210 }
211
212 if err != nil {
213 return nil, err
214 }
215 if failover.opt.Dialer != nil {
216 return failover.opt.Dialer(ctx, network, addr)
217 }
218 return net.DialTimeout("tcp", addr, failover.opt.DialTimeout)
219 }
220}
221
222//------------------------------------------------------------------------------
223
224// SentinelClient is a client for a Redis Sentinel.
225type SentinelClient struct {
226 *baseClient
227 hooks
228 ctx context.Context
229}
230
231func NewSentinelClient(opt *Options) *SentinelClient {
232 opt.init()
233 c := &SentinelClient{
234 baseClient: &baseClient{
235 opt: opt,
236 connPool: newConnPool(opt),
237 },
238 ctx: context.Background(),
239 }
240 return c
241}
242
243func (c *SentinelClient) Context() context.Context {
244 return c.ctx
245}
246
247func (c *SentinelClient) WithContext(ctx context.Context) *SentinelClient {
248 if ctx == nil {
249 panic("nil context")
250 }
251 clone := *c
252 clone.ctx = ctx
253 return &clone
254}
255
256func (c *SentinelClient) Process(ctx context.Context, cmd Cmder) error {
257 return c.hooks.process(ctx, cmd, c.baseClient.process)
258}
259
260func (c *SentinelClient) pubSub() *PubSub {
261 pubsub := &PubSub{
262 opt: c.opt,
263
264 newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
265 return c.newConn(ctx)
266 },
267 closeConn: c.connPool.CloseConn,
268 }
269 pubsub.init()
270 return pubsub
271}
272
273// Ping is used to test if a connection is still alive, or to
274// measure latency.
275func (c *SentinelClient) Ping(ctx context.Context) *StringCmd {
276 cmd := NewStringCmd(ctx, "ping")
277 _ = c.Process(ctx, cmd)
278 return cmd
279}
280
281// Subscribe subscribes the client to the specified channels.
282// Channels can be omitted to create empty subscription.
283func (c *SentinelClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
284 pubsub := c.pubSub()
285 if len(channels) > 0 {
286 _ = pubsub.Subscribe(ctx, channels...)
287 }
288 return pubsub
289}
290
291// PSubscribe subscribes the client to the given patterns.
292// Patterns can be omitted to create empty subscription.
293func (c *SentinelClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
294 pubsub := c.pubSub()
295 if len(channels) > 0 {
296 _ = pubsub.PSubscribe(ctx, channels...)
297 }
298 return pubsub
299}
300
301func (c *SentinelClient) GetMasterAddrByName(ctx context.Context, name string) *StringSliceCmd {
302 cmd := NewStringSliceCmd(ctx, "sentinel", "get-master-addr-by-name", name)
303 _ = c.Process(ctx, cmd)
304 return cmd
305}
306
307func (c *SentinelClient) Sentinels(ctx context.Context, name string) *SliceCmd {
308 cmd := NewSliceCmd(ctx, "sentinel", "sentinels", name)
309 _ = c.Process(ctx, cmd)
310 return cmd
311}
312
313// Failover forces a failover as if the master was not reachable, and without
314// asking for agreement to other Sentinels.
315func (c *SentinelClient) Failover(ctx context.Context, name string) *StatusCmd {
316 cmd := NewStatusCmd(ctx, "sentinel", "failover", name)
317 _ = c.Process(ctx, cmd)
318 return cmd
319}
320
321// Reset resets all the masters with matching name. The pattern argument is a
322// glob-style pattern. The reset process clears any previous state in a master
323// (including a failover in progress), and removes every slave and sentinel
324// already discovered and associated with the master.
325func (c *SentinelClient) Reset(ctx context.Context, pattern string) *IntCmd {
326 cmd := NewIntCmd(ctx, "sentinel", "reset", pattern)
327 _ = c.Process(ctx, cmd)
328 return cmd
329}
330
331// FlushConfig forces Sentinel to rewrite its configuration on disk, including
332// the current Sentinel state.
333func (c *SentinelClient) FlushConfig(ctx context.Context) *StatusCmd {
334 cmd := NewStatusCmd(ctx, "sentinel", "flushconfig")
335 _ = c.Process(ctx, cmd)
336 return cmd
337}
338
339// Master shows the state and info of the specified master.
340func (c *SentinelClient) Master(ctx context.Context, name string) *StringStringMapCmd {
341 cmd := NewStringStringMapCmd(ctx, "sentinel", "master", name)
342 _ = c.Process(ctx, cmd)
343 return cmd
344}
345
346// Masters shows a list of monitored masters and their state.
347func (c *SentinelClient) Masters(ctx context.Context) *SliceCmd {
348 cmd := NewSliceCmd(ctx, "sentinel", "masters")
349 _ = c.Process(ctx, cmd)
350 return cmd
351}
352
353// Slaves shows a list of slaves for the specified master and their state.
354func (c *SentinelClient) Slaves(ctx context.Context, name string) *SliceCmd {
355 cmd := NewSliceCmd(ctx, "sentinel", "slaves", name)
356 _ = c.Process(ctx, cmd)
357 return cmd
358}
359
360// CkQuorum checks if the current Sentinel configuration is able to reach the
361// quorum needed to failover a master, and the majority needed to authorize the
362// failover. This command should be used in monitoring systems to check if a
363// Sentinel deployment is ok.
364func (c *SentinelClient) CkQuorum(ctx context.Context, name string) *StringCmd {
365 cmd := NewStringCmd(ctx, "sentinel", "ckquorum", name)
366 _ = c.Process(ctx, cmd)
367 return cmd
368}
369
370// Monitor tells the Sentinel to start monitoring a new master with the specified
371// name, ip, port, and quorum.
372func (c *SentinelClient) Monitor(ctx context.Context, name, ip, port, quorum string) *StringCmd {
373 cmd := NewStringCmd(ctx, "sentinel", "monitor", name, ip, port, quorum)
374 _ = c.Process(ctx, cmd)
375 return cmd
376}
377
378// Set is used in order to change configuration parameters of a specific master.
379func (c *SentinelClient) Set(ctx context.Context, name, option, value string) *StringCmd {
380 cmd := NewStringCmd(ctx, "sentinel", "set", name, option, value)
381 _ = c.Process(ctx, cmd)
382 return cmd
383}
384
385// Remove is used in order to remove the specified master: the master will no
386// longer be monitored, and will totally be removed from the internal state of
387// the Sentinel.
388func (c *SentinelClient) Remove(ctx context.Context, name string) *StringCmd {
389 cmd := NewStringCmd(ctx, "sentinel", "remove", name)
390 _ = c.Process(ctx, cmd)
391 return cmd
392}
393
394//------------------------------------------------------------------------------
395
396type sentinelFailover struct {
397 opt *FailoverOptions
398
399 sentinelAddrs []string
400
401 onFailover func(ctx context.Context, addr string)
402 onUpdate func(ctx context.Context)
403
404 mu sync.RWMutex
405 _masterAddr string
406 sentinel *SentinelClient
407 pubsub *PubSub
408}
409
410func (c *sentinelFailover) Close() error {
411 c.mu.Lock()
412 defer c.mu.Unlock()
413 if c.sentinel != nil {
414 return c.closeSentinel()
415 }
416 return nil
417}
418
419func (c *sentinelFailover) closeSentinel() error {
420 firstErr := c.pubsub.Close()
421 c.pubsub = nil
422
423 err := c.sentinel.Close()
424 if err != nil && firstErr == nil {
425 firstErr = err
426 }
427 c.sentinel = nil
428
429 return firstErr
430}
431
432func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) {
433 addresses, err := c.slaveAddrs(ctx)
434 if err != nil {
435 return "", err
436 }
437 if len(addresses) == 0 {
438 return c.MasterAddr(ctx)
439 }
440 return addresses[rand.Intn(len(addresses))], nil
441}
442
443func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
444 c.mu.RLock()
445 sentinel := c.sentinel
446 c.mu.RUnlock()
447
448 if sentinel != nil {
449 addr := c.getMasterAddr(ctx, sentinel)
450 if addr != "" {
451 return addr, nil
452 }
453 }
454
455 c.mu.Lock()
456 defer c.mu.Unlock()
457
458 if c.sentinel != nil {
459 addr := c.getMasterAddr(ctx, c.sentinel)
460 if addr != "" {
461 return addr, nil
462 }
463 _ = c.closeSentinel()
464 }
465
466 for i, sentinelAddr := range c.sentinelAddrs {
467 sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
468
469 masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
470 if err != nil {
471 internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
472 c.opt.MasterName, err)
473 _ = sentinel.Close()
474 continue
475 }
476
477 // Push working sentinel to the top.
478 c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
479 c.setSentinel(ctx, sentinel)
480
481 addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
482 return addr, nil
483 }
484
485 return "", errors.New("redis: all sentinels are unreachable")
486}
487
488func (c *sentinelFailover) slaveAddrs(ctx context.Context) ([]string, error) {
489 c.mu.RLock()
490 sentinel := c.sentinel
491 c.mu.RUnlock()
492
493 if sentinel != nil {
494 addrs := c.getSlaveAddrs(ctx, sentinel)
495 if len(addrs) > 0 {
496 return addrs, nil
497 }
498 }
499
500 c.mu.Lock()
501 defer c.mu.Unlock()
502
503 if c.sentinel != nil {
504 addrs := c.getSlaveAddrs(ctx, c.sentinel)
505 if len(addrs) > 0 {
506 return addrs, nil
507 }
508 _ = c.closeSentinel()
509 }
510
511 for i, sentinelAddr := range c.sentinelAddrs {
512 sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
513
514 slaves, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
515 if err != nil {
516 internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s",
517 c.opt.MasterName, err)
518 _ = sentinel.Close()
519 continue
520 }
521
522 // Push working sentinel to the top.
523 c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
524 c.setSentinel(ctx, sentinel)
525
526 addrs := parseSlaveAddrs(slaves)
527 return addrs, nil
528 }
529
530 return []string{}, errors.New("redis: all sentinels are unreachable")
531}
532
533func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string {
534 addr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
535 if err != nil {
536 internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
537 c.opt.MasterName, err)
538 return ""
539 }
540 return net.JoinHostPort(addr[0], addr[1])
541}
542
543func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string {
544 addrs, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
545 if err != nil {
546 internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
547 c.opt.MasterName, err)
548 return []string{}
549 }
550 return parseSlaveAddrs(addrs)
551}
552
553func parseSlaveAddrs(addrs []interface{}) []string {
554 nodes := make([]string, 0, len(addrs))
555
556 for _, node := range addrs {
557 ip := ""
558 port := ""
559 flags := []string{}
560 lastkey := ""
561 isDown := false
562
563 for _, key := range node.([]interface{}) {
564 switch lastkey {
565 case "ip":
566 ip = key.(string)
567 case "port":
568 port = key.(string)
569 case "flags":
570 flags = strings.Split(key.(string), ",")
571 }
572 lastkey = key.(string)
573 }
574
575 for _, flag := range flags {
576 switch flag {
577 case "s_down", "o_down", "disconnected":
578 isDown = true
579 }
580 }
581
582 if !isDown {
583 nodes = append(nodes, net.JoinHostPort(ip, port))
584 }
585 }
586
587 return nodes
588}
589
590func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) {
591 c.mu.RLock()
592 currentAddr := c._masterAddr
593 c.mu.RUnlock()
594
595 if addr == currentAddr {
596 return
597 }
598
599 c.mu.Lock()
600 defer c.mu.Unlock()
601
602 if addr == c._masterAddr {
603 return
604 }
605 c._masterAddr = addr
606
607 internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
608 c.opt.MasterName, addr)
609 if c.onFailover != nil {
610 c.onFailover(ctx, addr)
611 }
612}
613
614func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelClient) {
615 if c.sentinel != nil {
616 panic("not reached")
617 }
618 c.sentinel = sentinel
619 c.discoverSentinels(ctx)
620
621 c.pubsub = sentinel.Subscribe(ctx, "+switch-master", "+slave-reconf-done")
622 go c.listen(c.pubsub)
623}
624
625func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
626 sentinels, err := c.sentinel.Sentinels(ctx, c.opt.MasterName).Result()
627 if err != nil {
628 internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.opt.MasterName, err)
629 return
630 }
631 for _, sentinel := range sentinels {
632 vals := sentinel.([]interface{})
633 for i := 0; i < len(vals); i += 2 {
634 key := vals[i].(string)
635 if key == "name" {
636 sentinelAddr := vals[i+1].(string)
637 if !contains(c.sentinelAddrs, sentinelAddr) {
638 internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q",
639 sentinelAddr, c.opt.MasterName)
640 c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
641 }
642 }
643 }
644 }
645}
646
647func (c *sentinelFailover) listen(pubsub *PubSub) {
648 ctx := context.TODO()
649 if c.onUpdate != nil {
650 c.onUpdate(ctx)
651 }
652
653 ch := pubsub.Channel()
654 for msg := range ch {
655 if msg.Channel == "+switch-master" {
656 parts := strings.Split(msg.Payload, " ")
657 if parts[0] != c.opt.MasterName {
658 internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0])
659 continue
660 }
661 addr := net.JoinHostPort(parts[3], parts[4])
662 c.trySwitchMaster(pubsub.getContext(), addr)
663 }
664
665 if c.onUpdate != nil {
666 c.onUpdate(ctx)
667 }
668 }
669}
670
671func contains(slice []string, str string) bool {
672 for _, s := range slice {
673 if s == str {
674 return true
675 }
676 }
677 return false
678}
679
680//------------------------------------------------------------------------------
681
682// NewFailoverClusterClient returns a client that supports routing read-only commands
683// to a slave node.
684func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
685 sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
686 copy(sentinelAddrs, failoverOpt.SentinelAddrs)
687
688 failover := &sentinelFailover{
689 opt: failoverOpt,
690 sentinelAddrs: sentinelAddrs,
691 }
692
693 opt := failoverOpt.clusterOptions()
694 opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
695 masterAddr, err := failover.MasterAddr(ctx)
696 if err != nil {
697 return nil, err
698 }
699
700 nodes := []ClusterNode{{
701 Addr: masterAddr,
702 }}
703
704 slaveAddrs, err := failover.slaveAddrs(ctx)
705 if err != nil {
706 return nil, err
707 }
708
709 for _, slaveAddr := range slaveAddrs {
710 nodes = append(nodes, ClusterNode{
711 Addr: slaveAddr,
712 })
713 }
714
715 slots := []ClusterSlot{
716 {
717 Start: 0,
718 End: 16383,
719 Nodes: nodes,
720 },
721 }
722 return slots, nil
723 }
724
725 c := NewClusterClient(opt)
726 failover.onUpdate = func(ctx context.Context) {
727 c.ReloadState(ctx)
728 }
729
730 return c
731}