blob: 684d984fd96affc31c04a21493204e52c1b0f583 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package http2
6
7import (
8 "errors"
9 "io"
10 "sync"
11)
12
13// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
14// io.Pipe except there are no PipeReader/PipeWriter halves, and the
15// underlying buffer is an interface. (io.Pipe is always unbuffered)
16type pipe struct {
17 mu sync.Mutex
18 c sync.Cond // c.L lazily initialized to &p.mu
19 b pipeBuffer // nil when done reading
khenaidoo106c61a2021-08-11 18:05:46 -040020 unread int // bytes unread when done
William Kurkianea869482019-04-09 15:16:11 -040021 err error // read error once empty. non-nil means closed.
22 breakErr error // immediate read error (caller doesn't see rest of b)
23 donec chan struct{} // closed on error
24 readFn func() // optional code to run in Read before error
25}
26
27type pipeBuffer interface {
28 Len() int
29 io.Writer
30 io.Reader
31}
32
Akash Reddy Kankanalac6b6ca12025-06-12 14:26:57 +053033// setBuffer initializes the pipe buffer.
34// It has no effect if the pipe is already closed.
35func (p *pipe) setBuffer(b pipeBuffer) {
36 p.mu.Lock()
37 defer p.mu.Unlock()
38 if p.err != nil || p.breakErr != nil {
39 return
40 }
41 p.b = b
42}
43
William Kurkianea869482019-04-09 15:16:11 -040044func (p *pipe) Len() int {
45 p.mu.Lock()
46 defer p.mu.Unlock()
47 if p.b == nil {
khenaidoo106c61a2021-08-11 18:05:46 -040048 return p.unread
William Kurkianea869482019-04-09 15:16:11 -040049 }
50 return p.b.Len()
51}
52
53// Read waits until data is available and copies bytes
54// from the buffer into p.
55func (p *pipe) Read(d []byte) (n int, err error) {
56 p.mu.Lock()
57 defer p.mu.Unlock()
58 if p.c.L == nil {
59 p.c.L = &p.mu
60 }
61 for {
62 if p.breakErr != nil {
63 return 0, p.breakErr
64 }
65 if p.b != nil && p.b.Len() > 0 {
66 return p.b.Read(d)
67 }
68 if p.err != nil {
69 if p.readFn != nil {
70 p.readFn() // e.g. copy trailers
71 p.readFn = nil // not sticky like p.err
72 }
73 p.b = nil
74 return 0, p.err
75 }
76 p.c.Wait()
77 }
78}
79
80var errClosedPipeWrite = errors.New("write on closed buffer")
81
82// Write copies bytes from p into the buffer and wakes a reader.
83// It is an error to write more data than the buffer can hold.
84func (p *pipe) Write(d []byte) (n int, err error) {
85 p.mu.Lock()
86 defer p.mu.Unlock()
87 if p.c.L == nil {
88 p.c.L = &p.mu
89 }
90 defer p.c.Signal()
Akash Reddy Kankanalac6b6ca12025-06-12 14:26:57 +053091 if p.err != nil || p.breakErr != nil {
William Kurkianea869482019-04-09 15:16:11 -040092 return 0, errClosedPipeWrite
93 }
William Kurkianea869482019-04-09 15:16:11 -040094 return p.b.Write(d)
95}
96
97// CloseWithError causes the next Read (waking up a current blocked
98// Read if needed) to return the provided err after all data has been
99// read.
100//
101// The error must be non-nil.
102func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
103
104// BreakWithError causes the next Read (waking up a current blocked
105// Read if needed) to return the provided err immediately, without
106// waiting for unread data.
107func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
108
109// closeWithErrorAndCode is like CloseWithError but also sets some code to run
110// in the caller's goroutine before returning the error.
111func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
112
113func (p *pipe) closeWithError(dst *error, err error, fn func()) {
114 if err == nil {
115 panic("err must be non-nil")
116 }
117 p.mu.Lock()
118 defer p.mu.Unlock()
119 if p.c.L == nil {
120 p.c.L = &p.mu
121 }
122 defer p.c.Signal()
123 if *dst != nil {
124 // Already been done.
125 return
126 }
127 p.readFn = fn
128 if dst == &p.breakErr {
khenaidoo106c61a2021-08-11 18:05:46 -0400129 if p.b != nil {
130 p.unread += p.b.Len()
131 }
William Kurkianea869482019-04-09 15:16:11 -0400132 p.b = nil
133 }
134 *dst = err
135 p.closeDoneLocked()
136}
137
138// requires p.mu be held.
139func (p *pipe) closeDoneLocked() {
140 if p.donec == nil {
141 return
142 }
143 // Close if unclosed. This isn't racy since we always
144 // hold p.mu while closing.
145 select {
146 case <-p.donec:
147 default:
148 close(p.donec)
149 }
150}
151
152// Err returns the error (if any) first set by BreakWithError or CloseWithError.
153func (p *pipe) Err() error {
154 p.mu.Lock()
155 defer p.mu.Unlock()
156 if p.breakErr != nil {
157 return p.breakErr
158 }
159 return p.err
160}
161
162// Done returns a channel which is closed if and when this pipe is closed
163// with CloseWithError.
164func (p *pipe) Done() <-chan struct{} {
165 p.mu.Lock()
166 defer p.mu.Unlock()
167 if p.donec == nil {
168 p.donec = make(chan struct{})
169 if p.err != nil || p.breakErr != nil {
170 // Already hit an error.
171 p.closeDoneLocked()
172 }
173 }
174 return p.donec
175}