blob: b7524ba268bdbd4b472e002a71bae5ecf882f87c [file] [log] [blame]
Scott Baker2d897982019-09-24 11:50:08 -07001// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// TODO: turn off the serve goroutine when idle, so
6// an idle conn only has the readFrames goroutine active. (which could
7// also be optimized probably to pin less memory in crypto/tls). This
8// would involve tracking when the serve goroutine is active (atomic
9// int32 read/CAS probably?) and starting it up when frames arrive,
10// and shutting it down when all handlers exit. the occasional PING
11// packets could use time.AfterFunc to call sc.wakeStartServeLoop()
12// (which is a no-op if already running) and then queue the PING write
13// as normal. The serve loop would then exit in most cases (if no
14// Handlers running) and not be woken up again until the PING packet
15// returns.
16
17// TODO (maybe): add a mechanism for Handlers to go into
18// half-closed-local mode (rw.(io.Closer) test?) but not exit their
19// handler, and continue to be able to read from the
20// Request.Body. This would be a somewhat semantic change from HTTP/1
21// (or at least what we expose in net/http), so I'd probably want to
22// add it there too. For now, this package says that returning from
23// the Handler ServeHTTP function means you're both done reading and
24// done writing, without a way to stop just one or the other.
25
26package http2
27
28import (
29 "bufio"
30 "bytes"
31 "context"
32 "crypto/tls"
33 "errors"
34 "fmt"
35 "io"
36 "log"
37 "math"
38 "net"
39 "net/http"
40 "net/textproto"
41 "net/url"
42 "os"
43 "reflect"
44 "runtime"
45 "strconv"
46 "strings"
47 "sync"
48 "time"
49
50 "golang.org/x/net/http/httpguts"
51 "golang.org/x/net/http2/hpack"
52)
53
// Timeouts and sizing limits for the server side of a connection.
const (
	// prefaceTimeout is presumably the time allowed for the client
	// preface to arrive; its use is outside this view — TODO confirm.
	prefaceTimeout = 10 * time.Second

	// firstSettingsTimeout is how long serve waits for the first frame
	// from the client before giving up (the SETTINGS frame should be
	// in flight together with the preface anyway).
	firstSettingsTimeout = 2 * time.Second

	// handlerChunkWriteSize is the size of the bufio.Writer handed to
	// each pooled responseWriterState.
	handlerChunkWriteSize = 4096

	// defaultMaxStreams is used when Server.MaxConcurrentStreams is zero.
	// TODO: make this 100 as the GFE seems to?
	defaultMaxStreams = 250

	// maxQueuedControlFrames is the value returned by
	// Server.maxQueuedControlFrames.
	maxQueuedControlFrames = 10000
)
61
// Sentinel errors used by the server implementation.

// errClientDisconnected is passed to closeStream for every stream when
// the connection is torn down.
var errClientDisconnected = errors.New("client disconnected")

// errClosedBody, errHandlerComplete and errStreamClosed are used
// outside this part of the file; their messages describe their intent.
var (
	errClosedBody      = errors.New("body closed by handler")
	errHandlerComplete = errors.New("http2: request body closed due to handler exiting")
	errStreamClosed    = errors.New("http2: stream closed")
)
68
69var responseWriterStatePool = sync.Pool{
70 New: func() interface{} {
71 rws := &responseWriterState{}
72 rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
73 return rws
74 },
75}
76
77// Test hooks.
78var (
79 testHookOnConn func()
80 testHookGetServerConn func(*serverConn)
81 testHookOnPanicMu *sync.Mutex // nil except in tests
82 testHookOnPanic func(sc *serverConn, panicVal interface{}) (rePanic bool)
83)
84
85// Server is an HTTP/2 server.
86type Server struct {
87 // MaxHandlers limits the number of http.Handler ServeHTTP goroutines
88 // which may run at a time over all connections.
89 // Negative or zero no limit.
90 // TODO: implement
91 MaxHandlers int
92
93 // MaxConcurrentStreams optionally specifies the number of
94 // concurrent streams that each client may have open at a
95 // time. This is unrelated to the number of http.Handler goroutines
96 // which may be active globally, which is MaxHandlers.
97 // If zero, MaxConcurrentStreams defaults to at least 100, per
98 // the HTTP/2 spec's recommendations.
99 MaxConcurrentStreams uint32
100
101 // MaxReadFrameSize optionally specifies the largest frame
102 // this server is willing to read. A valid value is between
103 // 16k and 16M, inclusive. If zero or otherwise invalid, a
104 // default value is used.
105 MaxReadFrameSize uint32
106
107 // PermitProhibitedCipherSuites, if true, permits the use of
108 // cipher suites prohibited by the HTTP/2 spec.
109 PermitProhibitedCipherSuites bool
110
111 // IdleTimeout specifies how long until idle clients should be
112 // closed with a GOAWAY frame. PING frames are not considered
113 // activity for the purposes of IdleTimeout.
114 IdleTimeout time.Duration
115
116 // MaxUploadBufferPerConnection is the size of the initial flow
117 // control window for each connections. The HTTP/2 spec does not
118 // allow this to be smaller than 65535 or larger than 2^32-1.
119 // If the value is outside this range, a default value will be
120 // used instead.
121 MaxUploadBufferPerConnection int32
122
123 // MaxUploadBufferPerStream is the size of the initial flow control
124 // window for each stream. The HTTP/2 spec does not allow this to
125 // be larger than 2^32-1. If the value is zero or larger than the
126 // maximum, a default value will be used instead.
127 MaxUploadBufferPerStream int32
128
129 // NewWriteScheduler constructs a write scheduler for a connection.
130 // If nil, a default scheduler is chosen.
131 NewWriteScheduler func() WriteScheduler
132
133 // Internal state. This is a pointer (rather than embedded directly)
134 // so that we don't embed a Mutex in this struct, which will make the
135 // struct non-copyable, which might break some callers.
136 state *serverInternalState
137}
138
139func (s *Server) initialConnRecvWindowSize() int32 {
140 if s.MaxUploadBufferPerConnection > initialWindowSize {
141 return s.MaxUploadBufferPerConnection
142 }
143 return 1 << 20
144}
145
146func (s *Server) initialStreamRecvWindowSize() int32 {
147 if s.MaxUploadBufferPerStream > 0 {
148 return s.MaxUploadBufferPerStream
149 }
150 return 1 << 20
151}
152
153func (s *Server) maxReadFrameSize() uint32 {
154 if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize {
155 return v
156 }
157 return defaultMaxReadFrameSize
158}
159
160func (s *Server) maxConcurrentStreams() uint32 {
161 if v := s.MaxConcurrentStreams; v > 0 {
162 return v
163 }
164 return defaultMaxStreams
165}
166
Scott Baker8487c5d2019-10-18 12:49:46 -0700167// maxQueuedControlFrames is the maximum number of control frames like
168// SETTINGS, PING and RST_STREAM that will be queued for writing before
169// the connection is closed to prevent memory exhaustion attacks.
170func (s *Server) maxQueuedControlFrames() int {
171 // TODO: if anybody asks, add a Server field, and remember to define the
172 // behavior of negative values.
173 return maxQueuedControlFrames
174}
175
Scott Baker2d897982019-09-24 11:50:08 -0700176type serverInternalState struct {
177 mu sync.Mutex
178 activeConns map[*serverConn]struct{}
179}
180
181func (s *serverInternalState) registerConn(sc *serverConn) {
182 if s == nil {
183 return // if the Server was used without calling ConfigureServer
184 }
185 s.mu.Lock()
186 s.activeConns[sc] = struct{}{}
187 s.mu.Unlock()
188}
189
190func (s *serverInternalState) unregisterConn(sc *serverConn) {
191 if s == nil {
192 return // if the Server was used without calling ConfigureServer
193 }
194 s.mu.Lock()
195 delete(s.activeConns, sc)
196 s.mu.Unlock()
197}
198
199func (s *serverInternalState) startGracefulShutdown() {
200 if s == nil {
201 return // if the Server was used without calling ConfigureServer
202 }
203 s.mu.Lock()
204 for sc := range s.activeConns {
205 sc.startGracefulShutdown()
206 }
207 s.mu.Unlock()
208}
209
210// ConfigureServer adds HTTP/2 support to a net/http Server.
211//
212// The configuration conf may be nil.
213//
214// ConfigureServer must be called before s begins serving.
215func ConfigureServer(s *http.Server, conf *Server) error {
216 if s == nil {
217 panic("nil *http.Server")
218 }
219 if conf == nil {
220 conf = new(Server)
221 }
222 conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})}
223 if h1, h2 := s, conf; h2.IdleTimeout == 0 {
224 if h1.IdleTimeout != 0 {
225 h2.IdleTimeout = h1.IdleTimeout
226 } else {
227 h2.IdleTimeout = h1.ReadTimeout
228 }
229 }
230 s.RegisterOnShutdown(conf.state.startGracefulShutdown)
231
232 if s.TLSConfig == nil {
233 s.TLSConfig = new(tls.Config)
234 } else if s.TLSConfig.CipherSuites != nil {
235 // If they already provided a CipherSuite list, return
236 // an error if it has a bad order or is missing
237 // ECDHE_RSA_WITH_AES_128_GCM_SHA256 or ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
238 haveRequired := false
239 sawBad := false
240 for i, cs := range s.TLSConfig.CipherSuites {
241 switch cs {
242 case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
243 // Alternative MTI cipher to not discourage ECDSA-only servers.
244 // See http://golang.org/cl/30721 for further information.
245 tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
246 haveRequired = true
247 }
248 if isBadCipher(cs) {
249 sawBad = true
250 } else if sawBad {
251 return fmt.Errorf("http2: TLSConfig.CipherSuites index %d contains an HTTP/2-approved cipher suite (%#04x), but it comes after unapproved cipher suites. With this configuration, clients that don't support previous, approved cipher suites may be given an unapproved one and reject the connection.", i, cs)
252 }
253 }
254 if !haveRequired {
255 return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher.")
256 }
257 }
258
259 // Note: not setting MinVersion to tls.VersionTLS12,
260 // as we don't want to interfere with HTTP/1.1 traffic
261 // on the user's server. We enforce TLS 1.2 later once
262 // we accept a connection. Ideally this should be done
263 // during next-proto selection, but using TLS <1.2 with
264 // HTTP/2 is still the client's bug.
265
266 s.TLSConfig.PreferServerCipherSuites = true
267
268 haveNPN := false
269 for _, p := range s.TLSConfig.NextProtos {
270 if p == NextProtoTLS {
271 haveNPN = true
272 break
273 }
274 }
275 if !haveNPN {
276 s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS)
277 }
278
279 if s.TLSNextProto == nil {
280 s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
281 }
282 protoHandler := func(hs *http.Server, c *tls.Conn, h http.Handler) {
283 if testHookOnConn != nil {
284 testHookOnConn()
285 }
Scott Baker8487c5d2019-10-18 12:49:46 -0700286 // The TLSNextProto interface predates contexts, so
287 // the net/http package passes down its per-connection
288 // base context via an exported but unadvertised
289 // method on the Handler. This is for internal
290 // net/http<=>http2 use only.
291 var ctx context.Context
292 type baseContexter interface {
293 BaseContext() context.Context
294 }
295 if bc, ok := h.(baseContexter); ok {
296 ctx = bc.BaseContext()
297 }
Scott Baker2d897982019-09-24 11:50:08 -0700298 conf.ServeConn(c, &ServeConnOpts{
Scott Baker8487c5d2019-10-18 12:49:46 -0700299 Context: ctx,
Scott Baker2d897982019-09-24 11:50:08 -0700300 Handler: h,
301 BaseConfig: hs,
302 })
303 }
304 s.TLSNextProto[NextProtoTLS] = protoHandler
305 return nil
306}
307
// ServeConnOpts are options for the Server.ServeConn method.
type ServeConnOpts struct {
	// Context is the base context to use.
	// When nil, context.Background is used.
	Context context.Context

	// BaseConfig optionally supplies the base configuration
	// for values. When nil, defaults are used.
	BaseConfig *http.Server

	// Handler is the handler that processes requests. When nil,
	// BaseConfig.Handler is used; when BaseConfig or
	// BaseConfig.Handler is nil as well, http.DefaultServeMux is used.
	Handler http.Handler
}

// context returns the configured base context, or context.Background
// when o is nil or no context was set.
func (o *ServeConnOpts) context() context.Context {
	if o == nil || o.Context == nil {
		return context.Background()
	}
	return o.Context
}

// baseConfig returns the configured *http.Server, or a newly allocated
// zero http.Server when o is nil or BaseConfig is unset.
func (o *ServeConnOpts) baseConfig() *http.Server {
	if o == nil || o.BaseConfig == nil {
		return &http.Server{}
	}
	return o.BaseConfig
}

// handler picks the request handler in order of preference: o.Handler,
// then o.BaseConfig.Handler, then http.DefaultServeMux.
func (o *ServeConnOpts) handler() http.Handler {
	switch {
	case o == nil:
		// No options at all: fall through to the default mux.
	case o.Handler != nil:
		return o.Handler
	case o.BaseConfig != nil && o.BaseConfig.Handler != nil:
		return o.BaseConfig.Handler
	}
	return http.DefaultServeMux
}
349
350// ServeConn serves HTTP/2 requests on the provided connection and
351// blocks until the connection is no longer readable.
352//
353// ServeConn starts speaking HTTP/2 assuming that c has not had any
354// reads or writes. It writes its initial settings frame and expects
355// to be able to read the preface and settings frame from the
356// client. If c has a ConnectionState method like a *tls.Conn, the
357// ConnectionState is used to verify the TLS ciphersuite and to set
358// the Request.TLS field in Handlers.
359//
360// ServeConn does not support h2c by itself. Any h2c support must be
361// implemented in terms of providing a suitably-behaving net.Conn.
362//
363// The opts parameter is optional. If nil, default values are used.
364func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
365 baseCtx, cancel := serverConnBaseContext(c, opts)
366 defer cancel()
367
368 sc := &serverConn{
369 srv: s,
370 hs: opts.baseConfig(),
371 conn: c,
372 baseCtx: baseCtx,
373 remoteAddrStr: c.RemoteAddr().String(),
374 bw: newBufferedWriter(c),
375 handler: opts.handler(),
376 streams: make(map[uint32]*stream),
377 readFrameCh: make(chan readFrameResult),
378 wantWriteFrameCh: make(chan FrameWriteRequest, 8),
379 serveMsgCh: make(chan interface{}, 8),
380 wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
381 bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
382 doneServing: make(chan struct{}),
383 clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
384 advMaxStreams: s.maxConcurrentStreams(),
385 initialStreamSendWindowSize: initialWindowSize,
386 maxFrameSize: initialMaxFrameSize,
387 headerTableSize: initialHeaderTableSize,
388 serveG: newGoroutineLock(),
389 pushEnabled: true,
390 }
391
392 s.state.registerConn(sc)
393 defer s.state.unregisterConn(sc)
394
395 // The net/http package sets the write deadline from the
396 // http.Server.WriteTimeout during the TLS handshake, but then
397 // passes the connection off to us with the deadline already set.
398 // Write deadlines are set per stream in serverConn.newStream.
399 // Disarm the net.Conn write deadline here.
400 if sc.hs.WriteTimeout != 0 {
401 sc.conn.SetWriteDeadline(time.Time{})
402 }
403
404 if s.NewWriteScheduler != nil {
405 sc.writeSched = s.NewWriteScheduler()
406 } else {
407 sc.writeSched = NewRandomWriteScheduler()
408 }
409
410 // These start at the RFC-specified defaults. If there is a higher
411 // configured value for inflow, that will be updated when we send a
412 // WINDOW_UPDATE shortly after sending SETTINGS.
413 sc.flow.add(initialWindowSize)
414 sc.inflow.add(initialWindowSize)
415 sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
416
417 fr := NewFramer(sc.bw, c)
418 fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
419 fr.MaxHeaderListSize = sc.maxHeaderListSize()
420 fr.SetMaxReadFrameSize(s.maxReadFrameSize())
421 sc.framer = fr
422
423 if tc, ok := c.(connectionStater); ok {
424 sc.tlsState = new(tls.ConnectionState)
425 *sc.tlsState = tc.ConnectionState()
426 // 9.2 Use of TLS Features
427 // An implementation of HTTP/2 over TLS MUST use TLS
428 // 1.2 or higher with the restrictions on feature set
429 // and cipher suite described in this section. Due to
430 // implementation limitations, it might not be
431 // possible to fail TLS negotiation. An endpoint MUST
432 // immediately terminate an HTTP/2 connection that
433 // does not meet the TLS requirements described in
434 // this section with a connection error (Section
435 // 5.4.1) of type INADEQUATE_SECURITY.
436 if sc.tlsState.Version < tls.VersionTLS12 {
437 sc.rejectConn(ErrCodeInadequateSecurity, "TLS version too low")
438 return
439 }
440
441 if sc.tlsState.ServerName == "" {
442 // Client must use SNI, but we don't enforce that anymore,
443 // since it was causing problems when connecting to bare IP
444 // addresses during development.
445 //
446 // TODO: optionally enforce? Or enforce at the time we receive
447 // a new request, and verify the ServerName matches the :authority?
448 // But that precludes proxy situations, perhaps.
449 //
450 // So for now, do nothing here again.
451 }
452
453 if !s.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) {
454 // "Endpoints MAY choose to generate a connection error
455 // (Section 5.4.1) of type INADEQUATE_SECURITY if one of
456 // the prohibited cipher suites are negotiated."
457 //
458 // We choose that. In my opinion, the spec is weak
459 // here. It also says both parties must support at least
460 // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no
461 // excuses here. If we really must, we could allow an
462 // "AllowInsecureWeakCiphers" option on the server later.
463 // Let's see how it plays out first.
464 sc.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite))
465 return
466 }
467 }
468
469 if hook := testHookGetServerConn; hook != nil {
470 hook(sc)
471 }
472 sc.serve()
473}
474
475func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx context.Context, cancel func()) {
Scott Baker8487c5d2019-10-18 12:49:46 -0700476 ctx, cancel = context.WithCancel(opts.context())
Scott Baker2d897982019-09-24 11:50:08 -0700477 ctx = context.WithValue(ctx, http.LocalAddrContextKey, c.LocalAddr())
478 if hs := opts.baseConfig(); hs != nil {
479 ctx = context.WithValue(ctx, http.ServerContextKey, hs)
480 }
481 return
482}
483
484func (sc *serverConn) rejectConn(err ErrCode, debug string) {
485 sc.vlogf("http2: server rejecting conn: %v, %s", err, debug)
486 // ignoring errors. hanging up anyway.
487 sc.framer.WriteGoAway(0, err, []byte(debug))
488 sc.bw.Flush()
489 sc.conn.Close()
490}
491
492type serverConn struct {
493 // Immutable:
494 srv *Server
495 hs *http.Server
496 conn net.Conn
497 bw *bufferedWriter // writing to conn
498 handler http.Handler
499 baseCtx context.Context
500 framer *Framer
501 doneServing chan struct{} // closed when serverConn.serve ends
502 readFrameCh chan readFrameResult // written by serverConn.readFrames
503 wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
504 wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
505 bodyReadCh chan bodyReadMsg // from handlers -> serve
506 serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop
507 flow flow // conn-wide (not stream-specific) outbound flow control
508 inflow flow // conn-wide inbound flow control
509 tlsState *tls.ConnectionState // shared by all handlers, like net/http
510 remoteAddrStr string
511 writeSched WriteScheduler
512
513 // Everything following is owned by the serve loop; use serveG.check():
514 serveG goroutineLock // used to verify funcs are on serve()
515 pushEnabled bool
516 sawFirstSettings bool // got the initial SETTINGS frame after the preface
517 needToSendSettingsAck bool
518 unackedSettings int // how many SETTINGS have we sent without ACKs?
Scott Baker8487c5d2019-10-18 12:49:46 -0700519 queuedControlFrames int // control frames in the writeSched queue
Scott Baker2d897982019-09-24 11:50:08 -0700520 clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
521 advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
522 curClientStreams uint32 // number of open streams initiated by the client
523 curPushedStreams uint32 // number of open streams initiated by server push
524 maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
525 maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
526 streams map[uint32]*stream
527 initialStreamSendWindowSize int32
528 maxFrameSize int32
529 headerTableSize uint32
530 peerMaxHeaderListSize uint32 // zero means unknown (default)
531 canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
532 writingFrame bool // started writing a frame (on serve goroutine or separate)
533 writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
534 needsFrameFlush bool // last frame write wasn't a flush
535 inGoAway bool // we've started to or sent GOAWAY
536 inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
537 needToSendGoAway bool // we need to schedule a GOAWAY frame write
538 goAwayCode ErrCode
539 shutdownTimer *time.Timer // nil until used
540 idleTimer *time.Timer // nil if unused
541
542 // Owned by the writeFrameAsync goroutine:
543 headerWriteBuf bytes.Buffer
544 hpackEncoder *hpack.Encoder
545
546 // Used by startGracefulShutdown.
547 shutdownOnce sync.Once
548}
549
550func (sc *serverConn) maxHeaderListSize() uint32 {
551 n := sc.hs.MaxHeaderBytes
552 if n <= 0 {
553 n = http.DefaultMaxHeaderBytes
554 }
555 // http2's count is in a slightly different unit and includes 32 bytes per pair.
556 // So, take the net/http.Server value and pad it up a bit, assuming 10 headers.
557 const perFieldOverhead = 32 // per http2 spec
558 const typicalHeaders = 10 // conservative
559 return uint32(n + typicalHeaders*perFieldOverhead)
560}
561
562func (sc *serverConn) curOpenStreams() uint32 {
563 sc.serveG.check()
564 return sc.curClientStreams + sc.curPushedStreams
565}
566
567// stream represents a stream. This is the minimal metadata needed by
568// the serve goroutine. Most of the actual stream state is owned by
569// the http.Handler's goroutine in the responseWriter. Because the
570// responseWriter's responseWriterState is recycled at the end of a
571// handler, this struct intentionally has no pointer to the
572// *responseWriter{,State} itself, as the Handler ending nils out the
573// responseWriter's state field.
574type stream struct {
575 // immutable:
576 sc *serverConn
577 id uint32
578 body *pipe // non-nil if expecting DATA frames
579 cw closeWaiter // closed wait stream transitions to closed state
580 ctx context.Context
581 cancelCtx func()
582
583 // owned by serverConn's serve loop:
584 bodyBytes int64 // body bytes seen so far
585 declBodyBytes int64 // or -1 if undeclared
586 flow flow // limits writing from Handler to client
587 inflow flow // what the client is allowed to POST/etc to us
588 parent *stream // or nil
589 numTrailerValues int64
590 weight uint8
591 state streamState
592 resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
593 gotTrailerHeader bool // HEADER frame for trailers was seen
594 wroteHeaders bool // whether we wrote headers (not status 100)
595 writeDeadline *time.Timer // nil if unused
596
597 trailer http.Header // accumulated trailers
598 reqTrailer http.Header // handler's Request.Trailer
599}
600
601func (sc *serverConn) Framer() *Framer { return sc.framer }
602func (sc *serverConn) CloseConn() error { return sc.conn.Close() }
603func (sc *serverConn) Flush() error { return sc.bw.Flush() }
604func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) {
605 return sc.hpackEncoder, &sc.headerWriteBuf
606}
607
608func (sc *serverConn) state(streamID uint32) (streamState, *stream) {
609 sc.serveG.check()
610 // http://tools.ietf.org/html/rfc7540#section-5.1
611 if st, ok := sc.streams[streamID]; ok {
612 return st.state, st
613 }
614 // "The first use of a new stream identifier implicitly closes all
615 // streams in the "idle" state that might have been initiated by
616 // that peer with a lower-valued stream identifier. For example, if
617 // a client sends a HEADERS frame on stream 7 without ever sending a
618 // frame on stream 5, then stream 5 transitions to the "closed"
619 // state when the first frame for stream 7 is sent or received."
620 if streamID%2 == 1 {
621 if streamID <= sc.maxClientStreamID {
622 return stateClosed, nil
623 }
624 } else {
625 if streamID <= sc.maxPushPromiseID {
626 return stateClosed, nil
627 }
628 }
629 return stateIdle, nil
630}
631
632// setConnState calls the net/http ConnState hook for this connection, if configured.
633// Note that the net/http package does StateNew and StateClosed for us.
634// There is currently no plan for StateHijacked or hijacking HTTP/2 connections.
635func (sc *serverConn) setConnState(state http.ConnState) {
636 if sc.hs.ConnState != nil {
637 sc.hs.ConnState(sc.conn, state)
638 }
639}
640
641func (sc *serverConn) vlogf(format string, args ...interface{}) {
642 if VerboseLogs {
643 sc.logf(format, args...)
644 }
645}
646
647func (sc *serverConn) logf(format string, args ...interface{}) {
648 if lg := sc.hs.ErrorLog; lg != nil {
649 lg.Printf(format, args...)
650 } else {
651 log.Printf(format, args...)
652 }
653}
654
// errno returns v's underlying uintptr when v's dynamic type has kind
// uintptr, and 0 in every other case (including a nil error).
//
// TODO: remove this helper function once http2 can use build
// tags. See comment in isClosedConnError.
func errno(v error) uintptr {
	rv := reflect.ValueOf(v)
	if rv.Kind() != reflect.Uintptr {
		return 0
	}
	return uintptr(rv.Uint())
}
665
666// isClosedConnError reports whether err is an error from use of a closed
667// network connection.
668func isClosedConnError(err error) bool {
669 if err == nil {
670 return false
671 }
672
673 // TODO: remove this string search and be more like the Windows
674 // case below. That might involve modifying the standard library
675 // to return better error types.
676 str := err.Error()
677 if strings.Contains(str, "use of closed network connection") {
678 return true
679 }
680
681 // TODO(bradfitz): x/tools/cmd/bundle doesn't really support
682 // build tags, so I can't make an http2_windows.go file with
683 // Windows-specific stuff. Fix that and move this, once we
684 // have a way to bundle this into std's net/http somehow.
685 if runtime.GOOS == "windows" {
686 if oe, ok := err.(*net.OpError); ok && oe.Op == "read" {
687 if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" {
688 const WSAECONNABORTED = 10053
689 const WSAECONNRESET = 10054
690 if n := errno(se.Err); n == WSAECONNRESET || n == WSAECONNABORTED {
691 return true
692 }
693 }
694 }
695 }
696 return false
697}
698
699func (sc *serverConn) condlogf(err error, format string, args ...interface{}) {
700 if err == nil {
701 return
702 }
703 if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) || err == errPrefaceTimeout {
704 // Boring, expected errors.
705 sc.vlogf(format, args...)
706 } else {
707 sc.logf(format, args...)
708 }
709}
710
711func (sc *serverConn) canonicalHeader(v string) string {
712 sc.serveG.check()
713 buildCommonHeaderMapsOnce()
714 cv, ok := commonCanonHeader[v]
715 if ok {
716 return cv
717 }
718 cv, ok = sc.canonHeader[v]
719 if ok {
720 return cv
721 }
722 if sc.canonHeader == nil {
723 sc.canonHeader = make(map[string]string)
724 }
725 cv = http.CanonicalHeaderKey(v)
726 sc.canonHeader[v] = cv
727 return cv
728}
729
730type readFrameResult struct {
731 f Frame // valid until readMore is called
732 err error
733
734 // readMore should be called once the consumer no longer needs or
735 // retains f. After readMore, f is invalid and more frames can be
736 // read.
737 readMore func()
738}
739
740// readFrames is the loop that reads incoming frames.
741// It takes care to only read one frame at a time, blocking until the
742// consumer is done with the frame.
743// It's run on its own goroutine.
744func (sc *serverConn) readFrames() {
745 gate := make(gate)
746 gateDone := gate.Done
747 for {
748 f, err := sc.framer.ReadFrame()
749 select {
750 case sc.readFrameCh <- readFrameResult{f, err, gateDone}:
751 case <-sc.doneServing:
752 return
753 }
754 select {
755 case <-gate:
756 case <-sc.doneServing:
757 return
758 }
759 if terminalReadFrameError(err) {
760 return
761 }
762 }
763}
764
765// frameWriteResult is the message passed from writeFrameAsync to the serve goroutine.
766type frameWriteResult struct {
767 wr FrameWriteRequest // what was written (or attempted)
768 err error // result of the writeFrame call
769}
770
771// writeFrameAsync runs in its own goroutine and writes a single frame
772// and then reports when it's done.
773// At most one goroutine can be running writeFrameAsync at a time per
774// serverConn.
775func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) {
776 err := wr.write.writeFrame(sc)
777 sc.wroteFrameCh <- frameWriteResult{wr, err}
778}
779
780func (sc *serverConn) closeAllStreamsOnConnClose() {
781 sc.serveG.check()
782 for _, st := range sc.streams {
783 sc.closeStream(st, errClientDisconnected)
784 }
785}
786
787func (sc *serverConn) stopShutdownTimer() {
788 sc.serveG.check()
789 if t := sc.shutdownTimer; t != nil {
790 t.Stop()
791 }
792}
793
794func (sc *serverConn) notePanic() {
795 // Note: this is for serverConn.serve panicking, not http.Handler code.
796 if testHookOnPanicMu != nil {
797 testHookOnPanicMu.Lock()
798 defer testHookOnPanicMu.Unlock()
799 }
800 if testHookOnPanic != nil {
801 if e := recover(); e != nil {
802 if testHookOnPanic(sc, e) {
803 panic(e)
804 }
805 }
806 }
807}
808
809func (sc *serverConn) serve() {
810 sc.serveG.check()
811 defer sc.notePanic()
812 defer sc.conn.Close()
813 defer sc.closeAllStreamsOnConnClose()
814 defer sc.stopShutdownTimer()
815 defer close(sc.doneServing) // unblocks handlers trying to send
816
817 if VerboseLogs {
818 sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
819 }
820
821 sc.writeFrame(FrameWriteRequest{
822 write: writeSettings{
823 {SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
824 {SettingMaxConcurrentStreams, sc.advMaxStreams},
825 {SettingMaxHeaderListSize, sc.maxHeaderListSize()},
826 {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
827 },
828 })
829 sc.unackedSettings++
830
831 // Each connection starts with intialWindowSize inflow tokens.
832 // If a higher value is configured, we add more tokens.
833 if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
834 sc.sendWindowUpdate(nil, int(diff))
835 }
836
837 if err := sc.readPreface(); err != nil {
838 sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
839 return
840 }
841 // Now that we've got the preface, get us out of the
842 // "StateNew" state. We can't go directly to idle, though.
843 // Active means we read some data and anticipate a request. We'll
844 // do another Active when we get a HEADERS frame.
845 sc.setConnState(http.StateActive)
846 sc.setConnState(http.StateIdle)
847
848 if sc.srv.IdleTimeout != 0 {
849 sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
850 defer sc.idleTimer.Stop()
851 }
852
853 go sc.readFrames() // closed by defer sc.conn.Close above
854
855 settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
856 defer settingsTimer.Stop()
857
858 loopNum := 0
859 for {
860 loopNum++
861 select {
862 case wr := <-sc.wantWriteFrameCh:
863 if se, ok := wr.write.(StreamError); ok {
864 sc.resetStream(se)
865 break
866 }
867 sc.writeFrame(wr)
868 case res := <-sc.wroteFrameCh:
869 sc.wroteFrame(res)
870 case res := <-sc.readFrameCh:
871 if !sc.processFrameFromReader(res) {
872 return
873 }
874 res.readMore()
875 if settingsTimer != nil {
876 settingsTimer.Stop()
877 settingsTimer = nil
878 }
879 case m := <-sc.bodyReadCh:
880 sc.noteBodyRead(m.st, m.n)
881 case msg := <-sc.serveMsgCh:
882 switch v := msg.(type) {
883 case func(int):
884 v(loopNum) // for testing
885 case *serverMessage:
886 switch v {
887 case settingsTimerMsg:
888 sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
889 return
890 case idleTimerMsg:
891 sc.vlogf("connection is idle")
892 sc.goAway(ErrCodeNo)
893 case shutdownTimerMsg:
894 sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
895 return
896 case gracefulShutdownMsg:
897 sc.startGracefulShutdownInternal()
898 default:
899 panic("unknown timer")
900 }
901 case *startPushRequest:
902 sc.startPush(v)
903 default:
904 panic(fmt.Sprintf("unexpected type %T", v))
905 }
906 }
907
Scott Baker8487c5d2019-10-18 12:49:46 -0700908 // If the peer is causing us to generate a lot of control frames,
909 // but not reading them from us, assume they are trying to make us
910 // run out of memory.
911 if sc.queuedControlFrames > sc.srv.maxQueuedControlFrames() {
912 sc.vlogf("http2: too many control frames in send queue, closing connection")
913 return
914 }
915
Scott Baker2d897982019-09-24 11:50:08 -0700916 // Start the shutdown timer after sending a GOAWAY. When sending GOAWAY
917 // with no error code (graceful shutdown), don't start the timer until
918 // all open streams have been completed.
919 sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame
920 gracefulShutdownComplete := sc.goAwayCode == ErrCodeNo && sc.curOpenStreams() == 0
921 if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != ErrCodeNo || gracefulShutdownComplete) {
922 sc.shutDownIn(goAwayTimeout)
923 }
924 }
925}
926
927func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) {
928 select {
929 case <-sc.doneServing:
930 case <-sharedCh:
931 close(privateCh)
932 }
933}
934
// serverMessage is the element type of the sentinel messages sent to
// serveMsgCh. The serve loop distinguishes the sentinels by pointer
// comparison, so each one is a separate allocation.
type serverMessage int

// Message values sent to serveMsgCh.

// settingsTimerMsg is sent by onSettingsTimer.
var settingsTimerMsg = new(serverMessage)

// idleTimerMsg is sent by onIdleTimer.
var idleTimerMsg = new(serverMessage)

// shutdownTimerMsg is sent by onShutdownTimer.
var shutdownTimerMsg = new(serverMessage)

// gracefulShutdownMsg is sent by startGracefulShutdown.
var gracefulShutdownMsg = new(serverMessage)
944
945func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
946func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) }
947func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) }
948
949func (sc *serverConn) sendServeMsg(msg interface{}) {
950 sc.serveG.checkNotOn() // NOT
951 select {
952 case sc.serveMsgCh <- msg:
953 case <-sc.doneServing:
954 }
955}
956
// errPrefaceTimeout is the error readPreface returns when the client
// preface has not been read within prefaceTimeout.
var errPrefaceTimeout = errors.New("timeout waiting for client preface")
958
959// readPreface reads the ClientPreface greeting from the peer or
960// returns errPrefaceTimeout on timeout, or an error if the greeting
961// is invalid.
962func (sc *serverConn) readPreface() error {
963 errc := make(chan error, 1)
964 go func() {
965 // Read the client preface
966 buf := make([]byte, len(ClientPreface))
967 if _, err := io.ReadFull(sc.conn, buf); err != nil {
968 errc <- err
969 } else if !bytes.Equal(buf, clientPreface) {
970 errc <- fmt.Errorf("bogus greeting %q", buf)
971 } else {
972 errc <- nil
973 }
974 }()
975 timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
976 defer timer.Stop()
977 select {
978 case <-timer.C:
979 return errPrefaceTimeout
980 case err := <-errc:
981 if err == nil {
982 if VerboseLogs {
983 sc.vlogf("http2: server: client %v said hello", sc.conn.RemoteAddr())
984 }
985 }
986 return err
987 }
988}
989
// errChanPool recycles error channels with a buffer of one, such as
// the one writeDataFromHandler uses to receive a frame write result.
var errChanPool = sync.Pool{
	New: func() interface{} {
		ch := make(chan error, 1)
		return ch
	},
}
993
994var writeDataPool = sync.Pool{
995 New: func() interface{} { return new(writeData) },
996}
997
998// writeDataFromHandler writes DATA response frames from a handler on
999// the given stream.
1000func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
1001 ch := errChanPool.Get().(chan error)
1002 writeArg := writeDataPool.Get().(*writeData)
1003 *writeArg = writeData{stream.id, data, endStream}
1004 err := sc.writeFrameFromHandler(FrameWriteRequest{
1005 write: writeArg,
1006 stream: stream,
1007 done: ch,
1008 })
1009 if err != nil {
1010 return err
1011 }
1012 var frameWriteDone bool // the frame write is done (successfully or not)
1013 select {
1014 case err = <-ch:
1015 frameWriteDone = true
1016 case <-sc.doneServing:
1017 return errClientDisconnected
1018 case <-stream.cw:
1019 // If both ch and stream.cw were ready (as might
1020 // happen on the final Write after an http.Handler
1021 // ends), prefer the write result. Otherwise this
1022 // might just be us successfully closing the stream.
1023 // The writeFrameAsync and serve goroutines guarantee
1024 // that the ch send will happen before the stream.cw
1025 // close.
1026 select {
1027 case err = <-ch:
1028 frameWriteDone = true
1029 default:
1030 return errStreamClosed
1031 }
1032 }
1033 errChanPool.Put(ch)
1034 if frameWriteDone {
1035 writeDataPool.Put(writeArg)
1036 }
1037 return err
1038}
1039
1040// writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts
1041// if the connection has gone away.
1042//
1043// This must not be run from the serve goroutine itself, else it might
1044// deadlock writing to sc.wantWriteFrameCh (which is only mildly
1045// buffered and is read by serve itself). If you're on the serve
1046// goroutine, call writeFrame instead.
1047func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error {
1048 sc.serveG.checkNotOn() // NOT
1049 select {
1050 case sc.wantWriteFrameCh <- wr:
1051 return nil
1052 case <-sc.doneServing:
1053 // Serve loop is gone.
1054 // Client has closed their connection to the server.
1055 return errClientDisconnected
1056 }
1057}
1058
1059// writeFrame schedules a frame to write and sends it if there's nothing
1060// already being written.
1061//
1062// There is no pushback here (the serve goroutine never blocks). It's
1063// the http.Handlers that block, waiting for their previous frames to
1064// make it onto the wire
1065//
1066// If you're not on the serve goroutine, use writeFrameFromHandler instead.
1067func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
1068 sc.serveG.check()
1069
1070 // If true, wr will not be written and wr.done will not be signaled.
1071 var ignoreWrite bool
1072
1073 // We are not allowed to write frames on closed streams. RFC 7540 Section
1074 // 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
1075 // a closed stream." Our server never sends PRIORITY, so that exception
1076 // does not apply.
1077 //
1078 // The serverConn might close an open stream while the stream's handler
1079 // is still running. For example, the server might close a stream when it
1080 // receives bad data from the client. If this happens, the handler might
1081 // attempt to write a frame after the stream has been closed (since the
1082 // handler hasn't yet been notified of the close). In this case, we simply
1083 // ignore the frame. The handler will notice that the stream is closed when
1084 // it waits for the frame to be written.
1085 //
1086 // As an exception to this rule, we allow sending RST_STREAM after close.
1087 // This allows us to immediately reject new streams without tracking any
1088 // state for those streams (except for the queued RST_STREAM frame). This
1089 // may result in duplicate RST_STREAMs in some cases, but the client should
1090 // ignore those.
1091 if wr.StreamID() != 0 {
1092 _, isReset := wr.write.(StreamError)
1093 if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset {
1094 ignoreWrite = true
1095 }
1096 }
1097
1098 // Don't send a 100-continue response if we've already sent headers.
1099 // See golang.org/issue/14030.
1100 switch wr.write.(type) {
1101 case *writeResHeaders:
1102 wr.stream.wroteHeaders = true
1103 case write100ContinueHeadersFrame:
1104 if wr.stream.wroteHeaders {
1105 // We do not need to notify wr.done because this frame is
1106 // never written with wr.done != nil.
1107 if wr.done != nil {
1108 panic("wr.done != nil for write100ContinueHeadersFrame")
1109 }
1110 ignoreWrite = true
1111 }
1112 }
1113
1114 if !ignoreWrite {
Scott Baker8487c5d2019-10-18 12:49:46 -07001115 if wr.isControl() {
1116 sc.queuedControlFrames++
1117 // For extra safety, detect wraparounds, which should not happen,
1118 // and pull the plug.
1119 if sc.queuedControlFrames < 0 {
1120 sc.conn.Close()
1121 }
1122 }
Scott Baker2d897982019-09-24 11:50:08 -07001123 sc.writeSched.Push(wr)
1124 }
1125 sc.scheduleFrameWrite()
1126}
1127
1128// startFrameWrite starts a goroutine to write wr (in a separate
1129// goroutine since that might block on the network), and updates the
1130// serve goroutine's state about the world, updated from info in wr.
1131func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
1132 sc.serveG.check()
1133 if sc.writingFrame {
1134 panic("internal error: can only be writing one frame at a time")
1135 }
1136
1137 st := wr.stream
1138 if st != nil {
1139 switch st.state {
1140 case stateHalfClosedLocal:
1141 switch wr.write.(type) {
1142 case StreamError, handlerPanicRST, writeWindowUpdate:
1143 // RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE
1144 // in this state. (We never send PRIORITY from the server, so that is not checked.)
1145 default:
1146 panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr))
1147 }
1148 case stateClosed:
1149 panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr))
1150 }
1151 }
1152 if wpp, ok := wr.write.(*writePushPromise); ok {
1153 var err error
1154 wpp.promisedID, err = wpp.allocatePromisedID()
1155 if err != nil {
1156 sc.writingFrameAsync = false
1157 wr.replyToWriter(err)
1158 return
1159 }
1160 }
1161
1162 sc.writingFrame = true
1163 sc.needsFrameFlush = true
1164 if wr.write.staysWithinBuffer(sc.bw.Available()) {
1165 sc.writingFrameAsync = false
1166 err := wr.write.writeFrame(sc)
1167 sc.wroteFrame(frameWriteResult{wr, err})
1168 } else {
1169 sc.writingFrameAsync = true
1170 go sc.writeFrameAsync(wr)
1171 }
1172}
1173
// errHandlerPanicked is the error given to any callers blocked in a read from
// Request.Body when the main goroutine panics. Since most handlers read in the
// main ServeHTTP goroutine, this will show up rarely.
//
// wroteFrame passes it to closeStream once a handlerPanicRST frame has
// been written.
var errHandlerPanicked = errors.New("http2: handler panicked")
1178
1179// wroteFrame is called on the serve goroutine with the result of
1180// whatever happened on writeFrameAsync.
1181func (sc *serverConn) wroteFrame(res frameWriteResult) {
1182 sc.serveG.check()
1183 if !sc.writingFrame {
1184 panic("internal error: expected to be already writing a frame")
1185 }
1186 sc.writingFrame = false
1187 sc.writingFrameAsync = false
1188
1189 wr := res.wr
1190
1191 if writeEndsStream(wr.write) {
1192 st := wr.stream
1193 if st == nil {
1194 panic("internal error: expecting non-nil stream")
1195 }
1196 switch st.state {
1197 case stateOpen:
1198 // Here we would go to stateHalfClosedLocal in
1199 // theory, but since our handler is done and
1200 // the net/http package provides no mechanism
1201 // for closing a ResponseWriter while still
1202 // reading data (see possible TODO at top of
1203 // this file), we go into closed state here
1204 // anyway, after telling the peer we're
1205 // hanging up on them. We'll transition to
1206 // stateClosed after the RST_STREAM frame is
1207 // written.
1208 st.state = stateHalfClosedLocal
1209 // Section 8.1: a server MAY request that the client abort
1210 // transmission of a request without error by sending a
1211 // RST_STREAM with an error code of NO_ERROR after sending
1212 // a complete response.
1213 sc.resetStream(streamError(st.id, ErrCodeNo))
1214 case stateHalfClosedRemote:
1215 sc.closeStream(st, errHandlerComplete)
1216 }
1217 } else {
1218 switch v := wr.write.(type) {
1219 case StreamError:
1220 // st may be unknown if the RST_STREAM was generated to reject bad input.
1221 if st, ok := sc.streams[v.StreamID]; ok {
1222 sc.closeStream(st, v)
1223 }
1224 case handlerPanicRST:
1225 sc.closeStream(wr.stream, errHandlerPanicked)
1226 }
1227 }
1228
1229 // Reply (if requested) to unblock the ServeHTTP goroutine.
1230 wr.replyToWriter(res.err)
1231
1232 sc.scheduleFrameWrite()
1233}
1234
1235// scheduleFrameWrite tickles the frame writing scheduler.
1236//
1237// If a frame is already being written, nothing happens. This will be called again
1238// when the frame is done being written.
1239//
Scott Baker8487c5d2019-10-18 12:49:46 -07001240// If a frame isn't being written and we need to send one, the best frame
1241// to send is selected by writeSched.
Scott Baker2d897982019-09-24 11:50:08 -07001242//
1243// If a frame isn't being written and there's nothing else to send, we
1244// flush the write buffer.
1245func (sc *serverConn) scheduleFrameWrite() {
1246 sc.serveG.check()
1247 if sc.writingFrame || sc.inFrameScheduleLoop {
1248 return
1249 }
1250 sc.inFrameScheduleLoop = true
1251 for !sc.writingFrameAsync {
1252 if sc.needToSendGoAway {
1253 sc.needToSendGoAway = false
1254 sc.startFrameWrite(FrameWriteRequest{
1255 write: &writeGoAway{
1256 maxStreamID: sc.maxClientStreamID,
1257 code: sc.goAwayCode,
1258 },
1259 })
1260 continue
1261 }
1262 if sc.needToSendSettingsAck {
1263 sc.needToSendSettingsAck = false
1264 sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
1265 continue
1266 }
1267 if !sc.inGoAway || sc.goAwayCode == ErrCodeNo {
1268 if wr, ok := sc.writeSched.Pop(); ok {
Scott Baker8487c5d2019-10-18 12:49:46 -07001269 if wr.isControl() {
1270 sc.queuedControlFrames--
1271 }
Scott Baker2d897982019-09-24 11:50:08 -07001272 sc.startFrameWrite(wr)
1273 continue
1274 }
1275 }
1276 if sc.needsFrameFlush {
1277 sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
1278 sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
1279 continue
1280 }
1281 break
1282 }
1283 sc.inFrameScheduleLoop = false
1284}
1285
1286// startGracefulShutdown gracefully shuts down a connection. This
1287// sends GOAWAY with ErrCodeNo to tell the client we're gracefully
1288// shutting down. The connection isn't closed until all current
1289// streams are done.
1290//
1291// startGracefulShutdown returns immediately; it does not wait until
1292// the connection has shut down.
1293func (sc *serverConn) startGracefulShutdown() {
1294 sc.serveG.checkNotOn() // NOT
1295 sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) })
1296}
1297
// goAwayTimeout is how long the connection stays open after GOAWAY has
// been sent.
//
// If we closed the connection immediately after sending GOAWAY, there
// might still be data sitting in our kernel receive buffer, which
// causes the kernel to send a TCP RST on close() instead of a FIN.
// That RST aborts the connection immediately, whether or not the
// client had received the GOAWAY.
//
// Ideally we should delay for at least 1 RTT + epsilon so the client has
// a chance to read the GOAWAY and stop sending messages. Measuring RTT
// is hard, so we approximate with 1 second. See golang.org/issue/18701.
//
// This is a var so it can be shorter in tests, where all requests use
// the loopback interface, making the expected RTT very small.
//
// TODO: configurable?
var goAwayTimeout = time.Second
1313
// startGracefulShutdownInternal begins a graceful shutdown by calling
// goAway with ErrCodeNo. Unlike startGracefulShutdown it does not go
// through serveMsgCh, so it has to run on the serve goroutine (goAway
// checks this).
func (sc *serverConn) startGracefulShutdownInternal() {
	sc.goAway(ErrCodeNo)
}
1317
1318func (sc *serverConn) goAway(code ErrCode) {
1319 sc.serveG.check()
1320 if sc.inGoAway {
1321 return
1322 }
1323 sc.inGoAway = true
1324 sc.needToSendGoAway = true
1325 sc.goAwayCode = code
1326 sc.scheduleFrameWrite()
1327}
1328
1329func (sc *serverConn) shutDownIn(d time.Duration) {
1330 sc.serveG.check()
1331 sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
1332}
1333
1334func (sc *serverConn) resetStream(se StreamError) {
1335 sc.serveG.check()
1336 sc.writeFrame(FrameWriteRequest{write: se})
1337 if st, ok := sc.streams[se.StreamID]; ok {
1338 st.resetQueued = true
1339 }
1340}
1341
1342// processFrameFromReader processes the serve loop's read from readFrameCh from the
1343// frame-reading goroutine.
1344// processFrameFromReader returns whether the connection should be kept open.
1345func (sc *serverConn) processFrameFromReader(res readFrameResult) bool {
1346 sc.serveG.check()
1347 err := res.err
1348 if err != nil {
1349 if err == ErrFrameTooLarge {
1350 sc.goAway(ErrCodeFrameSize)
1351 return true // goAway will close the loop
1352 }
1353 clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err)
1354 if clientGone {
1355 // TODO: could we also get into this state if
1356 // the peer does a half close
1357 // (e.g. CloseWrite) because they're done
1358 // sending frames but they're still wanting
1359 // our open replies? Investigate.
1360 // TODO: add CloseWrite to crypto/tls.Conn first
1361 // so we have a way to test this? I suppose
1362 // just for testing we could have a non-TLS mode.
1363 return false
1364 }
1365 } else {
1366 f := res.f
1367 if VerboseLogs {
1368 sc.vlogf("http2: server read frame %v", summarizeFrame(f))
1369 }
1370 err = sc.processFrame(f)
1371 if err == nil {
1372 return true
1373 }
1374 }
1375
1376 switch ev := err.(type) {
1377 case StreamError:
1378 sc.resetStream(ev)
1379 return true
1380 case goAwayFlowError:
1381 sc.goAway(ErrCodeFlowControl)
1382 return true
1383 case ConnectionError:
1384 sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev)
1385 sc.goAway(ErrCode(ev))
1386 return true // goAway will handle shutdown
1387 default:
1388 if res.err != nil {
1389 sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
1390 } else {
1391 sc.logf("http2: server closing client connection: %v", err)
1392 }
1393 return false
1394 }
1395}
1396
1397func (sc *serverConn) processFrame(f Frame) error {
1398 sc.serveG.check()
1399
1400 // First frame received must be SETTINGS.
1401 if !sc.sawFirstSettings {
1402 if _, ok := f.(*SettingsFrame); !ok {
1403 return ConnectionError(ErrCodeProtocol)
1404 }
1405 sc.sawFirstSettings = true
1406 }
1407
1408 switch f := f.(type) {
1409 case *SettingsFrame:
1410 return sc.processSettings(f)
1411 case *MetaHeadersFrame:
1412 return sc.processHeaders(f)
1413 case *WindowUpdateFrame:
1414 return sc.processWindowUpdate(f)
1415 case *PingFrame:
1416 return sc.processPing(f)
1417 case *DataFrame:
1418 return sc.processData(f)
1419 case *RSTStreamFrame:
1420 return sc.processResetStream(f)
1421 case *PriorityFrame:
1422 return sc.processPriority(f)
1423 case *GoAwayFrame:
1424 return sc.processGoAway(f)
1425 case *PushPromiseFrame:
1426 // A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
1427 // frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
1428 return ConnectionError(ErrCodeProtocol)
1429 default:
1430 sc.vlogf("http2: server ignoring frame: %v", f.Header())
1431 return nil
1432 }
1433}
1434
1435func (sc *serverConn) processPing(f *PingFrame) error {
1436 sc.serveG.check()
1437 if f.IsAck() {
1438 // 6.7 PING: " An endpoint MUST NOT respond to PING frames
1439 // containing this flag."
1440 return nil
1441 }
1442 if f.StreamID != 0 {
1443 // "PING frames are not associated with any individual
1444 // stream. If a PING frame is received with a stream
1445 // identifier field value other than 0x0, the recipient MUST
1446 // respond with a connection error (Section 5.4.1) of type
1447 // PROTOCOL_ERROR."
1448 return ConnectionError(ErrCodeProtocol)
1449 }
1450 if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
1451 return nil
1452 }
1453 sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
1454 return nil
1455}
1456
1457func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
1458 sc.serveG.check()
1459 switch {
1460 case f.StreamID != 0: // stream-level flow control
1461 state, st := sc.state(f.StreamID)
1462 if state == stateIdle {
1463 // Section 5.1: "Receiving any frame other than HEADERS
1464 // or PRIORITY on a stream in this state MUST be
1465 // treated as a connection error (Section 5.4.1) of
1466 // type PROTOCOL_ERROR."
1467 return ConnectionError(ErrCodeProtocol)
1468 }
1469 if st == nil {
1470 // "WINDOW_UPDATE can be sent by a peer that has sent a
1471 // frame bearing the END_STREAM flag. This means that a
1472 // receiver could receive a WINDOW_UPDATE frame on a "half
1473 // closed (remote)" or "closed" stream. A receiver MUST
1474 // NOT treat this as an error, see Section 5.1."
1475 return nil
1476 }
1477 if !st.flow.add(int32(f.Increment)) {
1478 return streamError(f.StreamID, ErrCodeFlowControl)
1479 }
1480 default: // connection-level flow control
1481 if !sc.flow.add(int32(f.Increment)) {
1482 return goAwayFlowError{}
1483 }
1484 }
1485 sc.scheduleFrameWrite()
1486 return nil
1487}
1488
1489func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
1490 sc.serveG.check()
1491
1492 state, st := sc.state(f.StreamID)
1493 if state == stateIdle {
1494 // 6.4 "RST_STREAM frames MUST NOT be sent for a
1495 // stream in the "idle" state. If a RST_STREAM frame
1496 // identifying an idle stream is received, the
1497 // recipient MUST treat this as a connection error
1498 // (Section 5.4.1) of type PROTOCOL_ERROR.
1499 return ConnectionError(ErrCodeProtocol)
1500 }
1501 if st != nil {
1502 st.cancelCtx()
1503 sc.closeStream(st, streamError(f.StreamID, f.ErrCode))
1504 }
1505 return nil
1506}
1507
1508func (sc *serverConn) closeStream(st *stream, err error) {
1509 sc.serveG.check()
1510 if st.state == stateIdle || st.state == stateClosed {
1511 panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
1512 }
1513 st.state = stateClosed
1514 if st.writeDeadline != nil {
1515 st.writeDeadline.Stop()
1516 }
1517 if st.isPushed() {
1518 sc.curPushedStreams--
1519 } else {
1520 sc.curClientStreams--
1521 }
1522 delete(sc.streams, st.id)
1523 if len(sc.streams) == 0 {
1524 sc.setConnState(http.StateIdle)
1525 if sc.srv.IdleTimeout != 0 {
1526 sc.idleTimer.Reset(sc.srv.IdleTimeout)
1527 }
1528 if h1ServerKeepAlivesDisabled(sc.hs) {
1529 sc.startGracefulShutdownInternal()
1530 }
1531 }
1532 if p := st.body; p != nil {
1533 // Return any buffered unread bytes worth of conn-level flow control.
1534 // See golang.org/issue/16481
1535 sc.sendWindowUpdate(nil, p.Len())
1536
1537 p.CloseWithError(err)
1538 }
1539 st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
1540 sc.writeSched.CloseStream(st.id)
1541}
1542
1543func (sc *serverConn) processSettings(f *SettingsFrame) error {
1544 sc.serveG.check()
1545 if f.IsAck() {
1546 sc.unackedSettings--
1547 if sc.unackedSettings < 0 {
1548 // Why is the peer ACKing settings we never sent?
1549 // The spec doesn't mention this case, but
1550 // hang up on them anyway.
1551 return ConnectionError(ErrCodeProtocol)
1552 }
1553 return nil
1554 }
1555 if f.NumSettings() > 100 || f.HasDuplicates() {
1556 // This isn't actually in the spec, but hang up on
1557 // suspiciously large settings frames or those with
1558 // duplicate entries.
1559 return ConnectionError(ErrCodeProtocol)
1560 }
1561 if err := f.ForeachSetting(sc.processSetting); err != nil {
1562 return err
1563 }
Scott Baker8487c5d2019-10-18 12:49:46 -07001564 // TODO: judging by RFC 7540, Section 6.5.3 each SETTINGS frame should be
1565 // acknowledged individually, even if multiple are received before the ACK.
Scott Baker2d897982019-09-24 11:50:08 -07001566 sc.needToSendSettingsAck = true
1567 sc.scheduleFrameWrite()
1568 return nil
1569}
1570
1571func (sc *serverConn) processSetting(s Setting) error {
1572 sc.serveG.check()
1573 if err := s.Valid(); err != nil {
1574 return err
1575 }
1576 if VerboseLogs {
1577 sc.vlogf("http2: server processing setting %v", s)
1578 }
1579 switch s.ID {
1580 case SettingHeaderTableSize:
1581 sc.headerTableSize = s.Val
1582 sc.hpackEncoder.SetMaxDynamicTableSize(s.Val)
1583 case SettingEnablePush:
1584 sc.pushEnabled = s.Val != 0
1585 case SettingMaxConcurrentStreams:
1586 sc.clientMaxStreams = s.Val
1587 case SettingInitialWindowSize:
1588 return sc.processSettingInitialWindowSize(s.Val)
1589 case SettingMaxFrameSize:
1590 sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31
1591 case SettingMaxHeaderListSize:
1592 sc.peerMaxHeaderListSize = s.Val
1593 default:
1594 // Unknown setting: "An endpoint that receives a SETTINGS
1595 // frame with any unknown or unsupported identifier MUST
1596 // ignore that setting."
1597 if VerboseLogs {
1598 sc.vlogf("http2: server ignoring unknown setting %v", s)
1599 }
1600 }
1601 return nil
1602}
1603
1604func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
1605 sc.serveG.check()
1606 // Note: val already validated to be within range by
1607 // processSetting's Valid call.
1608
1609 // "A SETTINGS frame can alter the initial flow control window
1610 // size for all current streams. When the value of
1611 // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST
1612 // adjust the size of all stream flow control windows that it
1613 // maintains by the difference between the new value and the
1614 // old value."
1615 old := sc.initialStreamSendWindowSize
1616 sc.initialStreamSendWindowSize = int32(val)
1617 growth := int32(val) - old // may be negative
1618 for _, st := range sc.streams {
1619 if !st.flow.add(growth) {
1620 // 6.9.2 Initial Flow Control Window Size
1621 // "An endpoint MUST treat a change to
1622 // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow
1623 // control window to exceed the maximum size as a
1624 // connection error (Section 5.4.1) of type
1625 // FLOW_CONTROL_ERROR."
1626 return ConnectionError(ErrCodeFlowControl)
1627 }
1628 }
1629 return nil
1630}
1631
1632func (sc *serverConn) processData(f *DataFrame) error {
1633 sc.serveG.check()
1634 if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
1635 return nil
1636 }
1637 data := f.Data()
1638
1639 // "If a DATA frame is received whose stream is not in "open"
1640 // or "half closed (local)" state, the recipient MUST respond
1641 // with a stream error (Section 5.4.2) of type STREAM_CLOSED."
1642 id := f.Header().StreamID
1643 state, st := sc.state(id)
1644 if id == 0 || state == stateIdle {
1645 // Section 5.1: "Receiving any frame other than HEADERS
1646 // or PRIORITY on a stream in this state MUST be
1647 // treated as a connection error (Section 5.4.1) of
1648 // type PROTOCOL_ERROR."
1649 return ConnectionError(ErrCodeProtocol)
1650 }
1651 if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued {
1652 // This includes sending a RST_STREAM if the stream is
1653 // in stateHalfClosedLocal (which currently means that
1654 // the http.Handler returned, so it's done reading &
1655 // done writing). Try to stop the client from sending
1656 // more DATA.
1657
1658 // But still enforce their connection-level flow control,
1659 // and return any flow control bytes since we're not going
1660 // to consume them.
1661 if sc.inflow.available() < int32(f.Length) {
1662 return streamError(id, ErrCodeFlowControl)
1663 }
1664 // Deduct the flow control from inflow, since we're
1665 // going to immediately add it back in
1666 // sendWindowUpdate, which also schedules sending the
1667 // frames.
1668 sc.inflow.take(int32(f.Length))
1669 sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
1670
1671 if st != nil && st.resetQueued {
1672 // Already have a stream error in flight. Don't send another.
1673 return nil
1674 }
1675 return streamError(id, ErrCodeStreamClosed)
1676 }
1677 if st.body == nil {
1678 panic("internal error: should have a body in this state")
1679 }
1680
1681 // Sender sending more than they'd declared?
1682 if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
1683 st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
1684 // RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
1685 // value of a content-length header field does not equal the sum of the
1686 // DATA frame payload lengths that form the body.
1687 return streamError(id, ErrCodeProtocol)
1688 }
1689 if f.Length > 0 {
1690 // Check whether the client has flow control quota.
1691 if st.inflow.available() < int32(f.Length) {
1692 return streamError(id, ErrCodeFlowControl)
1693 }
1694 st.inflow.take(int32(f.Length))
1695
1696 if len(data) > 0 {
1697 wrote, err := st.body.Write(data)
1698 if err != nil {
1699 return streamError(id, ErrCodeStreamClosed)
1700 }
1701 if wrote != len(data) {
1702 panic("internal error: bad Writer")
1703 }
1704 st.bodyBytes += int64(len(data))
1705 }
1706
1707 // Return any padded flow control now, since we won't
1708 // refund it later on body reads.
1709 if pad := int32(f.Length) - int32(len(data)); pad > 0 {
1710 sc.sendWindowUpdate32(nil, pad)
1711 sc.sendWindowUpdate32(st, pad)
1712 }
1713 }
1714 if f.StreamEnded() {
1715 st.endStream()
1716 }
1717 return nil
1718}
1719
1720func (sc *serverConn) processGoAway(f *GoAwayFrame) error {
1721 sc.serveG.check()
1722 if f.ErrCode != ErrCodeNo {
1723 sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1724 } else {
1725 sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1726 }
1727 sc.startGracefulShutdownInternal()
1728 // http://tools.ietf.org/html/rfc7540#section-6.8
1729 // We should not create any new streams, which means we should disable push.
1730 sc.pushEnabled = false
1731 return nil
1732}
1733
1734// isPushed reports whether the stream is server-initiated.
1735func (st *stream) isPushed() bool {
1736 return st.id%2 == 0
1737}
1738
1739// endStream closes a Request.Body's pipe. It is called when a DATA
1740// frame says a request body is over (or after trailers).
1741func (st *stream) endStream() {
1742 sc := st.sc
1743 sc.serveG.check()
1744
1745 if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes {
1746 st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes",
1747 st.declBodyBytes, st.bodyBytes))
1748 } else {
1749 st.body.closeWithErrorAndCode(io.EOF, st.copyTrailersToHandlerRequest)
1750 st.body.CloseWithError(io.EOF)
1751 }
1752 st.state = stateHalfClosedRemote
1753}
1754
1755// copyTrailersToHandlerRequest is run in the Handler's goroutine in
1756// its Request.Body.Read just before it gets io.EOF.
1757func (st *stream) copyTrailersToHandlerRequest() {
1758 for k, vv := range st.trailer {
1759 if _, ok := st.reqTrailer[k]; ok {
1760 // Only copy it over it was pre-declared.
1761 st.reqTrailer[k] = vv
1762 }
1763 }
1764}
1765
1766// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
1767// when the stream's WriteTimeout has fired.
1768func (st *stream) onWriteTimeout() {
1769 st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)})
1770}
1771
// processHeaders handles a decoded HEADERS frame on the serve goroutine.
//
// A frame for a stream that already exists is handed to
// processTrailerHeaders. Otherwise a new client-initiated stream is
// created, a request and response writer are built for it, and the
// handler is started on its own goroutine.
//
// It returns a ConnectionError or a stream error when one of the
// protocol checks below fails, and nil when the frame was accepted or
// deliberately ignored.
func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
	sc.serveG.check()
	id := f.StreamID
	if sc.inGoAway {
		// Ignore.
		return nil
	}
	// http://tools.ietf.org/html/rfc7540#section-5.1.1
	// Streams initiated by a client MUST use odd-numbered stream
	// identifiers. [...] An endpoint that receives an unexpected
	// stream identifier MUST respond with a connection error
	// (Section 5.4.1) of type PROTOCOL_ERROR.
	if id%2 != 1 {
		return ConnectionError(ErrCodeProtocol)
	}
	// A HEADERS frame can be used to create a new stream or
	// send a trailer for an open one. If we already have a stream
	// open, let it process its own HEADERS frame (trailers at this
	// point, if it's valid).
	if st := sc.streams[f.StreamID]; st != nil {
		if st.resetQueued {
			// We're sending RST_STREAM to close the stream, so don't bother
			// processing this frame.
			return nil
		}
		// RFC 7540, sec 5.1: If an endpoint receives additional frames, other than
		// WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in
		// this state, it MUST respond with a stream error (Section 5.4.2) of
		// type STREAM_CLOSED.
		if st.state == stateHalfClosedRemote {
			return streamError(id, ErrCodeStreamClosed)
		}
		return st.processTrailerHeaders(f)
	}

	// [...] The identifier of a newly established stream MUST be
	// numerically greater than all streams that the initiating
	// endpoint has opened or reserved. [...] An endpoint that
	// receives an unexpected stream identifier MUST respond with
	// a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
	if id <= sc.maxClientStreamID {
		return ConnectionError(ErrCodeProtocol)
	}
	sc.maxClientStreamID = id

	// A new stream is arriving, so stop the idle timer if one is set.
	if sc.idleTimer != nil {
		sc.idleTimer.Stop()
	}

	// http://tools.ietf.org/html/rfc7540#section-5.1.2
	// [...] Endpoints MUST NOT exceed the limit set by their peer. An
	// endpoint that receives a HEADERS frame that causes their
	// advertised concurrent stream limit to be exceeded MUST treat
	// this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
	// or REFUSED_STREAM.
	if sc.curClientStreams+1 > sc.advMaxStreams {
		if sc.unackedSettings == 0 {
			// They should know better.
			return streamError(id, ErrCodeProtocol)
		}
		// Assume it's a network race, where they just haven't
		// received our last SETTINGS update. But actually
		// this can't happen yet, because we don't yet provide
		// a way for users to adjust server parameters at
		// runtime.
		return streamError(id, ErrCodeRefusedStream)
	}

	// A HEADERS frame that also ends the stream means the client will
	// send nothing further, so the stream starts half-closed (remote).
	initialState := stateOpen
	if f.StreamEnded() {
		initialState = stateHalfClosedRemote
	}
	st := sc.newStream(id, 0, initialState)

	if f.HasPriority() {
		if err := checkPriority(f.StreamID, f.Priority); err != nil {
			return err
		}
		sc.writeSched.AdjustStream(st.id, f.Priority)
	}

	rw, req, err := sc.newWriterAndRequest(st, f)
	if err != nil {
		return err
	}
	// Remember which trailers the request declared; received trailer
	// fields are only collected when at least one was declared.
	st.reqTrailer = req.Trailer
	if st.reqTrailer != nil {
		st.trailer = make(http.Header)
	}
	st.body = req.Body.(*requestBody).pipe // may be nil
	st.declBodyBytes = req.ContentLength

	// Pick the handler: the user's handler by default, or a canned error
	// handler when the header block was truncated or is invalid.
	handler := sc.handler.ServeHTTP
	if f.Truncated {
		// Their header list was too long. Send a 431 error.
		handler = handleHeaderListTooLong
	} else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil {
		handler = new400Handler(err)
	}

	// The net/http package sets the read deadline from the
	// http.Server.ReadTimeout during the TLS handshake, but then
	// passes the connection off to us with the deadline already
	// set. Disarm it here after the request headers are read,
	// similar to how the http1 server works. Here it's
	// technically more like the http1 Server's ReadHeaderTimeout
	// (in Go 1.8), though. That's a more sane option anyway.
	if sc.hs.ReadTimeout != 0 {
		sc.conn.SetReadDeadline(time.Time{})
	}

	go sc.runHandler(rw, req, handler)
	return nil
}
1886
1887func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
1888 sc := st.sc
1889 sc.serveG.check()
1890 if st.gotTrailerHeader {
1891 return ConnectionError(ErrCodeProtocol)
1892 }
1893 st.gotTrailerHeader = true
1894 if !f.StreamEnded() {
1895 return streamError(st.id, ErrCodeProtocol)
1896 }
1897
1898 if len(f.PseudoFields()) > 0 {
1899 return streamError(st.id, ErrCodeProtocol)
1900 }
1901 if st.trailer != nil {
1902 for _, hf := range f.RegularFields() {
1903 key := sc.canonicalHeader(hf.Name)
1904 if !httpguts.ValidTrailerHeader(key) {
1905 // TODO: send more details to the peer somehow. But http2 has
1906 // no way to send debug data at a stream level. Discuss with
1907 // HTTP folk.
1908 return streamError(st.id, ErrCodeProtocol)
1909 }
1910 st.trailer[key] = append(st.trailer[key], hf.Value)
1911 }
1912 }
1913 st.endStream()
1914 return nil
1915}
1916
1917func checkPriority(streamID uint32, p PriorityParam) error {
1918 if streamID == p.StreamDep {
1919 // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
1920 // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
1921 // Section 5.3.3 says that a stream can depend on one of its dependencies,
1922 // so it's only self-dependencies that are forbidden.
1923 return streamError(streamID, ErrCodeProtocol)
1924 }
1925 return nil
1926}
1927
1928func (sc *serverConn) processPriority(f *PriorityFrame) error {
1929 if sc.inGoAway {
1930 return nil
1931 }
1932 if err := checkPriority(f.StreamID, f.PriorityParam); err != nil {
1933 return err
1934 }
1935 sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam)
1936 return nil
1937}
1938
1939func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream {
1940 sc.serveG.check()
1941 if id == 0 {
1942 panic("internal error: cannot create stream with id 0")
1943 }
1944
1945 ctx, cancelCtx := context.WithCancel(sc.baseCtx)
1946 st := &stream{
1947 sc: sc,
1948 id: id,
1949 state: state,
1950 ctx: ctx,
1951 cancelCtx: cancelCtx,
1952 }
1953 st.cw.Init()
1954 st.flow.conn = &sc.flow // link to conn-level counter
1955 st.flow.add(sc.initialStreamSendWindowSize)
1956 st.inflow.conn = &sc.inflow // link to conn-level counter
1957 st.inflow.add(sc.srv.initialStreamRecvWindowSize())
1958 if sc.hs.WriteTimeout != 0 {
1959 st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
1960 }
1961
1962 sc.streams[id] = st
1963 sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
1964 if st.isPushed() {
1965 sc.curPushedStreams++
1966 } else {
1967 sc.curClientStreams++
1968 }
1969 if sc.curOpenStreams() == 1 {
1970 sc.setConnState(http.StateActive)
1971 }
1972
1973 return st
1974}
1975
1976func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) {
1977 sc.serveG.check()
1978
1979 rp := requestParam{
1980 method: f.PseudoValue("method"),
1981 scheme: f.PseudoValue("scheme"),
1982 authority: f.PseudoValue("authority"),
1983 path: f.PseudoValue("path"),
1984 }
1985
1986 isConnect := rp.method == "CONNECT"
1987 if isConnect {
1988 if rp.path != "" || rp.scheme != "" || rp.authority == "" {
1989 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
1990 }
1991 } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
1992 // See 8.1.2.6 Malformed Requests and Responses:
1993 //
1994 // Malformed requests or responses that are detected
1995 // MUST be treated as a stream error (Section 5.4.2)
1996 // of type PROTOCOL_ERROR."
1997 //
1998 // 8.1.2.3 Request Pseudo-Header Fields
1999 // "All HTTP/2 requests MUST include exactly one valid
2000 // value for the :method, :scheme, and :path
2001 // pseudo-header fields"
2002 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
2003 }
2004
2005 bodyOpen := !f.StreamEnded()
2006 if rp.method == "HEAD" && bodyOpen {
2007 // HEAD requests can't have bodies
2008 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
2009 }
2010
2011 rp.header = make(http.Header)
2012 for _, hf := range f.RegularFields() {
2013 rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value)
2014 }
2015 if rp.authority == "" {
2016 rp.authority = rp.header.Get("Host")
2017 }
2018
2019 rw, req, err := sc.newWriterAndRequestNoBody(st, rp)
2020 if err != nil {
2021 return nil, nil, err
2022 }
2023 if bodyOpen {
2024 if vv, ok := rp.header["Content-Length"]; ok {
2025 req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
2026 } else {
2027 req.ContentLength = -1
2028 }
2029 req.Body.(*requestBody).pipe = &pipe{
2030 b: &dataBuffer{expected: req.ContentLength},
2031 }
2032 }
2033 return rw, req, nil
2034}
2035
// requestParam carries the values needed to build an *http.Request:
// the request pseudo-header values and the regular header fields.
type requestParam struct {
	method                  string
	scheme, authority, path string
	header                  http.Header
}
2041
// newWriterAndRequestNoBody builds the *http.Request and responseWriter
// for stream st from rp. The request body it creates has no pipe yet.
// It must run on the serve goroutine.
//
// It mutates rp.header: "Expect: 100-continue" is removed, multiple
// Cookie values are merged into one, and the Trailer header is removed
// after being turned into req.Trailer. A :path that does not parse as
// a request URI yields a stream error of type ErrCodeProtocol.
func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) {
	sc.serveG.check()

	var tlsState *tls.ConnectionState // nil if not scheme https
	if rp.scheme == "https" {
		tlsState = sc.tlsState
	}

	// Remember whether a 100-continue is owed; requestBody.Read sends
	// it on the first read.
	needsContinue := rp.header.Get("Expect") == "100-continue"
	if needsContinue {
		rp.header.Del("Expect")
	}
	// Merge Cookie headers into one "; "-delimited value.
	if cookies := rp.header["Cookie"]; len(cookies) > 1 {
		rp.header.Set("Cookie", strings.Join(cookies, "; "))
	}

	// Setup Trailers
	// Each declared trailer name becomes a key with a nil value; the
	// map stays nil when no acceptable trailer was declared.
	var trailer http.Header
	for _, v := range rp.header["Trailer"] {
		for _, key := range strings.Split(v, ",") {
			key = http.CanonicalHeaderKey(strings.TrimSpace(key))
			switch key {
			case "Transfer-Encoding", "Trailer", "Content-Length":
				// Bogus. (copy of http1 rules)
				// Ignore.
			default:
				if trailer == nil {
					trailer = make(http.Header)
				}
				trailer[key] = nil
			}
		}
	}
	delete(rp.header, "Trailer")

	var url_ *url.URL
	var requestURI string
	if rp.method == "CONNECT" {
		url_ = &url.URL{Host: rp.authority}
		requestURI = rp.authority // mimic HTTP/1 server behavior
	} else {
		var err error
		url_, err = url.ParseRequestURI(rp.path)
		if err != nil {
			return nil, nil, streamError(st.id, ErrCodeProtocol)
		}
		requestURI = rp.path
	}

	body := &requestBody{
		conn:          sc,
		stream:        st,
		needsContinue: needsContinue,
	}
	req := &http.Request{
		Method:     rp.method,
		URL:        url_,
		RemoteAddr: sc.remoteAddrStr,
		Header:     rp.header,
		RequestURI: requestURI,
		Proto:      "HTTP/2.0",
		ProtoMajor: 2,
		ProtoMinor: 0,
		TLS:        tlsState,
		Host:       rp.authority,
		Body:       body,
		Trailer:    trailer,
	}
	req = req.WithContext(st.ctx)

	// Take a responseWriterState from the pool and reset it, keeping
	// only its bufio.Writer, which is re-pointed at this state.
	rws := responseWriterStatePool.Get().(*responseWriterState)
	bwSave := rws.bw
	*rws = responseWriterState{} // zero all the fields
	rws.conn = sc
	rws.bw = bwSave
	rws.bw.Reset(chunkWriter{rws})
	rws.stream = st
	rws.req = req
	rws.body = body

	rw := &responseWriter{rws: rws}
	return rw, req, nil
}
2126
// Run on its own goroutine.
//
// runHandler calls handler(rw, req). When the handler returns, the
// stream's context is canceled. After a normal return rw.handlerDone
// is called. After a panic a handlerPanicRST frame is queued for the
// stream and, unless the panic value is nil or http.ErrAbortHandler,
// the panic and a stack trace are logged.
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
	// didPanic is only cleared after handler returns normally, so the
	// deferred function can tell a panic from a clean return.
	didPanic := true
	defer func() {
		rw.rws.stream.cancelCtx()
		if didPanic {
			e := recover()
			sc.writeFrameFromHandler(FrameWriteRequest{
				write:  handlerPanicRST{rw.rws.stream.id},
				stream: rw.rws.stream,
			})
			// Same as net/http:
			if e != nil && e != http.ErrAbortHandler {
				const size = 64 << 10
				buf := make([]byte, size)
				buf = buf[:runtime.Stack(buf, false)]
				sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf)
			}
			return
		}
		rw.handlerDone()
	}()
	handler(rw, req)
	didPanic = false
}
2152
// handleHeaderListTooLong is the handler used in place of the user's
// handler when the request header block was truncated. It replies with
// status 431 and a short HTML body.
//
// 10.5.1 Limits on Header Block Size:
// .. "A server that receives a larger header block than it is
// willing to handle can send an HTTP 431 (Request Header Fields Too
// Large) status code"
func handleHeaderListTooLong(w http.ResponseWriter, r *http.Request) {
	const body = "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>"
	w.WriteHeader(http.StatusRequestHeaderFieldsTooLarge)
	io.WriteString(w, body)
}
2162
2163// called from handler goroutines.
2164// h may be nil.
2165func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) error {
2166 sc.serveG.checkNotOn() // NOT on
2167 var errc chan error
2168 if headerData.h != nil {
2169 // If there's a header map (which we don't own), so we have to block on
2170 // waiting for this frame to be written, so an http.Flush mid-handler
2171 // writes out the correct value of keys, before a handler later potentially
2172 // mutates it.
2173 errc = errChanPool.Get().(chan error)
2174 }
2175 if err := sc.writeFrameFromHandler(FrameWriteRequest{
2176 write: headerData,
2177 stream: st,
2178 done: errc,
2179 }); err != nil {
2180 return err
2181 }
2182 if errc != nil {
2183 select {
2184 case err := <-errc:
2185 errChanPool.Put(errc)
2186 return err
2187 case <-sc.doneServing:
2188 return errClientDisconnected
2189 case <-st.cw:
2190 return errStreamClosed
2191 }
2192 }
2193 return nil
2194}
2195
2196// called from handler goroutines.
2197func (sc *serverConn) write100ContinueHeaders(st *stream) {
2198 sc.writeFrameFromHandler(FrameWriteRequest{
2199 write: write100ContinueHeadersFrame{st.id},
2200 stream: st,
2201 })
2202}
2203
// A bodyReadMsg tells the server loop that the http.Handler read n
// bytes of the DATA from the client on the given stream.
// It is sent on serverConn.bodyReadCh by noteBodyReadFromHandler.
type bodyReadMsg struct {
	st *stream // stream whose body was read
	n  int     // number of bytes read
}
2210
2211// called from handler goroutines.
2212// Notes that the handler for the given stream ID read n bytes of its body
2213// and schedules flow control tokens to be sent.
2214func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
2215 sc.serveG.checkNotOn() // NOT on
2216 if n > 0 {
2217 select {
2218 case sc.bodyReadCh <- bodyReadMsg{st, n}:
2219 case <-sc.doneServing:
2220 }
2221 }
2222}
2223
2224func (sc *serverConn) noteBodyRead(st *stream, n int) {
2225 sc.serveG.check()
2226 sc.sendWindowUpdate(nil, n) // conn-level
2227 if st.state != stateHalfClosedRemote && st.state != stateClosed {
2228 // Don't send this WINDOW_UPDATE if the stream is closed
2229 // remotely.
2230 sc.sendWindowUpdate(st, n)
2231 }
2232}
2233
2234// st may be nil for conn-level
2235func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
2236 sc.serveG.check()
2237 // "The legal range for the increment to the flow control
2238 // window is 1 to 2^31-1 (2,147,483,647) octets."
2239 // A Go Read call on 64-bit machines could in theory read
2240 // a larger Read than this. Very unlikely, but we handle it here
2241 // rather than elsewhere for now.
2242 const maxUint31 = 1<<31 - 1
2243 for n >= maxUint31 {
2244 sc.sendWindowUpdate32(st, maxUint31)
2245 n -= maxUint31
2246 }
2247 sc.sendWindowUpdate32(st, int32(n))
2248}
2249
2250// st may be nil for conn-level
2251func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
2252 sc.serveG.check()
2253 if n == 0 {
2254 return
2255 }
2256 if n < 0 {
2257 panic("negative update")
2258 }
2259 var streamID uint32
2260 if st != nil {
2261 streamID = st.id
2262 }
2263 sc.writeFrame(FrameWriteRequest{
2264 write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
2265 stream: st,
2266 })
2267 var ok bool
2268 if st == nil {
2269 ok = sc.inflow.add(n)
2270 } else {
2271 ok = st.inflow.add(n)
2272 }
2273 if !ok {
2274 panic("internal error; sent too many window updates without decrements?")
2275 }
2276}
2277
// requestBody is the Handler's Request.Body type.
// Read and Close may be called concurrently.
type requestBody struct {
	stream        *stream     // stream this body belongs to
	conn          *serverConn // connection the stream is on
	closed        bool        // for use by Close only
	sawEOF        bool        // for use by Read only
	pipe          *pipe       // non-nil if we have a HTTP entity message body
	needsContinue bool        // need to send a 100-continue
}
2288
2289func (b *requestBody) Close() error {
2290 if b.pipe != nil && !b.closed {
2291 b.pipe.BreakWithError(errClosedBody)
2292 }
2293 b.closed = true
2294 return nil
2295}
2296
2297func (b *requestBody) Read(p []byte) (n int, err error) {
2298 if b.needsContinue {
2299 b.needsContinue = false
2300 b.conn.write100ContinueHeaders(b.stream)
2301 }
2302 if b.pipe == nil || b.sawEOF {
2303 return 0, io.EOF
2304 }
2305 n, err = b.pipe.Read(p)
2306 if err == io.EOF {
2307 b.sawEOF = true
2308 }
2309 if b.conn == nil && inTests {
2310 return
2311 }
2312 b.conn.noteBodyReadFromHandler(b.stream, n, err)
2313 return
2314}
2315
// responseWriter is the http.ResponseWriter implementation. It's
// intentionally small (1 pointer wide) to minimize garbage. The
// responseWriterState pointer inside is set to nil at the end of a
// request (in handlerDone), and most methods called on the
// responseWriter after that panic (caller's mistake). The much larger
// responseWriterState and its buffers are reused between requests.
type responseWriter struct {
	rws *responseWriterState
}
2325
// Optional http.ResponseWriter interfaces implemented.
// These are compile-time assertions; they have no runtime effect.
var (
	_ http.CloseNotifier = (*responseWriter)(nil)
	_ http.Flusher       = (*responseWriter)(nil)
	_ stringWriter       = (*responseWriter)(nil)
)
2332
// responseWriterState holds the per-request state behind a
// responseWriter. Values are taken from responseWriterStatePool when a
// request starts and are returned in handlerDone unless marked dirty.
type responseWriterState struct {
	// immutable within a request:
	stream *stream
	req    *http.Request
	body   *requestBody // to close at end of request, if DATA frames didn't
	conn   *serverConn

	// TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc
	bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState}

	// mutated by http.Handler goroutine:
	handlerHeader http.Header // nil until called
	snapHeader    http.Header // snapshot of handlerHeader at WriteHeader time
	trailers      []string    // set in writeChunk
	status        int         // status code passed to WriteHeader
	wroteHeader   bool        // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
	sentHeader    bool        // have we sent the header frame?
	handlerDone   bool        // handler has finished
	dirty         bool        // a Write failed; don't reuse this responseWriterState

	sentContentLen int64 // non-zero if handler set a Content-Length header
	wroteBytes     int64 // total bytes passed to Write/WriteString so far

	closeNotifierMu sync.Mutex // guards closeNotifierCh
	closeNotifierCh chan bool  // nil until first used
}
2359
// chunkWriter is the io.Writer placed underneath a responseWriterState's
// bufio.Writer; it forwards every write to writeChunk.
type chunkWriter struct{ rws *responseWriterState }

// Write passes p to the responseWriterState's writeChunk.
func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) }
2363
Scott Baker8487c5d2019-10-18 12:49:46 -07002364func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) > 0 }
2365
2366func (rws *responseWriterState) hasNonemptyTrailers() bool {
2367 for _, trailer := range rws.trailers {
2368 if _, ok := rws.handlerHeader[trailer]; ok {
2369 return true
2370 }
2371 }
2372 return false
2373}
Scott Baker2d897982019-09-24 11:50:08 -07002374
2375// declareTrailer is called for each Trailer header when the
2376// response header is written. It notes that a header will need to be
2377// written in the trailers at the end of the response.
2378func (rws *responseWriterState) declareTrailer(k string) {
2379 k = http.CanonicalHeaderKey(k)
2380 if !httpguts.ValidTrailerHeader(k) {
2381 // Forbidden by RFC 7230, section 4.1.2.
2382 rws.conn.logf("ignoring invalid trailer %q", k)
2383 return
2384 }
2385 if !strSliceContains(rws.trailers, k) {
2386 rws.trailers = append(rws.trailers, k)
2387 }
2388}
2389
// writeChunk writes chunks from the bufio.Writer. But because
// bufio.Writer may bypass its chunking, sometimes p may be
// arbitrarily large.
//
// writeChunk is also responsible (on the first chunk) for sending the
// HEADERS response, and (once the handler is done) for ending the
// stream with either a final DATA frame or a trailers HEADERS frame.
//
// It returns the number of bytes of p accepted and any error from
// queuing frames; a failed write marks the state dirty.
func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
	if !rws.wroteHeader {
		rws.writeHeader(200)
	}

	isHeadResp := rws.req.Method == "HEAD"
	if !rws.sentHeader {
		rws.sentHeader = true
		var ctype, clen string
		// Content-Length is moved out of the header snapshot and passed
		// separately; an unparsable or negative value is discarded.
		if clen = rws.snapHeader.Get("Content-Length"); clen != "" {
			rws.snapHeader.Del("Content-Length")
			clen64, err := strconv.ParseInt(clen, 10, 64)
			if err == nil && clen64 >= 0 {
				rws.sentContentLen = clen64
			} else {
				clen = ""
			}
		}
		// If the handler already finished, p is the entire body, so its
		// length can be used as the Content-Length.
		if clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
			clen = strconv.Itoa(len(p))
		}
		_, hasContentType := rws.snapHeader["Content-Type"]
		if !hasContentType && bodyAllowedForStatus(rws.status) && len(p) > 0 {
			ctype = http.DetectContentType(p)
		}
		var date string
		if _, ok := rws.snapHeader["Date"]; !ok {
			// TODO(bradfitz): be faster here, like net/http? measure.
			date = time.Now().UTC().Format(http.TimeFormat)
		}

		for _, v := range rws.snapHeader["Trailer"] {
			foreachHeaderElement(v, rws.declareTrailer)
		}

		// "Connection" headers aren't allowed in HTTP/2 (RFC 7540, 8.1.2.2),
		// but respect "Connection" == "close" to mean sending a GOAWAY and tearing
		// down the TCP connection when idle, like we do for HTTP/1.
		// TODO: remove more Connection-specific header fields here, in addition
		// to "Connection".
		if _, ok := rws.snapHeader["Connection"]; ok {
			v := rws.snapHeader.Get("Connection")
			delete(rws.snapHeader, "Connection")
			if v == "close" {
				rws.conn.startGracefulShutdown()
			}
		}

		// The HEADERS frame ends the stream when nothing can follow it:
		// a HEAD response, or a finished handler with no body bytes and
		// no declared trailers.
		endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp
		err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
			streamID:      rws.stream.id,
			httpResCode:   rws.status,
			h:             rws.snapHeader,
			endStream:     endStream,
			contentType:   ctype,
			contentLength: clen,
			date:          date,
		})
		if err != nil {
			rws.dirty = true
			return 0, err
		}
		if endStream {
			return 0, nil
		}
	}
	if isHeadResp {
		// Body bytes of a HEAD response are reported as written but
		// never sent.
		return len(p), nil
	}
	if len(p) == 0 && !rws.handlerDone {
		return 0, nil
	}

	if rws.handlerDone {
		rws.promoteUndeclaredTrailers()
	}

	// only send trailers if they have actually been defined by the
	// server handler.
	hasNonemptyTrailers := rws.hasNonemptyTrailers()
	endStream := rws.handlerDone && !hasNonemptyTrailers
	if len(p) > 0 || endStream {
		// only send a 0 byte DATA frame if we're ending the stream.
		if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
			rws.dirty = true
			return 0, err
		}
	}

	if rws.handlerDone && hasNonemptyTrailers {
		// The trailers HEADERS frame is what ends the stream here.
		err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
			streamID:  rws.stream.id,
			h:         rws.handlerHeader,
			trailers:  rws.trailers,
			endStream: true,
		})
		if err != nil {
			rws.dirty = true
		}
		return len(p), err
	}
	return len(p), nil
}
2499
// TrailerPrefix is a magic prefix for ResponseWriter.Header map keys
// that, if present, signals that the map entry is actually for
// the response trailers, and not the response headers. The prefix
// is stripped after the ServeHTTP call finishes and the values are
// sent in the trailers.
//
// This mechanism is intended only for trailers that are not known
// prior to the headers being written. If the set of trailers is fixed
// or known before the header is written, the normal Go trailers mechanism
// is preferred:
//
//	https://golang.org/pkg/net/http/#ResponseWriter
//	https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
const TrailerPrefix = "Trailer:"
2513
2514// promoteUndeclaredTrailers permits http.Handlers to set trailers
2515// after the header has already been flushed. Because the Go
2516// ResponseWriter interface has no way to set Trailers (only the
2517// Header), and because we didn't want to expand the ResponseWriter
2518// interface, and because nobody used trailers, and because RFC 7230
2519// says you SHOULD (but not must) predeclare any trailers in the
2520// header, the official ResponseWriter rules said trailers in Go must
2521// be predeclared, and then we reuse the same ResponseWriter.Header()
2522// map to mean both Headers and Trailers. When it's time to write the
2523// Trailers, we pick out the fields of Headers that were declared as
2524// trailers. That worked for a while, until we found the first major
2525// user of Trailers in the wild: gRPC (using them only over http2),
2526// and gRPC libraries permit setting trailers mid-stream without
Scott Baker8487c5d2019-10-18 12:49:46 -07002527// predeclaring them. So: change of plans. We still permit the old
Scott Baker2d897982019-09-24 11:50:08 -07002528// way, but we also permit this hack: if a Header() key begins with
2529// "Trailer:", the suffix of that key is a Trailer. Because ':' is an
2530// invalid token byte anyway, there is no ambiguity. (And it's already
2531// filtered out) It's mildly hacky, but not terrible.
2532//
2533// This method runs after the Handler is done and promotes any Header
2534// fields to be trailers.
2535func (rws *responseWriterState) promoteUndeclaredTrailers() {
2536 for k, vv := range rws.handlerHeader {
2537 if !strings.HasPrefix(k, TrailerPrefix) {
2538 continue
2539 }
2540 trailerKey := strings.TrimPrefix(k, TrailerPrefix)
2541 rws.declareTrailer(trailerKey)
2542 rws.handlerHeader[http.CanonicalHeaderKey(trailerKey)] = vv
2543 }
2544
2545 if len(rws.trailers) > 1 {
2546 sorter := sorterPool.Get().(*sorter)
2547 sorter.SortStrings(rws.trailers)
2548 sorterPool.Put(sorter)
2549 }
2550}
2551
2552func (w *responseWriter) Flush() {
2553 rws := w.rws
2554 if rws == nil {
2555 panic("Header called after Handler finished")
2556 }
2557 if rws.bw.Buffered() > 0 {
2558 if err := rws.bw.Flush(); err != nil {
2559 // Ignore the error. The frame writer already knows.
2560 return
2561 }
2562 } else {
2563 // The bufio.Writer won't call chunkWriter.Write
2564 // (writeChunk with zero bytes, so we have to do it
2565 // ourselves to force the HTTP response header and/or
2566 // final DATA frame (with END_STREAM) to be sent.
2567 rws.writeChunk(nil)
2568 }
2569}
2570
2571func (w *responseWriter) CloseNotify() <-chan bool {
2572 rws := w.rws
2573 if rws == nil {
2574 panic("CloseNotify called after Handler finished")
2575 }
2576 rws.closeNotifierMu.Lock()
2577 ch := rws.closeNotifierCh
2578 if ch == nil {
2579 ch = make(chan bool, 1)
2580 rws.closeNotifierCh = ch
2581 cw := rws.stream.cw
2582 go func() {
2583 cw.Wait() // wait for close
2584 ch <- true
2585 }()
2586 }
2587 rws.closeNotifierMu.Unlock()
2588 return ch
2589}
2590
2591func (w *responseWriter) Header() http.Header {
2592 rws := w.rws
2593 if rws == nil {
2594 panic("Header called after Handler finished")
2595 }
2596 if rws.handlerHeader == nil {
2597 rws.handlerHeader = make(http.Header)
2598 }
2599 return rws.handlerHeader
2600}
2601
// checkWriteHeaderCode is a copy of net/http's checkWriteHeaderCode.
// It panics unless code is a three-digit number (100 through 999).
//
// Issue 22880: require valid WriteHeader status codes.
// For now only the three-digit requirement is enforced. In the future
// codes over 599 might be blocked (600 and above aren't defined at
// http://httpwg.org/specs/rfc7231.html#status.codes), as might codes
// under 200 (once there is more mature 1xx support).
//
// HTTP/1 used to send "HTTP/1.1 000 0" on the wire for bad codes, but
// there is no equivalent bogus thing that can realistically be sent in
// HTTP/2, so this consistently panics instead to help people find
// their bugs early. (WriteHeader cannot return an error anyway.)
func checkWriteHeaderCode(code int) {
	if code >= 100 && code <= 999 {
		return
	}
	panic(fmt.Sprintf("invalid WriteHeader code %v", code))
}
2619
2620func (w *responseWriter) WriteHeader(code int) {
2621 rws := w.rws
2622 if rws == nil {
2623 panic("WriteHeader called after Handler finished")
2624 }
2625 rws.writeHeader(code)
2626}
2627
2628func (rws *responseWriterState) writeHeader(code int) {
2629 if !rws.wroteHeader {
2630 checkWriteHeaderCode(code)
2631 rws.wroteHeader = true
2632 rws.status = code
2633 if len(rws.handlerHeader) > 0 {
2634 rws.snapHeader = cloneHeader(rws.handlerHeader)
2635 }
2636 }
2637}
2638
// cloneHeader returns a deep copy of h: a new map whose value slices
// do not share storage with those in h. The result is never nil, and
// every value slice in it is non-nil.
func cloneHeader(h http.Header) http.Header {
	clone := make(http.Header, len(h))
	for key, src := range h {
		dst := make([]string, 0, len(src))
		clone[key] = append(dst, src...)
	}
	return clone
}
2648
// Write implements io.Writer for the response body.
//
// The Life Of A Write is like this:
//
// * Handler calls w.Write or w.WriteString ->
// * -> rws.bw (*bufio.Writer) ->
// * (Handler might call Flush)
// * -> chunkWriter{rws}
// * -> responseWriterState.writeChunk(p []byte)
//   (most of the magic; see comment there)
func (w *responseWriter) Write(p []byte) (n int, err error) {
	return w.write(len(p), p, "")
}
2660
// WriteString is like Write but takes the body data as a string.
func (w *responseWriter) WriteString(s string) (n int, err error) {
	return w.write(len(s), nil, s)
}
2664
2665// either dataB or dataS is non-zero.
2666func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) {
2667 rws := w.rws
2668 if rws == nil {
2669 panic("Write called after Handler finished")
2670 }
2671 if !rws.wroteHeader {
2672 w.WriteHeader(200)
2673 }
2674 if !bodyAllowedForStatus(rws.status) {
2675 return 0, http.ErrBodyNotAllowed
2676 }
2677 rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) // only one can be set
2678 if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen {
2679 // TODO: send a RST_STREAM
2680 return 0, errors.New("http2: handler wrote more than declared Content-Length")
2681 }
2682
2683 if dataB != nil {
2684 return rws.bw.Write(dataB)
2685 } else {
2686 return rws.bw.WriteString(dataS)
2687 }
2688}
2689
2690func (w *responseWriter) handlerDone() {
2691 rws := w.rws
2692 dirty := rws.dirty
2693 rws.handlerDone = true
2694 w.Flush()
2695 w.rws = nil
2696 if !dirty {
2697 // Only recycle the pool if all prior Write calls to
2698 // the serverConn goroutine completed successfully. If
2699 // they returned earlier due to resets from the peer
2700 // there might still be write goroutines outstanding
2701 // from the serverConn referencing the rws memory. See
2702 // issue 20704.
2703 responseWriterStatePool.Put(rws)
2704 }
2705}
2706
// Push errors.
var (
	// ErrRecursivePush is returned by Push when it is called on a
	// stream that was itself pushed.
	ErrRecursivePush = errors.New("http2: recursive push not allowed")
	// ErrPushLimitReached reports that a push would exceed the peer's
	// SETTINGS_MAX_CONCURRENT_STREAMS.
	ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
)
2712
// Compile-time assertion that *responseWriter implements http.Pusher.
var _ http.Pusher = (*responseWriter)(nil)
2714
// Push implements http.Pusher. It validates target and opts, then asks
// the serve goroutine to start a server push and waits for the result.
// It must be called from a handler goroutine.
//
// target must be an absolute path or an absolute URL whose scheme
// matches the parent request. A nil opts is replaced by defaults; an
// empty method defaults to GET and a nil header map to an empty one
// (note that these defaults are written into a caller-supplied opts).
// Only GET and HEAD are accepted.
//
// It returns ErrRecursivePush when called on a pushed stream, a
// validation error for a bad target or header, errClientDisconnected
// or errStreamClosed if the connection or stream goes away while
// waiting, or otherwise the error reported by the serve goroutine.
func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
	st := w.rws.stream
	sc := st.sc
	sc.serveG.checkNotOn()

	// No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
	// http://tools.ietf.org/html/rfc7540#section-6.6
	if st.isPushed() {
		return ErrRecursivePush
	}

	if opts == nil {
		opts = new(http.PushOptions)
	}

	// Default options.
	if opts.Method == "" {
		opts.Method = "GET"
	}
	if opts.Header == nil {
		opts.Header = http.Header{}
	}
	// The promised request must use the same scheme as the parent.
	wantScheme := "http"
	if w.rws.req.TLS != nil {
		wantScheme = "https"
	}

	// Validate the request.
	u, err := url.Parse(target)
	if err != nil {
		return err
	}
	if u.Scheme == "" {
		// No scheme: target must be an absolute path, which inherits
		// the parent request's scheme and host.
		if !strings.HasPrefix(target, "/") {
			return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target)
		}
		u.Scheme = wantScheme
		u.Host = w.rws.req.Host
	} else {
		if u.Scheme != wantScheme {
			return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme)
		}
		if u.Host == "" {
			return errors.New("URL must have a host")
		}
	}
	for k := range opts.Header {
		if strings.HasPrefix(k, ":") {
			return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
		}
		// These headers are meaningful only if the request has a body,
		// but PUSH_PROMISE requests cannot have a body.
		// http://tools.ietf.org/html/rfc7540#section-8.2
		// Also disallow Host, since the promised URL must be absolute.
		switch strings.ToLower(k) {
		case "content-length", "content-encoding", "trailer", "te", "expect", "host":
			return fmt.Errorf("promised request headers cannot include %q", k)
		}
	}
	if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil {
		return err
	}

	// The RFC effectively limits promised requests to GET and HEAD:
	// "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
	// http://tools.ietf.org/html/rfc7540#section-8.2
	if opts.Method != "GET" && opts.Method != "HEAD" {
		return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
	}

	// The header map is cloned so later changes by the caller do not
	// affect the queued push.
	msg := &startPushRequest{
		parent: st,
		method: opts.Method,
		url:    u,
		header: cloneHeader(opts.Header),
		done:   errChanPool.Get().(chan error),
	}

	// Hand the request to the serve goroutine...
	select {
	case <-sc.doneServing:
		return errClientDisconnected
	case <-st.cw:
		return errStreamClosed
	case sc.serveMsgCh <- msg:
	}

	// ...and wait for its verdict. The done channel goes back to the
	// pool only when a result was actually received on it.
	select {
	case <-sc.doneServing:
		return errClientDisconnected
	case <-st.cw:
		return errStreamClosed
	case err := <-msg.done:
		errChanPool.Put(msg.done)
		return err
	}
}
2811
2812type startPushRequest struct {
2813 parent *stream
2814 method string
2815 url *url.URL
2816 header http.Header
2817 done chan error
2818}
2819
2820func (sc *serverConn) startPush(msg *startPushRequest) {
2821 sc.serveG.check()
2822
2823 // http://tools.ietf.org/html/rfc7540#section-6.6.
2824 // PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
2825 // is in either the "open" or "half-closed (remote)" state.
2826 if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote {
Scott Baker8487c5d2019-10-18 12:49:46 -07002827 // responseWriter.Push checks that the stream is peer-initiated.
Scott Baker2d897982019-09-24 11:50:08 -07002828 msg.done <- errStreamClosed
2829 return
2830 }
2831
2832 // http://tools.ietf.org/html/rfc7540#section-6.6.
2833 if !sc.pushEnabled {
2834 msg.done <- http.ErrNotSupported
2835 return
2836 }
2837
2838 // PUSH_PROMISE frames must be sent in increasing order by stream ID, so
2839 // we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
2840 // is written. Once the ID is allocated, we start the request handler.
2841 allocatePromisedID := func() (uint32, error) {
2842 sc.serveG.check()
2843
2844 // Check this again, just in case. Technically, we might have received
2845 // an updated SETTINGS by the time we got around to writing this frame.
2846 if !sc.pushEnabled {
2847 return 0, http.ErrNotSupported
2848 }
2849 // http://tools.ietf.org/html/rfc7540#section-6.5.2.
2850 if sc.curPushedStreams+1 > sc.clientMaxStreams {
2851 return 0, ErrPushLimitReached
2852 }
2853
2854 // http://tools.ietf.org/html/rfc7540#section-5.1.1.
2855 // Streams initiated by the server MUST use even-numbered identifiers.
2856 // A server that is unable to establish a new stream identifier can send a GOAWAY
2857 // frame so that the client is forced to open a new connection for new streams.
2858 if sc.maxPushPromiseID+2 >= 1<<31 {
2859 sc.startGracefulShutdownInternal()
2860 return 0, ErrPushLimitReached
2861 }
2862 sc.maxPushPromiseID += 2
2863 promisedID := sc.maxPushPromiseID
2864
2865 // http://tools.ietf.org/html/rfc7540#section-8.2.
2866 // Strictly speaking, the new stream should start in "reserved (local)", then
2867 // transition to "half closed (remote)" after sending the initial HEADERS, but
2868 // we start in "half closed (remote)" for simplicity.
2869 // See further comments at the definition of stateHalfClosedRemote.
2870 promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote)
2871 rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{
2872 method: msg.method,
2873 scheme: msg.url.Scheme,
2874 authority: msg.url.Host,
2875 path: msg.url.RequestURI(),
2876 header: cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE
2877 })
2878 if err != nil {
2879 // Should not happen, since we've already validated msg.url.
2880 panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
2881 }
2882
2883 go sc.runHandler(rw, req, sc.handler.ServeHTTP)
2884 return promisedID, nil
2885 }
2886
2887 sc.writeFrame(FrameWriteRequest{
2888 write: &writePushPromise{
2889 streamID: msg.parent.id,
2890 method: msg.method,
2891 url: msg.url,
2892 h: msg.header,
2893 allocatePromisedID: allocatePromisedID,
2894 },
2895 stream: msg.parent,
2896 done: msg.done,
2897 })
2898}
2899
// foreachHeaderElement splits v according to the "#rule" construction
// in RFC 7230 section 7 and calls fn for each non-empty element.
//
// Surrounding whitespace is trimmed from v and from every element, and
// elements that are empty after trimming are skipped. fn is invoked in the
// order the elements appear in v.
func foreachHeaderElement(v string, fn func(string)) {
	v = textproto.TrimString(v)
	if v == "" {
		return
	}
	// Fast path: a value without a comma is a single element, so there is
	// nothing to split.
	if !strings.Contains(v, ",") {
		fn(v)
		return
	}
	for _, f := range strings.Split(v, ",") {
		if f = textproto.TrimString(f); f != "" {
			fn(f)
		}
	}
}
2917
// connHeaders lists the connection-specific header fields that must not
// appear in an HTTP/2 request, in canonical MIME header form.
//
// From http://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2.2
var connHeaders = []string{
	"Connection",
	"Keep-Alive",
	"Proxy-Connection",
	"Transfer-Encoding",
	"Upgrade",
}

// checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request,
// per RFC 7540 Section 8.1.2.2.
// The returned error is reported to users.
//
// It returns an error if h contains any key listed in connHeaders, or if the
// "Te" key holds anything other than a single value that is either
// "trailers" or empty. Keys are looked up exactly as stored in h, so h is
// expected to use canonical header keys.
func checkValidHTTP2RequestHeaders(h http.Header) error {
	for _, k := range connHeaders {
		if _, ok := h[k]; ok {
			return fmt.Errorf("request header %q is not valid in HTTP/2", k)
		}
	}
	te := h["Te"]
	if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) {
		return errors.New(`request header "TE" may only be "trailers" in HTTP/2`)
	}
	return nil
}
2942
// new400Handler returns a handler that answers every request with
// 400 Bad Request, using err's message as the response body.
func new400Handler(err error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		http.Error(w, err.Error(), http.StatusBadRequest)
	}
}
2948
// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives
// disabled. See comments on h1ServerShutdownChan above for why
// the code is written this way.
//
// The answer is obtained by asserting hs to an interface with an unexported
// doKeepAlives method. Unexported method names are package-scoped in Go, so
// the assertion can only succeed when this code is compiled in the same
// package as http.Server; in any other package it fails and the function
// reports false.
func h1ServerKeepAlivesDisabled(hs *http.Server) bool {
	type keepAliver interface {
		doKeepAlives() bool
	}
	var x interface{} = hs
	if ka, ok := x.(keepAliver); ok {
		return !ka.doKeepAlives()
	}
	return false
}