blob: 5661659865123d9cc64cdb1b72c9835181478a5d [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301package pool
2
3import (
4 "bufio"
5 "context"
6 "net"
7 "sync/atomic"
8 "time"
9
10 "github.com/go-redis/redis/v8/internal/proto"
11)
12
13var noDeadline = time.Time{}
14
15type Conn struct {
16 usedAt int64 // atomic
17 netConn net.Conn
18
19 rd *proto.Reader
20 bw *bufio.Writer
21 wr *proto.Writer
22
23 Inited bool
24 pooled bool
25 createdAt time.Time
26}
27
28func NewConn(netConn net.Conn) *Conn {
29 cn := &Conn{
30 netConn: netConn,
31 createdAt: time.Now(),
32 }
33 cn.rd = proto.NewReader(netConn)
34 cn.bw = bufio.NewWriter(netConn)
35 cn.wr = proto.NewWriter(cn.bw)
36 cn.SetUsedAt(time.Now())
37 return cn
38}
39
40func (cn *Conn) UsedAt() time.Time {
41 unix := atomic.LoadInt64(&cn.usedAt)
42 return time.Unix(unix, 0)
43}
44
45func (cn *Conn) SetUsedAt(tm time.Time) {
46 atomic.StoreInt64(&cn.usedAt, tm.Unix())
47}
48
49func (cn *Conn) SetNetConn(netConn net.Conn) {
50 cn.netConn = netConn
51 cn.rd.Reset(netConn)
52 cn.bw.Reset(netConn)
53}
54
55func (cn *Conn) Write(b []byte) (int, error) {
56 return cn.netConn.Write(b)
57}
58
59func (cn *Conn) RemoteAddr() net.Addr {
60 if cn.netConn != nil {
61 return cn.netConn.RemoteAddr()
62 }
63 return nil
64}
65
66func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
67 if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
68 return err
69 }
70 return fn(cn.rd)
71}
72
73func (cn *Conn) WithWriter(
74 ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error,
75) error {
76 if err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout)); err != nil {
77 return err
78 }
79
80 if cn.bw.Buffered() > 0 {
81 cn.bw.Reset(cn.netConn)
82 }
83
84 if err := fn(cn.wr); err != nil {
85 return err
86 }
87
88 return cn.bw.Flush()
89}
90
91func (cn *Conn) Close() error {
92 return cn.netConn.Close()
93}
94
95func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
96 tm := time.Now()
97 cn.SetUsedAt(tm)
98
99 if timeout > 0 {
100 tm = tm.Add(timeout)
101 }
102
103 if ctx != nil {
104 deadline, ok := ctx.Deadline()
105 if ok {
106 if timeout == 0 {
107 return deadline
108 }
109 if deadline.Before(tm) {
110 return deadline
111 }
112 return tm
113 }
114 }
115
116 if timeout > 0 {
117 return tm
118 }
119
120 return noDeadline
121}