blob: 08a20717006d97317dfd81b8d5370b314202381b [file] [log] [blame]
Joey Armstrong5f51f2e2023-01-17 17:06:26 -05001package pool
2
3import (
4 "bufio"
5 "context"
6 "net"
7 "sync/atomic"
8 "time"
9
10 "github.com/go-redis/redis/v8/internal"
11 "github.com/go-redis/redis/v8/internal/proto"
12 "go.opentelemetry.io/otel/api/trace"
13)
14
// noDeadline is the zero time.Time; deadline returns it when neither a timeout
// nor a context deadline applies.
var noDeadline time.Time
16
17type Conn struct {
18 usedAt int64 // atomic
19 netConn net.Conn
20
21 rd *proto.Reader
22 bw *bufio.Writer
23 wr *proto.Writer
24
25 Inited bool
26 pooled bool
27 createdAt time.Time
28}
29
30func NewConn(netConn net.Conn) *Conn {
31 cn := &Conn{
32 netConn: netConn,
33 createdAt: time.Now(),
34 }
35 cn.rd = proto.NewReader(netConn)
36 cn.bw = bufio.NewWriter(netConn)
37 cn.wr = proto.NewWriter(cn.bw)
38 cn.SetUsedAt(time.Now())
39 return cn
40}
41
42func (cn *Conn) UsedAt() time.Time {
43 unix := atomic.LoadInt64(&cn.usedAt)
44 return time.Unix(unix, 0)
45}
46
47func (cn *Conn) SetUsedAt(tm time.Time) {
48 atomic.StoreInt64(&cn.usedAt, tm.Unix())
49}
50
51func (cn *Conn) SetNetConn(netConn net.Conn) {
52 cn.netConn = netConn
53 cn.rd.Reset(netConn)
54 cn.bw.Reset(netConn)
55}
56
57func (cn *Conn) Write(b []byte) (int, error) {
58 return cn.netConn.Write(b)
59}
60
61func (cn *Conn) RemoteAddr() net.Addr {
62 if cn.netConn != nil {
63 return cn.netConn.RemoteAddr()
64 }
65 return nil
66}
67
68func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
69 return internal.WithSpan(ctx, "redis.with_reader", func(ctx context.Context, span trace.Span) error {
70 if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
71 return internal.RecordError(ctx, err)
72 }
73 if err := fn(cn.rd); err != nil {
74 return internal.RecordError(ctx, err)
75 }
76 return nil
77 })
78}
79
80func (cn *Conn) WithWriter(
81 ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error,
82) error {
83 return internal.WithSpan(ctx, "redis.with_writer", func(ctx context.Context, span trace.Span) error {
84 if err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout)); err != nil {
85 return internal.RecordError(ctx, err)
86 }
87
88 if cn.bw.Buffered() > 0 {
89 cn.bw.Reset(cn.netConn)
90 }
91
92 if err := fn(cn.wr); err != nil {
93 return internal.RecordError(ctx, err)
94 }
95
96 if err := cn.bw.Flush(); err != nil {
97 return internal.RecordError(ctx, err)
98 }
99
100 internal.WritesCounter.Add(ctx, 1)
101
102 return nil
103 })
104}
105
106func (cn *Conn) Close() error {
107 return cn.netConn.Close()
108}
109
110func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
111 tm := time.Now()
112 cn.SetUsedAt(tm)
113
114 if timeout > 0 {
115 tm = tm.Add(timeout)
116 }
117
118 if ctx != nil {
119 deadline, ok := ctx.Deadline()
120 if ok {
121 if timeout == 0 {
122 return deadline
123 }
124 if deadline.Before(tm) {
125 return deadline
126 }
127 return tm
128 }
129 }
130
131 if timeout > 0 {
132 return tm
133 }
134
135 return noDeadline
136}