| package redis |
| |
| import ( |
| "context" |
| |
| "github.com/go-redis/redis/v8/internal/pool" |
| "github.com/go-redis/redis/v8/internal/proto" |
| ) |
| |
| // TxFailedErr transaction redis failed. |
| const TxFailedErr = proto.RedisError("redis: transaction failed") |
| |
| // Tx implements Redis transactions as described in |
| // http://redis.io/topics/transactions. It's NOT safe for concurrent use |
| // by multiple goroutines, because Exec resets list of watched keys. |
| // If you don't need WATCH it is better to use Pipeline. |
| type Tx struct { |
| baseClient |
| cmdable |
| statefulCmdable |
| hooks |
| ctx context.Context |
| } |
| |
| func (c *Client) newTx(ctx context.Context) *Tx { |
| tx := Tx{ |
| baseClient: baseClient{ |
| opt: c.opt, |
| connPool: pool.NewStickyConnPool(c.connPool), |
| }, |
| hooks: c.hooks.clone(), |
| ctx: ctx, |
| } |
| tx.init() |
| return &tx |
| } |
| |
| func (c *Tx) init() { |
| c.cmdable = c.Process |
| c.statefulCmdable = c.Process |
| } |
| |
| func (c *Tx) Context() context.Context { |
| return c.ctx |
| } |
| |
| func (c *Tx) WithContext(ctx context.Context) *Tx { |
| if ctx == nil { |
| panic("nil context") |
| } |
| clone := *c |
| clone.init() |
| clone.hooks.lock() |
| clone.ctx = ctx |
| return &clone |
| } |
| |
| func (c *Tx) Process(ctx context.Context, cmd Cmder) error { |
| return c.hooks.process(ctx, cmd, c.baseClient.process) |
| } |
| |
| // Watch prepares a transaction and marks the keys to be watched |
| // for conditional execution if there are any keys. |
| // |
| // The transaction is automatically closed when fn exits. |
| func (c *Client) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error { |
| tx := c.newTx(ctx) |
| if len(keys) > 0 { |
| if err := tx.Watch(ctx, keys...).Err(); err != nil { |
| _ = tx.Close(ctx) |
| return err |
| } |
| } |
| |
| err := fn(tx) |
| _ = tx.Close(ctx) |
| return err |
| } |
| |
| // Close closes the transaction, releasing any open resources. |
| func (c *Tx) Close(ctx context.Context) error { |
| _ = c.Unwatch(ctx).Err() |
| return c.baseClient.Close() |
| } |
| |
| // Watch marks the keys to be watched for conditional execution |
| // of a transaction. |
| func (c *Tx) Watch(ctx context.Context, keys ...string) *StatusCmd { |
| args := make([]interface{}, 1+len(keys)) |
| args[0] = "watch" |
| for i, key := range keys { |
| args[1+i] = key |
| } |
| cmd := NewStatusCmd(ctx, args...) |
| _ = c.Process(ctx, cmd) |
| return cmd |
| } |
| |
| // Unwatch flushes all the previously watched keys for a transaction. |
| func (c *Tx) Unwatch(ctx context.Context, keys ...string) *StatusCmd { |
| args := make([]interface{}, 1+len(keys)) |
| args[0] = "unwatch" |
| for i, key := range keys { |
| args[1+i] = key |
| } |
| cmd := NewStatusCmd(ctx, args...) |
| _ = c.Process(ctx, cmd) |
| return cmd |
| } |
| |
| // Pipeline creates a pipeline. Usually it is more convenient to use Pipelined. |
| func (c *Tx) Pipeline() Pipeliner { |
| pipe := Pipeline{ |
| ctx: c.ctx, |
| exec: func(ctx context.Context, cmds []Cmder) error { |
| return c.hooks.processPipeline(ctx, cmds, c.baseClient.processPipeline) |
| }, |
| } |
| pipe.init() |
| return &pipe |
| } |
| |
| // Pipelined executes commands queued in the fn outside of the transaction. |
| // Use TxPipelined if you need transactional behavior. |
| func (c *Tx) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { |
| return c.Pipeline().Pipelined(ctx, fn) |
| } |
| |
| // TxPipelined executes commands queued in the fn in the transaction. |
| // |
| // When using WATCH, EXEC will execute commands only if the watched keys |
| // were not modified, allowing for a check-and-set mechanism. |
| // |
| // Exec always returns list of commands. If transaction fails |
| // TxFailedErr is returned. Otherwise Exec returns an error of the first |
| // failed command or nil. |
| func (c *Tx) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { |
| return c.TxPipeline().Pipelined(ctx, fn) |
| } |
| |
| // TxPipeline creates a pipeline. Usually it is more convenient to use TxPipelined. |
| func (c *Tx) TxPipeline() Pipeliner { |
| pipe := Pipeline{ |
| ctx: c.ctx, |
| exec: func(ctx context.Context, cmds []Cmder) error { |
| return c.hooks.processTxPipeline(ctx, cmds, c.baseClient.processTxPipeline) |
| }, |
| } |
| pipe.init() |
| return &pipe |
| } |