blob: 31bab971e63fd2f15db681347574053262303b37 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301package redis
2
3import (
4 "context"
5 "sync"
6
7 "github.com/go-redis/redis/v8/internal/pool"
8)
9
// pipelineExecer is the signature of the function stored in Pipeline.exec.
// Exec calls it with the batch of queued commands and returns its error to
// the caller.
type pipelineExecer func(context.Context, []Cmder) error
11
// Pipeliner is a mechanism to realise the Redis pipelining technique.
//
// Pipelining is a technique to greatly speed up processing by packing
// operations into batches, sending them to Redis at once and reading the
// replies in a single step.
// See https://redis.io/topics/pipelining
//
// Pay attention that a pipeline is not a transaction, so you can get
// unexpected results in case of big pipelines and small read/write timeouts.
// The Redis client has retransmission logic in case of timeouts, so a
// pipeline can be retransmitted and commands can be executed more than once.
// To avoid this, it is a good idea to use reasonably bigger read/write
// timeouts depending on your batch size and/or use TxPipeline.
type Pipeliner interface {
	StatefulCmdable
	// Len returns the number of queued commands.
	Len() int
	// Do queues a custom command for later execution and returns it.
	Do(ctx context.Context, args ...interface{}) *Cmd
	// Process queues cmd for later execution.
	Process(ctx context.Context, cmd Cmder) error
	// Close closes the pipeline.
	Close() error
	// Discard drops the queued commands without executing them.
	Discard() error
	// Exec executes the queued commands and returns them together with an
	// error, if any.
	Exec(ctx context.Context) ([]Cmder, error)
}
34
// Compile-time assertion that *Pipeline implements Pipeliner.
var _ Pipeliner = (*Pipeline)(nil)
36
// Pipeline implements pipelining as described in
// http://redis.io/topics/pipelining. It's safe for concurrent use
// by multiple goroutines.
//
// Commands are accumulated in cmds by Process and handed to exec in one
// batch by Exec.
type Pipeline struct {
	// cmdable and statefulCmdable are both bound to Process by init.
	cmdable
	statefulCmdable

	// NOTE(review): ctx is not read by any method visible in this file section.
	ctx  context.Context
	exec pipelineExecer // runs a batch of queued commands; invoked by Exec

	mu     sync.Mutex // guards cmds and closed
	cmds   []Cmder    // commands queued since the last Exec or Discard
	closed bool       // set by Close; Exec and Discard then return pool.ErrClosed
}
51
52func (c *Pipeline) init() {
53 c.cmdable = c.Process
54 c.statefulCmdable = c.Process
55}
56
57// Len returns the number of queued commands.
58func (c *Pipeline) Len() int {
59 c.mu.Lock()
60 ln := len(c.cmds)
61 c.mu.Unlock()
62 return ln
63}
64
65// Do queues the custom command for later execution.
66func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd {
67 cmd := NewCmd(ctx, args...)
68 _ = c.Process(ctx, cmd)
69 return cmd
70}
71
72// Process queues the cmd for later execution.
73func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error {
74 c.mu.Lock()
75 c.cmds = append(c.cmds, cmd)
76 c.mu.Unlock()
77 return nil
78}
79
80// Close closes the pipeline, releasing any open resources.
81func (c *Pipeline) Close() error {
82 c.mu.Lock()
83 _ = c.discard()
84 c.closed = true
85 c.mu.Unlock()
86 return nil
87}
88
89// Discard resets the pipeline and discards queued commands.
90func (c *Pipeline) Discard() error {
91 c.mu.Lock()
92 err := c.discard()
93 c.mu.Unlock()
94 return err
95}
96
97func (c *Pipeline) discard() error {
98 if c.closed {
99 return pool.ErrClosed
100 }
101 c.cmds = c.cmds[:0]
102 return nil
103}
104
105// Exec executes all previously queued commands using one
106// client-server roundtrip.
107//
108// Exec always returns list of commands and error of the first failed
109// command if any.
110func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) {
111 c.mu.Lock()
112 defer c.mu.Unlock()
113
114 if c.closed {
115 return nil, pool.ErrClosed
116 }
117
118 if len(c.cmds) == 0 {
119 return nil, nil
120 }
121
122 cmds := c.cmds
123 c.cmds = nil
124
125 return cmds, c.exec(ctx, cmds)
126}
127
128func (c *Pipeline) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
129 if err := fn(c); err != nil {
130 return nil, err
131 }
132 cmds, err := c.Exec(ctx)
133 _ = c.Close()
134 return cmds, err
135}
136
// Pipeline returns the receiver itself; no new pipeline is created.
func (c *Pipeline) Pipeline() Pipeliner {
	return c
}
140
141func (c *Pipeline) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
142 return c.Pipelined(ctx, fn)
143}
144
// TxPipeline returns the receiver itself, the same value Pipeline returns.
func (c *Pipeline) TxPipeline() Pipeliner {
	return c
}