blob: 8c9d87202aed7d2d7b2aa2efa0273c6f07258948 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301package redis
2
3import (
4 "context"
5
6 "github.com/go-redis/redis/v8/internal/pool"
7 "github.com/go-redis/redis/v8/internal/proto"
8)
9
10// TxFailedErr transaction redis failed.
11const TxFailedErr = proto.RedisError("redis: transaction failed")
12
13// Tx implements Redis transactions as described in
14// http://redis.io/topics/transactions. It's NOT safe for concurrent use
15// by multiple goroutines, because Exec resets list of watched keys.
16//
17// If you don't need WATCH, use Pipeline instead.
18type Tx struct {
19 baseClient
20 cmdable
21 statefulCmdable
22 hooks
23 ctx context.Context
24}
25
26func (c *Client) newTx(ctx context.Context) *Tx {
27 tx := Tx{
28 baseClient: baseClient{
29 opt: c.opt,
30 connPool: pool.NewStickyConnPool(c.connPool),
31 },
32 hooks: c.hooks.clone(),
33 ctx: ctx,
34 }
35 tx.init()
36 return &tx
37}
38
39func (c *Tx) init() {
40 c.cmdable = c.Process
41 c.statefulCmdable = c.Process
42}
43
44func (c *Tx) Context() context.Context {
45 return c.ctx
46}
47
48func (c *Tx) WithContext(ctx context.Context) *Tx {
49 if ctx == nil {
50 panic("nil context")
51 }
52 clone := *c
53 clone.init()
54 clone.hooks.lock()
55 clone.ctx = ctx
56 return &clone
57}
58
59func (c *Tx) Process(ctx context.Context, cmd Cmder) error {
60 return c.hooks.process(ctx, cmd, c.baseClient.process)
61}
62
63// Watch prepares a transaction and marks the keys to be watched
64// for conditional execution if there are any keys.
65//
66// The transaction is automatically closed when fn exits.
67func (c *Client) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
68 tx := c.newTx(ctx)
69 defer tx.Close(ctx)
70 if len(keys) > 0 {
71 if err := tx.Watch(ctx, keys...).Err(); err != nil {
72 return err
73 }
74 }
75 return fn(tx)
76}
77
78// Close closes the transaction, releasing any open resources.
79func (c *Tx) Close(ctx context.Context) error {
80 _ = c.Unwatch(ctx).Err()
81 return c.baseClient.Close()
82}
83
84// Watch marks the keys to be watched for conditional execution
85// of a transaction.
86func (c *Tx) Watch(ctx context.Context, keys ...string) *StatusCmd {
87 args := make([]interface{}, 1+len(keys))
88 args[0] = "watch"
89 for i, key := range keys {
90 args[1+i] = key
91 }
92 cmd := NewStatusCmd(ctx, args...)
93 _ = c.Process(ctx, cmd)
94 return cmd
95}
96
97// Unwatch flushes all the previously watched keys for a transaction.
98func (c *Tx) Unwatch(ctx context.Context, keys ...string) *StatusCmd {
99 args := make([]interface{}, 1+len(keys))
100 args[0] = "unwatch"
101 for i, key := range keys {
102 args[1+i] = key
103 }
104 cmd := NewStatusCmd(ctx, args...)
105 _ = c.Process(ctx, cmd)
106 return cmd
107}
108
109// Pipeline creates a pipeline. Usually it is more convenient to use Pipelined.
110func (c *Tx) Pipeline() Pipeliner {
111 pipe := Pipeline{
112 ctx: c.ctx,
113 exec: func(ctx context.Context, cmds []Cmder) error {
114 return c.hooks.processPipeline(ctx, cmds, c.baseClient.processPipeline)
115 },
116 }
117 pipe.init()
118 return &pipe
119}
120
121// Pipelined executes commands queued in the fn outside of the transaction.
122// Use TxPipelined if you need transactional behavior.
123func (c *Tx) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
124 return c.Pipeline().Pipelined(ctx, fn)
125}
126
127// TxPipelined executes commands queued in the fn in the transaction.
128//
129// When using WATCH, EXEC will execute commands only if the watched keys
130// were not modified, allowing for a check-and-set mechanism.
131//
132// Exec always returns list of commands. If transaction fails
133// TxFailedErr is returned. Otherwise Exec returns an error of the first
134// failed command or nil.
135func (c *Tx) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
136 return c.TxPipeline().Pipelined(ctx, fn)
137}
138
139// TxPipeline creates a pipeline. Usually it is more convenient to use TxPipelined.
140func (c *Tx) TxPipeline() Pipeliner {
141 pipe := Pipeline{
142 ctx: c.ctx,
143 exec: func(ctx context.Context, cmds []Cmder) error {
144 return c.hooks.processTxPipeline(ctx, cmds, c.baseClient.processTxPipeline)
145 },
146 }
147 pipe.init()
148 return &pipe
149}