blob: 3b9f06b96244cc9b218e228e41d03bad5f071ddc [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package http2
6
7import (
8 "errors"
9 "io"
10 "sync"
11)
12
13// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
14// io.Pipe except there are no PipeReader/PipeWriter halves, and the
15// underlying buffer is an interface. (io.Pipe is always unbuffered)
16type pipe struct {
17 mu sync.Mutex
18 c sync.Cond // c.L lazily initialized to &p.mu
19 b pipeBuffer // nil when done reading
20 unread int // bytes unread when done
21 err error // read error once empty. non-nil means closed.
22 breakErr error // immediate read error (caller doesn't see rest of b)
23 donec chan struct{} // closed on error
24 readFn func() // optional code to run in Read before error
25}
26
27type pipeBuffer interface {
28 Len() int
29 io.Writer
30 io.Reader
31}
32
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +053033// setBuffer initializes the pipe buffer.
34// It has no effect if the pipe is already closed.
35func (p *pipe) setBuffer(b pipeBuffer) {
36 p.mu.Lock()
37 defer p.mu.Unlock()
38 if p.err != nil || p.breakErr != nil {
39 return
40 }
41 p.b = b
42}
43
Holger Hildebrandtfa074992020-03-27 15:42:06 +000044func (p *pipe) Len() int {
45 p.mu.Lock()
46 defer p.mu.Unlock()
47 if p.b == nil {
48 return p.unread
49 }
50 return p.b.Len()
51}
52
53// Read waits until data is available and copies bytes
54// from the buffer into p.
55func (p *pipe) Read(d []byte) (n int, err error) {
56 p.mu.Lock()
57 defer p.mu.Unlock()
58 if p.c.L == nil {
59 p.c.L = &p.mu
60 }
61 for {
62 if p.breakErr != nil {
63 return 0, p.breakErr
64 }
65 if p.b != nil && p.b.Len() > 0 {
66 return p.b.Read(d)
67 }
68 if p.err != nil {
69 if p.readFn != nil {
70 p.readFn() // e.g. copy trailers
71 p.readFn = nil // not sticky like p.err
72 }
73 p.b = nil
74 return 0, p.err
75 }
76 p.c.Wait()
77 }
78}
79
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +053080var (
81 errClosedPipeWrite = errors.New("write on closed buffer")
82 errUninitializedPipeWrite = errors.New("write on uninitialized buffer")
83)
Holger Hildebrandtfa074992020-03-27 15:42:06 +000084
85// Write copies bytes from p into the buffer and wakes a reader.
86// It is an error to write more data than the buffer can hold.
87func (p *pipe) Write(d []byte) (n int, err error) {
88 p.mu.Lock()
89 defer p.mu.Unlock()
90 if p.c.L == nil {
91 p.c.L = &p.mu
92 }
93 defer p.c.Signal()
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +053094 if p.err != nil || p.breakErr != nil {
Holger Hildebrandtfa074992020-03-27 15:42:06 +000095 return 0, errClosedPipeWrite
96 }
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +053097 // pipe.setBuffer is never invoked, leaving the buffer uninitialized.
98 // We shouldn't try to write to an uninitialized pipe,
99 // but returning an error is better than panicking.
100 if p.b == nil {
101 return 0, errUninitializedPipeWrite
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000102 }
103 return p.b.Write(d)
104}
105
106// CloseWithError causes the next Read (waking up a current blocked
107// Read if needed) to return the provided err after all data has been
108// read.
109//
110// The error must be non-nil.
111func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
112
113// BreakWithError causes the next Read (waking up a current blocked
114// Read if needed) to return the provided err immediately, without
115// waiting for unread data.
116func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
117
118// closeWithErrorAndCode is like CloseWithError but also sets some code to run
119// in the caller's goroutine before returning the error.
120func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
121
122func (p *pipe) closeWithError(dst *error, err error, fn func()) {
123 if err == nil {
124 panic("err must be non-nil")
125 }
126 p.mu.Lock()
127 defer p.mu.Unlock()
128 if p.c.L == nil {
129 p.c.L = &p.mu
130 }
131 defer p.c.Signal()
132 if *dst != nil {
133 // Already been done.
134 return
135 }
136 p.readFn = fn
137 if dst == &p.breakErr {
138 if p.b != nil {
139 p.unread += p.b.Len()
140 }
141 p.b = nil
142 }
143 *dst = err
144 p.closeDoneLocked()
145}
146
147// requires p.mu be held.
148func (p *pipe) closeDoneLocked() {
149 if p.donec == nil {
150 return
151 }
152 // Close if unclosed. This isn't racy since we always
153 // hold p.mu while closing.
154 select {
155 case <-p.donec:
156 default:
157 close(p.donec)
158 }
159}
160
161// Err returns the error (if any) first set by BreakWithError or CloseWithError.
162func (p *pipe) Err() error {
163 p.mu.Lock()
164 defer p.mu.Unlock()
165 if p.breakErr != nil {
166 return p.breakErr
167 }
168 return p.err
169}
170
171// Done returns a channel which is closed if and when this pipe is closed
172// with CloseWithError.
173func (p *pipe) Done() <-chan struct{} {
174 p.mu.Lock()
175 defer p.mu.Unlock()
176 if p.donec == nil {
177 p.donec = make(chan struct{})
178 if p.err != nil || p.breakErr != nil {
179 // Already hit an error.
180 p.closeDoneLocked()
181 }
182 }
183 return p.donec
184}