blob: a6140099cb37c0a8a0cef7bddf5aea7dce7d6c87 [file] [log] [blame]
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
5package http2
6
7import (
8 "errors"
9 "io"
10 "sync"
11)
12
13// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
14// io.Pipe except there are no PipeReader/PipeWriter halves, and the
15// underlying buffer is an interface. (io.Pipe is always unbuffered)
16type pipe struct {
17 mu sync.Mutex
18 c sync.Cond // c.L lazily initialized to &p.mu
19 b pipeBuffer // nil when done reading
20 err error // read error once empty. non-nil means closed.
21 breakErr error // immediate read error (caller doesn't see rest of b)
22 donec chan struct{} // closed on error
23 readFn func() // optional code to run in Read before error
24}
25
// pipeBuffer is the storage a pipe reads from and writes to: a byte
// stream that can also report how many bytes it currently holds.
type pipeBuffer interface {
	io.ReadWriter

	// Len returns the number of bytes held by the buffer.
	Len() int
}
31
32func (p *pipe) Len() int {
33 p.mu.Lock()
34 defer p.mu.Unlock()
35 if p.b == nil {
36 return 0
37 }
38 return p.b.Len()
39}
40
41// Read waits until data is available and copies bytes
42// from the buffer into p.
43func (p *pipe) Read(d []byte) (n int, err error) {
44 p.mu.Lock()
45 defer p.mu.Unlock()
46 if p.c.L == nil {
47 p.c.L = &p.mu
48 }
49 for {
50 if p.breakErr != nil {
51 return 0, p.breakErr
52 }
53 if p.b != nil && p.b.Len() > 0 {
54 return p.b.Read(d)
55 }
56 if p.err != nil {
57 if p.readFn != nil {
58 p.readFn() // e.g. copy trailers
59 p.readFn = nil // not sticky like p.err
60 }
61 p.b = nil
62 return 0, p.err
63 }
64 p.c.Wait()
65 }
66}
67
// errClosedPipeWrite is the error Write returns once the pipe's close
// error (p.err) has been set.
var (
	errClosedPipeWrite = errors.New("write on closed buffer")
)
69
70// Write copies bytes from p into the buffer and wakes a reader.
71// It is an error to write more data than the buffer can hold.
72func (p *pipe) Write(d []byte) (n int, err error) {
73 p.mu.Lock()
74 defer p.mu.Unlock()
75 if p.c.L == nil {
76 p.c.L = &p.mu
77 }
78 defer p.c.Signal()
79 if p.err != nil {
80 return 0, errClosedPipeWrite
81 }
82 if p.breakErr != nil {
83 return len(d), nil // discard when there is no reader
84 }
85 return p.b.Write(d)
86}
87
88// CloseWithError causes the next Read (waking up a current blocked
89// Read if needed) to return the provided err after all data has been
90// read.
91//
92// The error must be non-nil.
93func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
94
95// BreakWithError causes the next Read (waking up a current blocked
96// Read if needed) to return the provided err immediately, without
97// waiting for unread data.
98func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
99
100// closeWithErrorAndCode is like CloseWithError but also sets some code to run
101// in the caller's goroutine before returning the error.
102func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
103
104func (p *pipe) closeWithError(dst *error, err error, fn func()) {
105 if err == nil {
106 panic("err must be non-nil")
107 }
108 p.mu.Lock()
109 defer p.mu.Unlock()
110 if p.c.L == nil {
111 p.c.L = &p.mu
112 }
113 defer p.c.Signal()
114 if *dst != nil {
115 // Already been done.
116 return
117 }
118 p.readFn = fn
119 if dst == &p.breakErr {
120 p.b = nil
121 }
122 *dst = err
123 p.closeDoneLocked()
124}
125
126// requires p.mu be held.
127func (p *pipe) closeDoneLocked() {
128 if p.donec == nil {
129 return
130 }
131 // Close if unclosed. This isn't racy since we always
132 // hold p.mu while closing.
133 select {
134 case <-p.donec:
135 default:
136 close(p.donec)
137 }
138}
139
140// Err returns the error (if any) first set by BreakWithError or CloseWithError.
141func (p *pipe) Err() error {
142 p.mu.Lock()
143 defer p.mu.Unlock()
144 if p.breakErr != nil {
145 return p.breakErr
146 }
147 return p.err
148}
149
150// Done returns a channel which is closed if and when this pipe is closed
151// with CloseWithError.
152func (p *pipe) Done() <-chan struct{} {
153 p.mu.Lock()
154 defer p.mu.Unlock()
155 if p.donec == nil {
156 p.donec = make(chan struct{})
157 if p.err != nil || p.breakErr != nil {
158 // Already hit an error.
159 p.closeDoneLocked()
160 }
161 }
162 return p.donec
163}