blob: 2a5399ec4a0453f64bbddb359de0ab6f5f3c6f03 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package http2
6
7import (
8 "errors"
9 "io"
10 "sync"
11)
12
13// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
14// io.Pipe except there are no PipeReader/PipeWriter halves, and the
15// underlying buffer is an interface. (io.Pipe is always unbuffered)
16type pipe struct {
17 mu sync.Mutex
18 c sync.Cond // c.L lazily initialized to &p.mu
19 b pipeBuffer // nil when done reading
khenaidoo26721882021-08-11 17:42:52 -040020 unread int // bytes unread when done
Scott Baker2c1c4822019-10-16 11:02:41 -070021 err error // read error once empty. non-nil means closed.
22 breakErr error // immediate read error (caller doesn't see rest of b)
23 donec chan struct{} // closed on error
24 readFn func() // optional code to run in Read before error
25}
26
27type pipeBuffer interface {
28 Len() int
29 io.Writer
30 io.Reader
31}
32
33func (p *pipe) Len() int {
34 p.mu.Lock()
35 defer p.mu.Unlock()
36 if p.b == nil {
khenaidoo26721882021-08-11 17:42:52 -040037 return p.unread
Scott Baker2c1c4822019-10-16 11:02:41 -070038 }
39 return p.b.Len()
40}
41
42// Read waits until data is available and copies bytes
43// from the buffer into p.
44func (p *pipe) Read(d []byte) (n int, err error) {
45 p.mu.Lock()
46 defer p.mu.Unlock()
47 if p.c.L == nil {
48 p.c.L = &p.mu
49 }
50 for {
51 if p.breakErr != nil {
52 return 0, p.breakErr
53 }
54 if p.b != nil && p.b.Len() > 0 {
55 return p.b.Read(d)
56 }
57 if p.err != nil {
58 if p.readFn != nil {
59 p.readFn() // e.g. copy trailers
60 p.readFn = nil // not sticky like p.err
61 }
62 p.b = nil
63 return 0, p.err
64 }
65 p.c.Wait()
66 }
67}
68
69var errClosedPipeWrite = errors.New("write on closed buffer")
70
71// Write copies bytes from p into the buffer and wakes a reader.
72// It is an error to write more data than the buffer can hold.
73func (p *pipe) Write(d []byte) (n int, err error) {
74 p.mu.Lock()
75 defer p.mu.Unlock()
76 if p.c.L == nil {
77 p.c.L = &p.mu
78 }
79 defer p.c.Signal()
80 if p.err != nil {
81 return 0, errClosedPipeWrite
82 }
83 if p.breakErr != nil {
khenaidoo26721882021-08-11 17:42:52 -040084 p.unread += len(d)
Scott Baker2c1c4822019-10-16 11:02:41 -070085 return len(d), nil // discard when there is no reader
86 }
87 return p.b.Write(d)
88}
89
90// CloseWithError causes the next Read (waking up a current blocked
91// Read if needed) to return the provided err after all data has been
92// read.
93//
94// The error must be non-nil.
95func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
96
97// BreakWithError causes the next Read (waking up a current blocked
98// Read if needed) to return the provided err immediately, without
99// waiting for unread data.
100func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
101
102// closeWithErrorAndCode is like CloseWithError but also sets some code to run
103// in the caller's goroutine before returning the error.
104func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
105
106func (p *pipe) closeWithError(dst *error, err error, fn func()) {
107 if err == nil {
108 panic("err must be non-nil")
109 }
110 p.mu.Lock()
111 defer p.mu.Unlock()
112 if p.c.L == nil {
113 p.c.L = &p.mu
114 }
115 defer p.c.Signal()
116 if *dst != nil {
117 // Already been done.
118 return
119 }
120 p.readFn = fn
121 if dst == &p.breakErr {
khenaidoo26721882021-08-11 17:42:52 -0400122 if p.b != nil {
123 p.unread += p.b.Len()
124 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700125 p.b = nil
126 }
127 *dst = err
128 p.closeDoneLocked()
129}
130
131// requires p.mu be held.
132func (p *pipe) closeDoneLocked() {
133 if p.donec == nil {
134 return
135 }
136 // Close if unclosed. This isn't racy since we always
137 // hold p.mu while closing.
138 select {
139 case <-p.donec:
140 default:
141 close(p.donec)
142 }
143}
144
145// Err returns the error (if any) first set by BreakWithError or CloseWithError.
146func (p *pipe) Err() error {
147 p.mu.Lock()
148 defer p.mu.Unlock()
149 if p.breakErr != nil {
150 return p.breakErr
151 }
152 return p.err
153}
154
155// Done returns a channel which is closed if and when this pipe is closed
156// with CloseWithError.
157func (p *pipe) Done() <-chan struct{} {
158 p.mu.Lock()
159 defer p.mu.Unlock()
160 if p.donec == nil {
161 p.donec = make(chan struct{})
162 if p.err != nil || p.breakErr != nil {
163 // Already hit an error.
164 p.closeDoneLocked()
165 }
166 }
167 return p.donec
168}