blob: a4abe32c3a04e76984b6a07feca6a8d393acdb89 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301package redis
2
3import (
4 "context"
5 "crypto/tls"
6 "errors"
7 "fmt"
8 "net"
9 "net/url"
10 "runtime"
11 "sort"
12 "strconv"
13 "strings"
14 "time"
15
16 "github.com/go-redis/redis/v8/internal/pool"
17)
18
19// Limiter is the interface of a rate limiter or a circuit breaker.
20type Limiter interface {
21 // Allow returns nil if operation is allowed or an error otherwise.
22 // If operation is allowed client must ReportResult of the operation
23 // whether it is a success or a failure.
24 Allow() error
25 // ReportResult reports the result of the previously allowed operation.
26 // nil indicates a success, non-nil error usually indicates a failure.
27 ReportResult(result error)
28}
29
30// Options keeps the settings to setup redis connection.
31type Options struct {
32 // The network type, either tcp or unix.
33 // Default is tcp.
34 Network string
35 // host:port address.
36 Addr string
37
38 // Dialer creates new network connection and has priority over
39 // Network and Addr options.
40 Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
41
42 // Hook that is called when new connection is established.
43 OnConnect func(ctx context.Context, cn *Conn) error
44
45 // Use the specified Username to authenticate the current connection
46 // with one of the connections defined in the ACL list when connecting
47 // to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
48 Username string
49 // Optional password. Must match the password specified in the
50 // requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower),
51 // or the User Password when connecting to a Redis 6.0 instance, or greater,
52 // that is using the Redis ACL system.
53 Password string
54
55 // Database to be selected after connecting to the server.
56 DB int
57
58 // Maximum number of retries before giving up.
59 // Default is 3 retries; -1 (not 0) disables retries.
60 MaxRetries int
61 // Minimum backoff between each retry.
62 // Default is 8 milliseconds; -1 disables backoff.
63 MinRetryBackoff time.Duration
64 // Maximum backoff between each retry.
65 // Default is 512 milliseconds; -1 disables backoff.
66 MaxRetryBackoff time.Duration
67
68 // Dial timeout for establishing new connections.
69 // Default is 5 seconds.
70 DialTimeout time.Duration
71 // Timeout for socket reads. If reached, commands will fail
72 // with a timeout instead of blocking. Use value -1 for no timeout and 0 for default.
73 // Default is 3 seconds.
74 ReadTimeout time.Duration
75 // Timeout for socket writes. If reached, commands will fail
76 // with a timeout instead of blocking.
77 // Default is ReadTimeout.
78 WriteTimeout time.Duration
79
80 // Type of connection pool.
81 // true for FIFO pool, false for LIFO pool.
82 // Note that fifo has higher overhead compared to lifo.
83 PoolFIFO bool
84 // Maximum number of socket connections.
85 // Default is 10 connections per every available CPU as reported by runtime.GOMAXPROCS.
86 PoolSize int
87 // Minimum number of idle connections which is useful when establishing
88 // new connection is slow.
89 MinIdleConns int
90 // Connection age at which client retires (closes) the connection.
91 // Default is to not close aged connections.
92 MaxConnAge time.Duration
93 // Amount of time client waits for connection if all connections
94 // are busy before returning an error.
95 // Default is ReadTimeout + 1 second.
96 PoolTimeout time.Duration
97 // Amount of time after which client closes idle connections.
98 // Should be less than server's timeout.
99 // Default is 5 minutes. -1 disables idle timeout check.
100 IdleTimeout time.Duration
101 // Frequency of idle checks made by idle connections reaper.
102 // Default is 1 minute. -1 disables idle connections reaper,
103 // but idle connections are still discarded by the client
104 // if IdleTimeout is set.
105 IdleCheckFrequency time.Duration
106
107 // Enables read only queries on slave nodes.
108 readOnly bool
109
110 // TLS Config to use. When set TLS will be negotiated.
111 TLSConfig *tls.Config
112
113 // Limiter interface used to implemented circuit breaker or rate limiter.
114 Limiter Limiter
115}
116
117func (opt *Options) init() {
118 if opt.Addr == "" {
119 opt.Addr = "localhost:6379"
120 }
121 if opt.Network == "" {
122 if strings.HasPrefix(opt.Addr, "/") {
123 opt.Network = "unix"
124 } else {
125 opt.Network = "tcp"
126 }
127 }
128 if opt.DialTimeout == 0 {
129 opt.DialTimeout = 5 * time.Second
130 }
131 if opt.Dialer == nil {
132 opt.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
133 netDialer := &net.Dialer{
134 Timeout: opt.DialTimeout,
135 KeepAlive: 5 * time.Minute,
136 }
137 if opt.TLSConfig == nil {
138 return netDialer.DialContext(ctx, network, addr)
139 }
140 return tls.DialWithDialer(netDialer, network, addr, opt.TLSConfig)
141 }
142 }
143 if opt.PoolSize == 0 {
144 opt.PoolSize = 10 * runtime.GOMAXPROCS(0)
145 }
146 switch opt.ReadTimeout {
147 case -1:
148 opt.ReadTimeout = 0
149 case 0:
150 opt.ReadTimeout = 3 * time.Second
151 }
152 switch opt.WriteTimeout {
153 case -1:
154 opt.WriteTimeout = 0
155 case 0:
156 opt.WriteTimeout = opt.ReadTimeout
157 }
158 if opt.PoolTimeout == 0 {
159 opt.PoolTimeout = opt.ReadTimeout + time.Second
160 }
161 if opt.IdleTimeout == 0 {
162 opt.IdleTimeout = 5 * time.Minute
163 }
164 if opt.IdleCheckFrequency == 0 {
165 opt.IdleCheckFrequency = time.Minute
166 }
167
168 if opt.MaxRetries == -1 {
169 opt.MaxRetries = 0
170 } else if opt.MaxRetries == 0 {
171 opt.MaxRetries = 3
172 }
173 switch opt.MinRetryBackoff {
174 case -1:
175 opt.MinRetryBackoff = 0
176 case 0:
177 opt.MinRetryBackoff = 8 * time.Millisecond
178 }
179 switch opt.MaxRetryBackoff {
180 case -1:
181 opt.MaxRetryBackoff = 0
182 case 0:
183 opt.MaxRetryBackoff = 512 * time.Millisecond
184 }
185}
186
187func (opt *Options) clone() *Options {
188 clone := *opt
189 return &clone
190}
191
192// ParseURL parses an URL into Options that can be used to connect to Redis.
193// Scheme is required.
194// There are two connection types: by tcp socket and by unix socket.
195// Tcp connection:
196// redis://<user>:<password>@<host>:<port>/<db_number>
197// Unix connection:
198// unix://<user>:<password>@</path/to/redis.sock>?db=<db_number>
199// Most Option fields can be set using query parameters, with the following restrictions:
200// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
201// - only scalar type fields are supported (bool, int, time.Duration)
202// - for time.Duration fields, values must be a valid input for time.ParseDuration();
203// additionally a plain integer as value (i.e. without unit) is intepreted as seconds
204// - to disable a duration field, use value less than or equal to 0; to use the default
205// value, leave the value blank or remove the parameter
206// - only the last value is interpreted if a parameter is given multiple times
207// - fields "network", "addr", "username" and "password" can only be set using other
208// URL attributes (scheme, host, userinfo, resp.), query paremeters using these
209// names will be treated as unknown parameters
210// - unknown parameter names will result in an error
211// Examples:
212// redis://user:password@localhost:6789/3?dial_timeout=3&db=1&read_timeout=6s&max_retries=2
213// is equivalent to:
214// &Options{
215// Network: "tcp",
216// Addr: "localhost:6789",
217// DB: 1, // path "/3" was overridden by "&db=1"
218// DialTimeout: 3 * time.Second, // no time unit = seconds
219// ReadTimeout: 6 * time.Second,
220// MaxRetries: 2,
221// }
222func ParseURL(redisURL string) (*Options, error) {
223 u, err := url.Parse(redisURL)
224 if err != nil {
225 return nil, err
226 }
227
228 switch u.Scheme {
229 case "redis", "rediss":
230 return setupTCPConn(u)
231 case "unix":
232 return setupUnixConn(u)
233 default:
234 return nil, fmt.Errorf("redis: invalid URL scheme: %s", u.Scheme)
235 }
236}
237
238func setupTCPConn(u *url.URL) (*Options, error) {
239 o := &Options{Network: "tcp"}
240
241 o.Username, o.Password = getUserPassword(u)
242
243 h, p, err := net.SplitHostPort(u.Host)
244 if err != nil {
245 h = u.Host
246 }
247 if h == "" {
248 h = "localhost"
249 }
250 if p == "" {
251 p = "6379"
252 }
253 o.Addr = net.JoinHostPort(h, p)
254
255 f := strings.FieldsFunc(u.Path, func(r rune) bool {
256 return r == '/'
257 })
258 switch len(f) {
259 case 0:
260 o.DB = 0
261 case 1:
262 if o.DB, err = strconv.Atoi(f[0]); err != nil {
263 return nil, fmt.Errorf("redis: invalid database number: %q", f[0])
264 }
265 default:
266 return nil, fmt.Errorf("redis: invalid URL path: %s", u.Path)
267 }
268
269 if u.Scheme == "rediss" {
270 o.TLSConfig = &tls.Config{ServerName: h}
271 }
272
273 return setupConnParams(u, o)
274}
275
276func setupUnixConn(u *url.URL) (*Options, error) {
277 o := &Options{
278 Network: "unix",
279 }
280
281 if strings.TrimSpace(u.Path) == "" { // path is required with unix connection
282 return nil, errors.New("redis: empty unix socket path")
283 }
284 o.Addr = u.Path
285 o.Username, o.Password = getUserPassword(u)
286 return setupConnParams(u, o)
287}
288
289type queryOptions struct {
290 q url.Values
291 err error
292}
293
294func (o *queryOptions) string(name string) string {
295 vs := o.q[name]
296 if len(vs) == 0 {
297 return ""
298 }
299 delete(o.q, name) // enable detection of unknown parameters
300 return vs[len(vs)-1]
301}
302
303func (o *queryOptions) int(name string) int {
304 s := o.string(name)
305 if s == "" {
306 return 0
307 }
308 i, err := strconv.Atoi(s)
309 if err == nil {
310 return i
311 }
312 if o.err == nil {
313 o.err = fmt.Errorf("redis: invalid %s number: %s", name, err)
314 }
315 return 0
316}
317
318func (o *queryOptions) duration(name string) time.Duration {
319 s := o.string(name)
320 if s == "" {
321 return 0
322 }
323 // try plain number first
324 if i, err := strconv.Atoi(s); err == nil {
325 if i <= 0 {
326 // disable timeouts
327 return -1
328 }
329 return time.Duration(i) * time.Second
330 }
331 dur, err := time.ParseDuration(s)
332 if err == nil {
333 return dur
334 }
335 if o.err == nil {
336 o.err = fmt.Errorf("redis: invalid %s duration: %w", name, err)
337 }
338 return 0
339}
340
341func (o *queryOptions) bool(name string) bool {
342 switch s := o.string(name); s {
343 case "true", "1":
344 return true
345 case "false", "0", "":
346 return false
347 default:
348 if o.err == nil {
349 o.err = fmt.Errorf("redis: invalid %s boolean: expected true/false/1/0 or an empty string, got %q", name, s)
350 }
351 return false
352 }
353}
354
355func (o *queryOptions) remaining() []string {
356 if len(o.q) == 0 {
357 return nil
358 }
359 keys := make([]string, 0, len(o.q))
360 for k := range o.q {
361 keys = append(keys, k)
362 }
363 sort.Strings(keys)
364 return keys
365}
366
367// setupConnParams converts query parameters in u to option value in o.
368func setupConnParams(u *url.URL, o *Options) (*Options, error) {
369 q := queryOptions{q: u.Query()}
370
371 // compat: a future major release may use q.int("db")
372 if tmp := q.string("db"); tmp != "" {
373 db, err := strconv.Atoi(tmp)
374 if err != nil {
375 return nil, fmt.Errorf("redis: invalid database number: %w", err)
376 }
377 o.DB = db
378 }
379
380 o.MaxRetries = q.int("max_retries")
381 o.MinRetryBackoff = q.duration("min_retry_backoff")
382 o.MaxRetryBackoff = q.duration("max_retry_backoff")
383 o.DialTimeout = q.duration("dial_timeout")
384 o.ReadTimeout = q.duration("read_timeout")
385 o.WriteTimeout = q.duration("write_timeout")
386 o.PoolFIFO = q.bool("pool_fifo")
387 o.PoolSize = q.int("pool_size")
388 o.MinIdleConns = q.int("min_idle_conns")
389 o.MaxConnAge = q.duration("max_conn_age")
390 o.PoolTimeout = q.duration("pool_timeout")
391 o.IdleTimeout = q.duration("idle_timeout")
392 o.IdleCheckFrequency = q.duration("idle_check_frequency")
393 if q.err != nil {
394 return nil, q.err
395 }
396
397 // any parameters left?
398 if r := q.remaining(); len(r) > 0 {
399 return nil, fmt.Errorf("redis: unexpected option: %s", strings.Join(r, ", "))
400 }
401
402 return o, nil
403}
404
405func getUserPassword(u *url.URL) (string, string) {
406 var user, password string
407 if u.User != nil {
408 user = u.User.Username()
409 if p, ok := u.User.Password(); ok {
410 password = p
411 }
412 }
413 return user, password
414}
415
416func newConnPool(opt *Options) *pool.ConnPool {
417 return pool.NewConnPool(&pool.Options{
418 Dialer: func(ctx context.Context) (net.Conn, error) {
419 return opt.Dialer(ctx, opt.Network, opt.Addr)
420 },
421 PoolFIFO: opt.PoolFIFO,
422 PoolSize: opt.PoolSize,
423 MinIdleConns: opt.MinIdleConns,
424 MaxConnAge: opt.MaxConnAge,
425 PoolTimeout: opt.PoolTimeout,
426 IdleTimeout: opt.IdleTimeout,
427 IdleCheckFrequency: opt.IdleCheckFrequency,
428 })
429}