blob: c6ec3409980f4524431e23b69511fae723bdf098 [file] [log] [blame]
Joey Armstronge8c091f2023-01-17 16:56:26 -05001package redis
2
3import (
4 "context"
5 "sync"
6
7 "github.com/go-redis/redis/v8/internal/pool"
8)
9
10type pipelineExecer func(context.Context, []Cmder) error
11
12// Pipeliner is an mechanism to realise Redis Pipeline technique.
13//
14// Pipelining is a technique to extremely speed up processing by packing
15// operations to batches, send them at once to Redis and read a replies in a
16// singe step.
17// See https://redis.io/topics/pipelining
18//
19// Pay attention, that Pipeline is not a transaction, so you can get unexpected
20// results in case of big pipelines and small read/write timeouts.
21// Redis client has retransmission logic in case of timeouts, pipeline
22// can be retransmitted and commands can be executed more then once.
23// To avoid this: it is good idea to use reasonable bigger read/write timeouts
24// depends of your batch size and/or use TxPipeline.
25type Pipeliner interface {
26 StatefulCmdable
27 Do(ctx context.Context, args ...interface{}) *Cmd
28 Process(ctx context.Context, cmd Cmder) error
29 Close() error
30 Discard() error
31 Exec(ctx context.Context) ([]Cmder, error)
32}
33
34var _ Pipeliner = (*Pipeline)(nil)
35
36// Pipeline implements pipelining as described in
37// http://redis.io/topics/pipelining. It's safe for concurrent use
38// by multiple goroutines.
39type Pipeline struct {
40 cmdable
41 statefulCmdable
42
43 ctx context.Context
44 exec pipelineExecer
45
46 mu sync.Mutex
47 cmds []Cmder
48 closed bool
49}
50
51func (c *Pipeline) init() {
52 c.cmdable = c.Process
53 c.statefulCmdable = c.Process
54}
55
56func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd {
57 cmd := NewCmd(ctx, args...)
58 _ = c.Process(ctx, cmd)
59 return cmd
60}
61
62// Process queues the cmd for later execution.
63func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error {
64 c.mu.Lock()
65 c.cmds = append(c.cmds, cmd)
66 c.mu.Unlock()
67 return nil
68}
69
70// Close closes the pipeline, releasing any open resources.
71func (c *Pipeline) Close() error {
72 c.mu.Lock()
73 _ = c.discard()
74 c.closed = true
75 c.mu.Unlock()
76 return nil
77}
78
79// Discard resets the pipeline and discards queued commands.
80func (c *Pipeline) Discard() error {
81 c.mu.Lock()
82 err := c.discard()
83 c.mu.Unlock()
84 return err
85}
86
87func (c *Pipeline) discard() error {
88 if c.closed {
89 return pool.ErrClosed
90 }
91 c.cmds = c.cmds[:0]
92 return nil
93}
94
95// Exec executes all previously queued commands using one
96// client-server roundtrip.
97//
98// Exec always returns list of commands and error of the first failed
99// command if any.
100func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) {
101 c.mu.Lock()
102 defer c.mu.Unlock()
103
104 if c.closed {
105 return nil, pool.ErrClosed
106 }
107
108 if len(c.cmds) == 0 {
109 return nil, nil
110 }
111
112 cmds := c.cmds
113 c.cmds = nil
114
115 return cmds, c.exec(ctx, cmds)
116}
117
118func (c *Pipeline) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
119 if err := fn(c); err != nil {
120 return nil, err
121 }
122 cmds, err := c.Exec(ctx)
123 _ = c.Close()
124 return cmds, err
125}
126
127func (c *Pipeline) Pipeline() Pipeliner {
128 return c
129}
130
131func (c *Pipeline) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
132 return c.Pipelined(ctx, fn)
133}
134
135func (c *Pipeline) TxPipeline() Pipeliner {
136 return c
137}