blob: f24d2b1e7d4ae49858a37aee8cca957993875ba5 [file] [log] [blame]
Don Newton98fd8812019-09-23 15:15:02 -04001// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package http2
6
7import "fmt"
8
9// WriteScheduler is the interface implemented by HTTP/2 write schedulers.
10// Methods are never called concurrently.
11type WriteScheduler interface {
12 // OpenStream opens a new stream in the write scheduler.
13 // It is illegal to call this with streamID=0 or with a streamID that is
14 // already open -- the call may panic.
15 OpenStream(streamID uint32, options OpenStreamOptions)
16
17 // CloseStream closes a stream in the write scheduler. Any frames queued on
18 // this stream should be discarded. It is illegal to call this on a stream
19 // that is not open -- the call may panic.
20 CloseStream(streamID uint32)
21
22 // AdjustStream adjusts the priority of the given stream. This may be called
23 // on a stream that has not yet been opened or has been closed. Note that
24 // RFC 7540 allows PRIORITY frames to be sent on streams in any state. See:
25 // https://tools.ietf.org/html/rfc7540#section-5.1
26 AdjustStream(streamID uint32, priority PriorityParam)
27
28 // Push queues a frame in the scheduler. In most cases, this will not be
29 // called with wr.StreamID()!=0 unless that stream is currently open. The one
30 // exception is RST_STREAM frames, which may be sent on idle or closed streams.
31 Push(wr FrameWriteRequest)
32
33 // Pop dequeues the next frame to write. Returns false if no frames can
34 // be written. Frames with a given wr.StreamID() are Pop'd in the same
35 // order they are Push'd. No frames should be discarded except by CloseStream.
36 Pop() (wr FrameWriteRequest, ok bool)
37}
38
39// OpenStreamOptions specifies extra options for WriteScheduler.OpenStream.
40type OpenStreamOptions struct {
41 // PusherID is zero if the stream was initiated by the client. Otherwise,
42 // PusherID names the stream that pushed the newly opened stream.
43 PusherID uint32
44}
45
46// FrameWriteRequest is a request to write a frame.
47type FrameWriteRequest struct {
48 // write is the interface value that does the writing, once the
49 // WriteScheduler has selected this frame to write. The write
50 // functions are all defined in write.go.
51 write writeFramer
52
53 // stream is the stream on which this frame will be written.
54 // nil for non-stream frames like PING and SETTINGS.
55 stream *stream
56
57 // done, if non-nil, must be a buffered channel with space for
58 // 1 message and is sent the return value from write (or an
59 // earlier error) when the frame has been written.
60 done chan error
61}
62
63// StreamID returns the id of the stream this frame will be written to.
64// 0 is used for non-stream frames such as PING and SETTINGS.
65func (wr FrameWriteRequest) StreamID() uint32 {
66 if wr.stream == nil {
67 if se, ok := wr.write.(StreamError); ok {
68 // (*serverConn).resetStream doesn't set
69 // stream because it doesn't necessarily have
70 // one. So special case this type of write
71 // message.
72 return se.StreamID
73 }
74 return 0
75 }
76 return wr.stream.id
77}
78
79// isControl reports whether wr is a control frame for MaxQueuedControlFrames
80// purposes. That includes non-stream frames and RST_STREAM frames.
81func (wr FrameWriteRequest) isControl() bool {
82 return wr.stream == nil
83}
84
85// DataSize returns the number of flow control bytes that must be consumed
86// to write this entire frame. This is 0 for non-DATA frames.
87func (wr FrameWriteRequest) DataSize() int {
88 if wd, ok := wr.write.(*writeData); ok {
89 return len(wd.p)
90 }
91 return 0
92}
93
94// Consume consumes min(n, available) bytes from this frame, where available
95// is the number of flow control bytes available on the stream. Consume returns
96// 0, 1, or 2 frames, where the integer return value gives the number of frames
97// returned.
98//
99// If flow control prevents consuming any bytes, this returns (_, _, 0). If
100// the entire frame was consumed, this returns (wr, _, 1). Otherwise, this
101// returns (consumed, rest, 2), where 'consumed' contains the consumed bytes and
102// 'rest' contains the remaining bytes. The consumed bytes are deducted from the
103// underlying stream's flow control budget.
104func (wr FrameWriteRequest) Consume(n int32) (FrameWriteRequest, FrameWriteRequest, int) {
105 var empty FrameWriteRequest
106
107 // Non-DATA frames are always consumed whole.
108 wd, ok := wr.write.(*writeData)
109 if !ok || len(wd.p) == 0 {
110 return wr, empty, 1
111 }
112
113 // Might need to split after applying limits.
114 allowed := wr.stream.flow.available()
115 if n < allowed {
116 allowed = n
117 }
118 if wr.stream.sc.maxFrameSize < allowed {
119 allowed = wr.stream.sc.maxFrameSize
120 }
121 if allowed <= 0 {
122 return empty, empty, 0
123 }
124 if len(wd.p) > int(allowed) {
125 wr.stream.flow.take(allowed)
126 consumed := FrameWriteRequest{
127 stream: wr.stream,
128 write: &writeData{
129 streamID: wd.streamID,
130 p: wd.p[:allowed],
131 // Even if the original had endStream set, there
132 // are bytes remaining because len(wd.p) > allowed,
133 // so we know endStream is false.
134 endStream: false,
135 },
136 // Our caller is blocking on the final DATA frame, not
137 // this intermediate frame, so no need to wait.
138 done: nil,
139 }
140 rest := FrameWriteRequest{
141 stream: wr.stream,
142 write: &writeData{
143 streamID: wd.streamID,
144 p: wd.p[allowed:],
145 endStream: wd.endStream,
146 },
147 done: wr.done,
148 }
149 return consumed, rest, 2
150 }
151
152 // The frame is consumed whole.
153 // NB: This cast cannot overflow because allowed is <= math.MaxInt32.
154 wr.stream.flow.take(int32(len(wd.p)))
155 return wr, empty, 1
156}
157
158// String is for debugging only.
159func (wr FrameWriteRequest) String() string {
160 var des string
161 if s, ok := wr.write.(fmt.Stringer); ok {
162 des = s.String()
163 } else {
164 des = fmt.Sprintf("%T", wr.write)
165 }
166 return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des)
167}
168
169// replyToWriter sends err to wr.done and panics if the send must block
170// This does nothing if wr.done is nil.
171func (wr *FrameWriteRequest) replyToWriter(err error) {
172 if wr.done == nil {
173 return
174 }
175 select {
176 case wr.done <- err:
177 default:
178 panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
179 }
180 wr.write = nil // prevent use (assume it's tainted after wr.done send)
181}
182
183// writeQueue is used by implementations of WriteScheduler.
184type writeQueue struct {
185 s []FrameWriteRequest
186}
187
188func (q *writeQueue) empty() bool { return len(q.s) == 0 }
189
190func (q *writeQueue) push(wr FrameWriteRequest) {
191 q.s = append(q.s, wr)
192}
193
194func (q *writeQueue) shift() FrameWriteRequest {
195 if len(q.s) == 0 {
196 panic("invalid use of queue")
197 }
198 wr := q.s[0]
199 // TODO: less copy-happy queue.
200 copy(q.s, q.s[1:])
201 q.s[len(q.s)-1] = FrameWriteRequest{}
202 q.s = q.s[:len(q.s)-1]
203 return wr
204}
205
206// consume consumes up to n bytes from q.s[0]. If the frame is
207// entirely consumed, it is removed from the queue. If the frame
208// is partially consumed, the frame is kept with the consumed
209// bytes removed. Returns true iff any bytes were consumed.
210func (q *writeQueue) consume(n int32) (FrameWriteRequest, bool) {
211 if len(q.s) == 0 {
212 return FrameWriteRequest{}, false
213 }
214 consumed, rest, numresult := q.s[0].Consume(n)
215 switch numresult {
216 case 0:
217 return FrameWriteRequest{}, false
218 case 1:
219 q.shift()
220 case 2:
221 q.s[0] = rest
222 }
223 return consumed, true
224}
225
226type writeQueuePool []*writeQueue
227
228// put inserts an unused writeQueue into the pool.
229func (p *writeQueuePool) put(q *writeQueue) {
230 for i := range q.s {
231 q.s[i] = FrameWriteRequest{}
232 }
233 q.s = q.s[:0]
234 *p = append(*p, q)
235}
236
237// get returns an empty writeQueue.
238func (p *writeQueuePool) get() *writeQueue {
239 ln := len(*p)
240 if ln == 0 {
241 return new(writeQueue)
242 }
243 x := ln - 1
244 q := (*p)[x]
245 (*p)[x] = nil
246 *p = (*p)[:x]
247 return q
248}