blob: ad825c610caca8f9ac8198570e28a6dcbd80c75a [file] [log] [blame]
Joey Armstronga6af1522023-01-17 16:06:16 -05001package redis
2
3import (
4 "context"
5
6 "github.com/go-redis/redis/v8/internal/pool"
7 "github.com/go-redis/redis/v8/internal/proto"
8)
9
10// TxFailedErr transaction redis failed.
11const TxFailedErr = proto.RedisError("redis: transaction failed")
12
13// Tx implements Redis transactions as described in
14// http://redis.io/topics/transactions. It's NOT safe for concurrent use
15// by multiple goroutines, because Exec resets list of watched keys.
16// If you don't need WATCH it is better to use Pipeline.
17type Tx struct {
18 baseClient
19 cmdable
20 statefulCmdable
21 hooks
22 ctx context.Context
23}
24
25func (c *Client) newTx(ctx context.Context) *Tx {
26 tx := Tx{
27 baseClient: baseClient{
28 opt: c.opt,
29 connPool: pool.NewStickyConnPool(c.connPool),
30 },
31 hooks: c.hooks.clone(),
32 ctx: ctx,
33 }
34 tx.init()
35 return &tx
36}
37
38func (c *Tx) init() {
39 c.cmdable = c.Process
40 c.statefulCmdable = c.Process
41}
42
43func (c *Tx) Context() context.Context {
44 return c.ctx
45}
46
47func (c *Tx) WithContext(ctx context.Context) *Tx {
48 if ctx == nil {
49 panic("nil context")
50 }
51 clone := *c
52 clone.init()
53 clone.hooks.lock()
54 clone.ctx = ctx
55 return &clone
56}
57
58func (c *Tx) Process(ctx context.Context, cmd Cmder) error {
59 return c.hooks.process(ctx, cmd, c.baseClient.process)
60}
61
62// Watch prepares a transaction and marks the keys to be watched
63// for conditional execution if there are any keys.
64//
65// The transaction is automatically closed when fn exits.
66func (c *Client) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
67 tx := c.newTx(ctx)
68 if len(keys) > 0 {
69 if err := tx.Watch(ctx, keys...).Err(); err != nil {
70 _ = tx.Close(ctx)
71 return err
72 }
73 }
74
75 err := fn(tx)
76 _ = tx.Close(ctx)
77 return err
78}
79
80// Close closes the transaction, releasing any open resources.
81func (c *Tx) Close(ctx context.Context) error {
82 _ = c.Unwatch(ctx).Err()
83 return c.baseClient.Close()
84}
85
86// Watch marks the keys to be watched for conditional execution
87// of a transaction.
88func (c *Tx) Watch(ctx context.Context, keys ...string) *StatusCmd {
89 args := make([]interface{}, 1+len(keys))
90 args[0] = "watch"
91 for i, key := range keys {
92 args[1+i] = key
93 }
94 cmd := NewStatusCmd(ctx, args...)
95 _ = c.Process(ctx, cmd)
96 return cmd
97}
98
99// Unwatch flushes all the previously watched keys for a transaction.
100func (c *Tx) Unwatch(ctx context.Context, keys ...string) *StatusCmd {
101 args := make([]interface{}, 1+len(keys))
102 args[0] = "unwatch"
103 for i, key := range keys {
104 args[1+i] = key
105 }
106 cmd := NewStatusCmd(ctx, args...)
107 _ = c.Process(ctx, cmd)
108 return cmd
109}
110
111// Pipeline creates a pipeline. Usually it is more convenient to use Pipelined.
112func (c *Tx) Pipeline() Pipeliner {
113 pipe := Pipeline{
114 ctx: c.ctx,
115 exec: func(ctx context.Context, cmds []Cmder) error {
116 return c.hooks.processPipeline(ctx, cmds, c.baseClient.processPipeline)
117 },
118 }
119 pipe.init()
120 return &pipe
121}
122
123// Pipelined executes commands queued in the fn outside of the transaction.
124// Use TxPipelined if you need transactional behavior.
125func (c *Tx) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
126 return c.Pipeline().Pipelined(ctx, fn)
127}
128
129// TxPipelined executes commands queued in the fn in the transaction.
130//
131// When using WATCH, EXEC will execute commands only if the watched keys
132// were not modified, allowing for a check-and-set mechanism.
133//
134// Exec always returns list of commands. If transaction fails
135// TxFailedErr is returned. Otherwise Exec returns an error of the first
136// failed command or nil.
137func (c *Tx) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
138 return c.TxPipeline().Pipelined(ctx, fn)
139}
140
141// TxPipeline creates a pipeline. Usually it is more convenient to use TxPipelined.
142func (c *Tx) TxPipeline() Pipeliner {
143 pipe := Pipeline{
144 ctx: c.ctx,
145 exec: func(ctx context.Context, cmds []Cmder) error {
146 return c.hooks.processTxPipeline(ctx, cmds, c.baseClient.processTxPipeline)
147 },
148 }
149 pipe.init()
150 return &pipe
151}