blob: c15b8a7719b5c51c545aac668fe97fee0b8bbb2b [file] [log] [blame]
kesavand2cde6582020-06-22 04:56:23 -04001// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package http2
6
7import (
8 "errors"
9 "io"
10 "sync"
11)
12
13// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
14// io.Pipe except there are no PipeReader/PipeWriter halves, and the
15// underlying buffer is an interface. (io.Pipe is always unbuffered)
16type pipe struct {
17 mu sync.Mutex
18 c sync.Cond // c.L lazily initialized to &p.mu
19 b pipeBuffer // nil when done reading
Andrea Campanella764f1ed2022-03-24 11:46:38 +010020 unread int // bytes unread when done
kesavand2cde6582020-06-22 04:56:23 -040021 err error // read error once empty. non-nil means closed.
22 breakErr error // immediate read error (caller doesn't see rest of b)
23 donec chan struct{} // closed on error
24 readFn func() // optional code to run in Read before error
25}
26
27type pipeBuffer interface {
28 Len() int
29 io.Writer
30 io.Reader
31}
32
kesavandc71914f2022-03-25 11:19:03 +053033// setBuffer initializes the pipe buffer.
34// It has no effect if the pipe is already closed.
35func (p *pipe) setBuffer(b pipeBuffer) {
36 p.mu.Lock()
37 defer p.mu.Unlock()
38 if p.err != nil || p.breakErr != nil {
39 return
40 }
41 p.b = b
42}
43
kesavand2cde6582020-06-22 04:56:23 -040044func (p *pipe) Len() int {
45 p.mu.Lock()
46 defer p.mu.Unlock()
47 if p.b == nil {
Andrea Campanella764f1ed2022-03-24 11:46:38 +010048 return p.unread
kesavand2cde6582020-06-22 04:56:23 -040049 }
50 return p.b.Len()
51}
52
53// Read waits until data is available and copies bytes
54// from the buffer into p.
55func (p *pipe) Read(d []byte) (n int, err error) {
56 p.mu.Lock()
57 defer p.mu.Unlock()
58 if p.c.L == nil {
59 p.c.L = &p.mu
60 }
61 for {
62 if p.breakErr != nil {
63 return 0, p.breakErr
64 }
65 if p.b != nil && p.b.Len() > 0 {
66 return p.b.Read(d)
67 }
68 if p.err != nil {
69 if p.readFn != nil {
70 p.readFn() // e.g. copy trailers
71 p.readFn = nil // not sticky like p.err
72 }
73 p.b = nil
74 return 0, p.err
75 }
76 p.c.Wait()
77 }
78}
79
80var errClosedPipeWrite = errors.New("write on closed buffer")
81
82// Write copies bytes from p into the buffer and wakes a reader.
83// It is an error to write more data than the buffer can hold.
84func (p *pipe) Write(d []byte) (n int, err error) {
85 p.mu.Lock()
86 defer p.mu.Unlock()
87 if p.c.L == nil {
88 p.c.L = &p.mu
89 }
90 defer p.c.Signal()
91 if p.err != nil {
92 return 0, errClosedPipeWrite
93 }
94 if p.breakErr != nil {
Andrea Campanella764f1ed2022-03-24 11:46:38 +010095 p.unread += len(d)
kesavand2cde6582020-06-22 04:56:23 -040096 return len(d), nil // discard when there is no reader
97 }
98 return p.b.Write(d)
99}
100
101// CloseWithError causes the next Read (waking up a current blocked
102// Read if needed) to return the provided err after all data has been
103// read.
104//
105// The error must be non-nil.
106func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
107
108// BreakWithError causes the next Read (waking up a current blocked
109// Read if needed) to return the provided err immediately, without
110// waiting for unread data.
111func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
112
113// closeWithErrorAndCode is like CloseWithError but also sets some code to run
114// in the caller's goroutine before returning the error.
115func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
116
117func (p *pipe) closeWithError(dst *error, err error, fn func()) {
118 if err == nil {
119 panic("err must be non-nil")
120 }
121 p.mu.Lock()
122 defer p.mu.Unlock()
123 if p.c.L == nil {
124 p.c.L = &p.mu
125 }
126 defer p.c.Signal()
127 if *dst != nil {
128 // Already been done.
129 return
130 }
131 p.readFn = fn
132 if dst == &p.breakErr {
Andrea Campanella764f1ed2022-03-24 11:46:38 +0100133 if p.b != nil {
134 p.unread += p.b.Len()
135 }
kesavand2cde6582020-06-22 04:56:23 -0400136 p.b = nil
137 }
138 *dst = err
139 p.closeDoneLocked()
140}
141
142// requires p.mu be held.
143func (p *pipe) closeDoneLocked() {
144 if p.donec == nil {
145 return
146 }
147 // Close if unclosed. This isn't racy since we always
148 // hold p.mu while closing.
149 select {
150 case <-p.donec:
151 default:
152 close(p.donec)
153 }
154}
155
156// Err returns the error (if any) first set by BreakWithError or CloseWithError.
157func (p *pipe) Err() error {
158 p.mu.Lock()
159 defer p.mu.Unlock()
160 if p.breakErr != nil {
161 return p.breakErr
162 }
163 return p.err
164}
165
166// Done returns a channel which is closed if and when this pipe is closed
167// with CloseWithError.
168func (p *pipe) Done() <-chan struct{} {
169 p.mu.Lock()
170 defer p.mu.Unlock()
171 if p.donec == nil {
172 p.donec = make(chan struct{})
173 if p.err != nil || p.breakErr != nil {
174 // Already hit an error.
175 p.closeDoneLocked()
176 }
177 }
178 return p.donec
179}