blob: 2aa859f76f9f3b5960a21bb6a88fdd165966c56e [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// TODO: turn off the serve goroutine when idle, so
6// an idle conn only has the readFrames goroutine active. (which could
7// also be optimized probably to pin less memory in crypto/tls). This
8// would involve tracking when the serve goroutine is active (atomic
9// int32 read/CAS probably?) and starting it up when frames arrive,
10// and shutting it down when all handlers exit. the occasional PING
11// packets could use time.AfterFunc to call sc.wakeStartServeLoop()
12// (which is a no-op if already running) and then queue the PING write
13// as normal. The serve loop would then exit in most cases (if no
14// Handlers running) and not be woken up again until the PING packet
15// returns.
16
17// TODO (maybe): add a mechanism for Handlers to going into
18// half-closed-local mode (rw.(io.Closer) test?) but not exit their
19// handler, and continue to be able to read from the
20// Request.Body. This would be a somewhat semantic change from HTTP/1
21// (or at least what we expose in net/http), so I'd probably want to
22// add it there too. For now, this package says that returning from
23// the Handler ServeHTTP function means you're both done reading and
24// done writing, without a way to stop just one or the other.
25
26package http2
27
28import (
29 "bufio"
30 "bytes"
31 "context"
32 "crypto/tls"
33 "errors"
34 "fmt"
35 "io"
36 "log"
37 "math"
38 "net"
39 "net/http"
40 "net/textproto"
41 "net/url"
42 "os"
43 "reflect"
44 "runtime"
45 "strconv"
46 "strings"
47 "sync"
48 "time"
49
50 "golang.org/x/net/http/httpguts"
51 "golang.org/x/net/http2/hpack"
52)
53
54const (
55 prefaceTimeout = 10 * time.Second
56 firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
57 handlerChunkWriteSize = 4 << 10
58 defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
59 maxQueuedControlFrames = 10000
60)
61
62var (
63 errClientDisconnected = errors.New("client disconnected")
64 errClosedBody = errors.New("body closed by handler")
65 errHandlerComplete = errors.New("http2: request body closed due to handler exiting")
66 errStreamClosed = errors.New("http2: stream closed")
67)
68
69var responseWriterStatePool = sync.Pool{
70 New: func() interface{} {
71 rws := &responseWriterState{}
72 rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
73 return rws
74 },
75}
76
77// Test hooks.
78var (
79 testHookOnConn func()
80 testHookGetServerConn func(*serverConn)
81 testHookOnPanicMu *sync.Mutex // nil except in tests
82 testHookOnPanic func(sc *serverConn, panicVal interface{}) (rePanic bool)
83)
84
85// Server is an HTTP/2 server.
86type Server struct {
87 // MaxHandlers limits the number of http.Handler ServeHTTP goroutines
88 // which may run at a time over all connections.
89 // Negative or zero no limit.
90 // TODO: implement
91 MaxHandlers int
92
93 // MaxConcurrentStreams optionally specifies the number of
94 // concurrent streams that each client may have open at a
95 // time. This is unrelated to the number of http.Handler goroutines
96 // which may be active globally, which is MaxHandlers.
97 // If zero, MaxConcurrentStreams defaults to at least 100, per
98 // the HTTP/2 spec's recommendations.
99 MaxConcurrentStreams uint32
100
101 // MaxReadFrameSize optionally specifies the largest frame
102 // this server is willing to read. A valid value is between
103 // 16k and 16M, inclusive. If zero or otherwise invalid, a
104 // default value is used.
105 MaxReadFrameSize uint32
106
107 // PermitProhibitedCipherSuites, if true, permits the use of
108 // cipher suites prohibited by the HTTP/2 spec.
109 PermitProhibitedCipherSuites bool
110
111 // IdleTimeout specifies how long until idle clients should be
112 // closed with a GOAWAY frame. PING frames are not considered
113 // activity for the purposes of IdleTimeout.
114 IdleTimeout time.Duration
115
116 // MaxUploadBufferPerConnection is the size of the initial flow
117 // control window for each connections. The HTTP/2 spec does not
118 // allow this to be smaller than 65535 or larger than 2^32-1.
119 // If the value is outside this range, a default value will be
120 // used instead.
121 MaxUploadBufferPerConnection int32
122
123 // MaxUploadBufferPerStream is the size of the initial flow control
124 // window for each stream. The HTTP/2 spec does not allow this to
125 // be larger than 2^32-1. If the value is zero or larger than the
126 // maximum, a default value will be used instead.
127 MaxUploadBufferPerStream int32
128
129 // NewWriteScheduler constructs a write scheduler for a connection.
130 // If nil, a default scheduler is chosen.
131 NewWriteScheduler func() WriteScheduler
132
133 // Internal state. This is a pointer (rather than embedded directly)
134 // so that we don't embed a Mutex in this struct, which will make the
135 // struct non-copyable, which might break some callers.
136 state *serverInternalState
137}
138
139func (s *Server) initialConnRecvWindowSize() int32 {
140 if s.MaxUploadBufferPerConnection > initialWindowSize {
141 return s.MaxUploadBufferPerConnection
142 }
143 return 1 << 20
144}
145
146func (s *Server) initialStreamRecvWindowSize() int32 {
147 if s.MaxUploadBufferPerStream > 0 {
148 return s.MaxUploadBufferPerStream
149 }
150 return 1 << 20
151}
152
153func (s *Server) maxReadFrameSize() uint32 {
154 if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize {
155 return v
156 }
157 return defaultMaxReadFrameSize
158}
159
160func (s *Server) maxConcurrentStreams() uint32 {
161 if v := s.MaxConcurrentStreams; v > 0 {
162 return v
163 }
164 return defaultMaxStreams
165}
166
167// maxQueuedControlFrames is the maximum number of control frames like
168// SETTINGS, PING and RST_STREAM that will be queued for writing before
169// the connection is closed to prevent memory exhaustion attacks.
170func (s *Server) maxQueuedControlFrames() int {
171 // TODO: if anybody asks, add a Server field, and remember to define the
172 // behavior of negative values.
173 return maxQueuedControlFrames
174}
175
176type serverInternalState struct {
177 mu sync.Mutex
178 activeConns map[*serverConn]struct{}
179}
180
181func (s *serverInternalState) registerConn(sc *serverConn) {
182 if s == nil {
183 return // if the Server was used without calling ConfigureServer
184 }
185 s.mu.Lock()
186 s.activeConns[sc] = struct{}{}
187 s.mu.Unlock()
188}
189
190func (s *serverInternalState) unregisterConn(sc *serverConn) {
191 if s == nil {
192 return // if the Server was used without calling ConfigureServer
193 }
194 s.mu.Lock()
195 delete(s.activeConns, sc)
196 s.mu.Unlock()
197}
198
199func (s *serverInternalState) startGracefulShutdown() {
200 if s == nil {
201 return // if the Server was used without calling ConfigureServer
202 }
203 s.mu.Lock()
204 for sc := range s.activeConns {
205 sc.startGracefulShutdown()
206 }
207 s.mu.Unlock()
208}
209
210// ConfigureServer adds HTTP/2 support to a net/http Server.
211//
212// The configuration conf may be nil.
213//
214// ConfigureServer must be called before s begins serving.
215func ConfigureServer(s *http.Server, conf *Server) error {
216 if s == nil {
217 panic("nil *http.Server")
218 }
219 if conf == nil {
220 conf = new(Server)
221 }
222 conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})}
223 if h1, h2 := s, conf; h2.IdleTimeout == 0 {
224 if h1.IdleTimeout != 0 {
225 h2.IdleTimeout = h1.IdleTimeout
226 } else {
227 h2.IdleTimeout = h1.ReadTimeout
228 }
229 }
230 s.RegisterOnShutdown(conf.state.startGracefulShutdown)
231
232 if s.TLSConfig == nil {
233 s.TLSConfig = new(tls.Config)
234 } else if s.TLSConfig.CipherSuites != nil {
235 // If they already provided a CipherSuite list, return
236 // an error if it has a bad order or is missing
237 // ECDHE_RSA_WITH_AES_128_GCM_SHA256 or ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
238 haveRequired := false
239 sawBad := false
240 for i, cs := range s.TLSConfig.CipherSuites {
241 switch cs {
242 case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
243 // Alternative MTI cipher to not discourage ECDSA-only servers.
244 // See http://golang.org/cl/30721 for further information.
245 tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
246 haveRequired = true
247 }
248 if isBadCipher(cs) {
249 sawBad = true
250 } else if sawBad {
251 return fmt.Errorf("http2: TLSConfig.CipherSuites index %d contains an HTTP/2-approved cipher suite (%#04x), but it comes after unapproved cipher suites. With this configuration, clients that don't support previous, approved cipher suites may be given an unapproved one and reject the connection.", i, cs)
252 }
253 }
254 if !haveRequired {
255 return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher (need at least one of TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 or TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256).")
256 }
257 }
258
259 // Note: not setting MinVersion to tls.VersionTLS12,
260 // as we don't want to interfere with HTTP/1.1 traffic
261 // on the user's server. We enforce TLS 1.2 later once
262 // we accept a connection. Ideally this should be done
263 // during next-proto selection, but using TLS <1.2 with
264 // HTTP/2 is still the client's bug.
265
266 s.TLSConfig.PreferServerCipherSuites = true
267
268 haveNPN := false
269 for _, p := range s.TLSConfig.NextProtos {
270 if p == NextProtoTLS {
271 haveNPN = true
272 break
273 }
274 }
275 if !haveNPN {
276 s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS)
277 }
278
279 if s.TLSNextProto == nil {
280 s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
281 }
282 protoHandler := func(hs *http.Server, c *tls.Conn, h http.Handler) {
283 if testHookOnConn != nil {
284 testHookOnConn()
285 }
286 // The TLSNextProto interface predates contexts, so
287 // the net/http package passes down its per-connection
288 // base context via an exported but unadvertised
289 // method on the Handler. This is for internal
290 // net/http<=>http2 use only.
291 var ctx context.Context
292 type baseContexter interface {
293 BaseContext() context.Context
294 }
295 if bc, ok := h.(baseContexter); ok {
296 ctx = bc.BaseContext()
297 }
298 conf.ServeConn(c, &ServeConnOpts{
299 Context: ctx,
300 Handler: h,
301 BaseConfig: hs,
302 })
303 }
304 s.TLSNextProto[NextProtoTLS] = protoHandler
305 return nil
306}
307
308// ServeConnOpts are options for the Server.ServeConn method.
309type ServeConnOpts struct {
310 // Context is the base context to use.
311 // If nil, context.Background is used.
312 Context context.Context
313
314 // BaseConfig optionally sets the base configuration
315 // for values. If nil, defaults are used.
316 BaseConfig *http.Server
317
318 // Handler specifies which handler to use for processing
319 // requests. If nil, BaseConfig.Handler is used. If BaseConfig
320 // or BaseConfig.Handler is nil, http.DefaultServeMux is used.
321 Handler http.Handler
322}
323
324func (o *ServeConnOpts) context() context.Context {
325 if o != nil && o.Context != nil {
326 return o.Context
327 }
328 return context.Background()
329}
330
331func (o *ServeConnOpts) baseConfig() *http.Server {
332 if o != nil && o.BaseConfig != nil {
333 return o.BaseConfig
334 }
335 return new(http.Server)
336}
337
338func (o *ServeConnOpts) handler() http.Handler {
339 if o != nil {
340 if o.Handler != nil {
341 return o.Handler
342 }
343 if o.BaseConfig != nil && o.BaseConfig.Handler != nil {
344 return o.BaseConfig.Handler
345 }
346 }
347 return http.DefaultServeMux
348}
349
350// ServeConn serves HTTP/2 requests on the provided connection and
351// blocks until the connection is no longer readable.
352//
353// ServeConn starts speaking HTTP/2 assuming that c has not had any
354// reads or writes. It writes its initial settings frame and expects
355// to be able to read the preface and settings frame from the
356// client. If c has a ConnectionState method like a *tls.Conn, the
357// ConnectionState is used to verify the TLS ciphersuite and to set
358// the Request.TLS field in Handlers.
359//
360// ServeConn does not support h2c by itself. Any h2c support must be
361// implemented in terms of providing a suitably-behaving net.Conn.
362//
363// The opts parameter is optional. If nil, default values are used.
364func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
365 baseCtx, cancel := serverConnBaseContext(c, opts)
366 defer cancel()
367
368 sc := &serverConn{
369 srv: s,
370 hs: opts.baseConfig(),
371 conn: c,
372 baseCtx: baseCtx,
373 remoteAddrStr: c.RemoteAddr().String(),
374 bw: newBufferedWriter(c),
375 handler: opts.handler(),
376 streams: make(map[uint32]*stream),
377 readFrameCh: make(chan readFrameResult),
378 wantWriteFrameCh: make(chan FrameWriteRequest, 8),
379 serveMsgCh: make(chan interface{}, 8),
380 wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
381 bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
382 doneServing: make(chan struct{}),
383 clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
384 advMaxStreams: s.maxConcurrentStreams(),
385 initialStreamSendWindowSize: initialWindowSize,
386 maxFrameSize: initialMaxFrameSize,
387 headerTableSize: initialHeaderTableSize,
388 serveG: newGoroutineLock(),
389 pushEnabled: true,
390 }
391
392 s.state.registerConn(sc)
393 defer s.state.unregisterConn(sc)
394
395 // The net/http package sets the write deadline from the
396 // http.Server.WriteTimeout during the TLS handshake, but then
397 // passes the connection off to us with the deadline already set.
398 // Write deadlines are set per stream in serverConn.newStream.
399 // Disarm the net.Conn write deadline here.
400 if sc.hs.WriteTimeout != 0 {
401 sc.conn.SetWriteDeadline(time.Time{})
402 }
403
404 if s.NewWriteScheduler != nil {
405 sc.writeSched = s.NewWriteScheduler()
406 } else {
407 sc.writeSched = NewRandomWriteScheduler()
408 }
409
410 // These start at the RFC-specified defaults. If there is a higher
411 // configured value for inflow, that will be updated when we send a
412 // WINDOW_UPDATE shortly after sending SETTINGS.
413 sc.flow.add(initialWindowSize)
414 sc.inflow.add(initialWindowSize)
415 sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
416
417 fr := NewFramer(sc.bw, c)
418 fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
419 fr.MaxHeaderListSize = sc.maxHeaderListSize()
420 fr.SetMaxReadFrameSize(s.maxReadFrameSize())
421 sc.framer = fr
422
423 if tc, ok := c.(connectionStater); ok {
424 sc.tlsState = new(tls.ConnectionState)
425 *sc.tlsState = tc.ConnectionState()
426 // 9.2 Use of TLS Features
427 // An implementation of HTTP/2 over TLS MUST use TLS
428 // 1.2 or higher with the restrictions on feature set
429 // and cipher suite described in this section. Due to
430 // implementation limitations, it might not be
431 // possible to fail TLS negotiation. An endpoint MUST
432 // immediately terminate an HTTP/2 connection that
433 // does not meet the TLS requirements described in
434 // this section with a connection error (Section
435 // 5.4.1) of type INADEQUATE_SECURITY.
436 if sc.tlsState.Version < tls.VersionTLS12 {
437 sc.rejectConn(ErrCodeInadequateSecurity, "TLS version too low")
438 return
439 }
440
441 if sc.tlsState.ServerName == "" {
442 // Client must use SNI, but we don't enforce that anymore,
443 // since it was causing problems when connecting to bare IP
444 // addresses during development.
445 //
446 // TODO: optionally enforce? Or enforce at the time we receive
447 // a new request, and verify the ServerName matches the :authority?
448 // But that precludes proxy situations, perhaps.
449 //
450 // So for now, do nothing here again.
451 }
452
453 if !s.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) {
454 // "Endpoints MAY choose to generate a connection error
455 // (Section 5.4.1) of type INADEQUATE_SECURITY if one of
456 // the prohibited cipher suites are negotiated."
457 //
458 // We choose that. In my opinion, the spec is weak
459 // here. It also says both parties must support at least
460 // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no
461 // excuses here. If we really must, we could allow an
462 // "AllowInsecureWeakCiphers" option on the server later.
463 // Let's see how it plays out first.
464 sc.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite))
465 return
466 }
467 }
468
469 if hook := testHookGetServerConn; hook != nil {
470 hook(sc)
471 }
472 sc.serve()
473}
474
475func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx context.Context, cancel func()) {
476 ctx, cancel = context.WithCancel(opts.context())
477 ctx = context.WithValue(ctx, http.LocalAddrContextKey, c.LocalAddr())
478 if hs := opts.baseConfig(); hs != nil {
479 ctx = context.WithValue(ctx, http.ServerContextKey, hs)
480 }
481 return
482}
483
484func (sc *serverConn) rejectConn(err ErrCode, debug string) {
485 sc.vlogf("http2: server rejecting conn: %v, %s", err, debug)
486 // ignoring errors. hanging up anyway.
487 sc.framer.WriteGoAway(0, err, []byte(debug))
488 sc.bw.Flush()
489 sc.conn.Close()
490}
491
492type serverConn struct {
493 // Immutable:
494 srv *Server
495 hs *http.Server
496 conn net.Conn
497 bw *bufferedWriter // writing to conn
498 handler http.Handler
499 baseCtx context.Context
500 framer *Framer
501 doneServing chan struct{} // closed when serverConn.serve ends
502 readFrameCh chan readFrameResult // written by serverConn.readFrames
503 wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
504 wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
505 bodyReadCh chan bodyReadMsg // from handlers -> serve
506 serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop
507 flow flow // conn-wide (not stream-specific) outbound flow control
508 inflow flow // conn-wide inbound flow control
509 tlsState *tls.ConnectionState // shared by all handlers, like net/http
510 remoteAddrStr string
511 writeSched WriteScheduler
512
513 // Everything following is owned by the serve loop; use serveG.check():
514 serveG goroutineLock // used to verify funcs are on serve()
515 pushEnabled bool
516 sawFirstSettings bool // got the initial SETTINGS frame after the preface
517 needToSendSettingsAck bool
518 unackedSettings int // how many SETTINGS have we sent without ACKs?
519 queuedControlFrames int // control frames in the writeSched queue
520 clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
521 advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
522 curClientStreams uint32 // number of open streams initiated by the client
523 curPushedStreams uint32 // number of open streams initiated by server push
524 maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
525 maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
526 streams map[uint32]*stream
527 initialStreamSendWindowSize int32
528 maxFrameSize int32
529 headerTableSize uint32
530 peerMaxHeaderListSize uint32 // zero means unknown (default)
531 canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
532 writingFrame bool // started writing a frame (on serve goroutine or separate)
533 writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
534 needsFrameFlush bool // last frame write wasn't a flush
535 inGoAway bool // we've started to or sent GOAWAY
536 inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
537 needToSendGoAway bool // we need to schedule a GOAWAY frame write
538 goAwayCode ErrCode
539 shutdownTimer *time.Timer // nil until used
540 idleTimer *time.Timer // nil if unused
541
542 // Owned by the writeFrameAsync goroutine:
543 headerWriteBuf bytes.Buffer
544 hpackEncoder *hpack.Encoder
545
546 // Used by startGracefulShutdown.
547 shutdownOnce sync.Once
548}
549
550func (sc *serverConn) maxHeaderListSize() uint32 {
551 n := sc.hs.MaxHeaderBytes
552 if n <= 0 {
553 n = http.DefaultMaxHeaderBytes
554 }
555 // http2's count is in a slightly different unit and includes 32 bytes per pair.
556 // So, take the net/http.Server value and pad it up a bit, assuming 10 headers.
557 const perFieldOverhead = 32 // per http2 spec
558 const typicalHeaders = 10 // conservative
559 return uint32(n + typicalHeaders*perFieldOverhead)
560}
561
562func (sc *serverConn) curOpenStreams() uint32 {
563 sc.serveG.check()
564 return sc.curClientStreams + sc.curPushedStreams
565}
566
567// stream represents a stream. This is the minimal metadata needed by
568// the serve goroutine. Most of the actual stream state is owned by
569// the http.Handler's goroutine in the responseWriter. Because the
570// responseWriter's responseWriterState is recycled at the end of a
571// handler, this struct intentionally has no pointer to the
572// *responseWriter{,State} itself, as the Handler ending nils out the
573// responseWriter's state field.
574type stream struct {
575 // immutable:
576 sc *serverConn
577 id uint32
578 body *pipe // non-nil if expecting DATA frames
579 cw closeWaiter // closed wait stream transitions to closed state
580 ctx context.Context
581 cancelCtx func()
582
583 // owned by serverConn's serve loop:
584 bodyBytes int64 // body bytes seen so far
585 declBodyBytes int64 // or -1 if undeclared
586 flow flow // limits writing from Handler to client
587 inflow flow // what the client is allowed to POST/etc to us
588 state streamState
589 resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
590 gotTrailerHeader bool // HEADER frame for trailers was seen
591 wroteHeaders bool // whether we wrote headers (not status 100)
592 writeDeadline *time.Timer // nil if unused
593
594 trailer http.Header // accumulated trailers
595 reqTrailer http.Header // handler's Request.Trailer
596}
597
598func (sc *serverConn) Framer() *Framer { return sc.framer }
599func (sc *serverConn) CloseConn() error { return sc.conn.Close() }
600func (sc *serverConn) Flush() error { return sc.bw.Flush() }
601func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) {
602 return sc.hpackEncoder, &sc.headerWriteBuf
603}
604
605func (sc *serverConn) state(streamID uint32) (streamState, *stream) {
606 sc.serveG.check()
607 // http://tools.ietf.org/html/rfc7540#section-5.1
608 if st, ok := sc.streams[streamID]; ok {
609 return st.state, st
610 }
611 // "The first use of a new stream identifier implicitly closes all
612 // streams in the "idle" state that might have been initiated by
613 // that peer with a lower-valued stream identifier. For example, if
614 // a client sends a HEADERS frame on stream 7 without ever sending a
615 // frame on stream 5, then stream 5 transitions to the "closed"
616 // state when the first frame for stream 7 is sent or received."
617 if streamID%2 == 1 {
618 if streamID <= sc.maxClientStreamID {
619 return stateClosed, nil
620 }
621 } else {
622 if streamID <= sc.maxPushPromiseID {
623 return stateClosed, nil
624 }
625 }
626 return stateIdle, nil
627}
628
629// setConnState calls the net/http ConnState hook for this connection, if configured.
630// Note that the net/http package does StateNew and StateClosed for us.
631// There is currently no plan for StateHijacked or hijacking HTTP/2 connections.
632func (sc *serverConn) setConnState(state http.ConnState) {
633 if sc.hs.ConnState != nil {
634 sc.hs.ConnState(sc.conn, state)
635 }
636}
637
638func (sc *serverConn) vlogf(format string, args ...interface{}) {
639 if VerboseLogs {
640 sc.logf(format, args...)
641 }
642}
643
644func (sc *serverConn) logf(format string, args ...interface{}) {
645 if lg := sc.hs.ErrorLog; lg != nil {
646 lg.Printf(format, args...)
647 } else {
648 log.Printf(format, args...)
649 }
650}
651
652// errno returns v's underlying uintptr, else 0.
653//
654// TODO: remove this helper function once http2 can use build
655// tags. See comment in isClosedConnError.
656func errno(v error) uintptr {
657 if rv := reflect.ValueOf(v); rv.Kind() == reflect.Uintptr {
658 return uintptr(rv.Uint())
659 }
660 return 0
661}
662
663// isClosedConnError reports whether err is an error from use of a closed
664// network connection.
665func isClosedConnError(err error) bool {
666 if err == nil {
667 return false
668 }
669
670 // TODO: remove this string search and be more like the Windows
671 // case below. That might involve modifying the standard library
672 // to return better error types.
673 str := err.Error()
674 if strings.Contains(str, "use of closed network connection") {
675 return true
676 }
677
678 // TODO(bradfitz): x/tools/cmd/bundle doesn't really support
679 // build tags, so I can't make an http2_windows.go file with
680 // Windows-specific stuff. Fix that and move this, once we
681 // have a way to bundle this into std's net/http somehow.
682 if runtime.GOOS == "windows" {
683 if oe, ok := err.(*net.OpError); ok && oe.Op == "read" {
684 if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" {
685 const WSAECONNABORTED = 10053
686 const WSAECONNRESET = 10054
687 if n := errno(se.Err); n == WSAECONNRESET || n == WSAECONNABORTED {
688 return true
689 }
690 }
691 }
692 }
693 return false
694}
695
696func (sc *serverConn) condlogf(err error, format string, args ...interface{}) {
697 if err == nil {
698 return
699 }
700 if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) || err == errPrefaceTimeout {
701 // Boring, expected errors.
702 sc.vlogf(format, args...)
703 } else {
704 sc.logf(format, args...)
705 }
706}
707
708func (sc *serverConn) canonicalHeader(v string) string {
709 sc.serveG.check()
710 buildCommonHeaderMapsOnce()
711 cv, ok := commonCanonHeader[v]
712 if ok {
713 return cv
714 }
715 cv, ok = sc.canonHeader[v]
716 if ok {
717 return cv
718 }
719 if sc.canonHeader == nil {
720 sc.canonHeader = make(map[string]string)
721 }
722 cv = http.CanonicalHeaderKey(v)
723 sc.canonHeader[v] = cv
724 return cv
725}
726
727type readFrameResult struct {
728 f Frame // valid until readMore is called
729 err error
730
731 // readMore should be called once the consumer no longer needs or
732 // retains f. After readMore, f is invalid and more frames can be
733 // read.
734 readMore func()
735}
736
737// readFrames is the loop that reads incoming frames.
738// It takes care to only read one frame at a time, blocking until the
739// consumer is done with the frame.
740// It's run on its own goroutine.
741func (sc *serverConn) readFrames() {
742 gate := make(gate)
743 gateDone := gate.Done
744 for {
745 f, err := sc.framer.ReadFrame()
746 select {
747 case sc.readFrameCh <- readFrameResult{f, err, gateDone}:
748 case <-sc.doneServing:
749 return
750 }
751 select {
752 case <-gate:
753 case <-sc.doneServing:
754 return
755 }
756 if terminalReadFrameError(err) {
757 return
758 }
759 }
760}
761
762// frameWriteResult is the message passed from writeFrameAsync to the serve goroutine.
763type frameWriteResult struct {
Andrea Campanellaaec20bd2021-02-25 12:41:34 +0100764 _ incomparable
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000765 wr FrameWriteRequest // what was written (or attempted)
766 err error // result of the writeFrame call
767}
768
769// writeFrameAsync runs in its own goroutine and writes a single frame
770// and then reports when it's done.
771// At most one goroutine can be running writeFrameAsync at a time per
772// serverConn.
773func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) {
774 err := wr.write.writeFrame(sc)
Andrea Campanellaaec20bd2021-02-25 12:41:34 +0100775 sc.wroteFrameCh <- frameWriteResult{wr: wr, err: err}
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000776}
777
778func (sc *serverConn) closeAllStreamsOnConnClose() {
779 sc.serveG.check()
780 for _, st := range sc.streams {
781 sc.closeStream(st, errClientDisconnected)
782 }
783}
784
785func (sc *serverConn) stopShutdownTimer() {
786 sc.serveG.check()
787 if t := sc.shutdownTimer; t != nil {
788 t.Stop()
789 }
790}
791
792func (sc *serverConn) notePanic() {
793 // Note: this is for serverConn.serve panicking, not http.Handler code.
794 if testHookOnPanicMu != nil {
795 testHookOnPanicMu.Lock()
796 defer testHookOnPanicMu.Unlock()
797 }
798 if testHookOnPanic != nil {
799 if e := recover(); e != nil {
800 if testHookOnPanic(sc, e) {
801 panic(e)
802 }
803 }
804 }
805}
806
807func (sc *serverConn) serve() {
808 sc.serveG.check()
809 defer sc.notePanic()
810 defer sc.conn.Close()
811 defer sc.closeAllStreamsOnConnClose()
812 defer sc.stopShutdownTimer()
813 defer close(sc.doneServing) // unblocks handlers trying to send
814
815 if VerboseLogs {
816 sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
817 }
818
819 sc.writeFrame(FrameWriteRequest{
820 write: writeSettings{
821 {SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
822 {SettingMaxConcurrentStreams, sc.advMaxStreams},
823 {SettingMaxHeaderListSize, sc.maxHeaderListSize()},
824 {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
825 },
826 })
827 sc.unackedSettings++
828
829 // Each connection starts with intialWindowSize inflow tokens.
830 // If a higher value is configured, we add more tokens.
831 if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
832 sc.sendWindowUpdate(nil, int(diff))
833 }
834
835 if err := sc.readPreface(); err != nil {
836 sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
837 return
838 }
839 // Now that we've got the preface, get us out of the
840 // "StateNew" state. We can't go directly to idle, though.
841 // Active means we read some data and anticipate a request. We'll
842 // do another Active when we get a HEADERS frame.
843 sc.setConnState(http.StateActive)
844 sc.setConnState(http.StateIdle)
845
846 if sc.srv.IdleTimeout != 0 {
847 sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
848 defer sc.idleTimer.Stop()
849 }
850
851 go sc.readFrames() // closed by defer sc.conn.Close above
852
853 settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
854 defer settingsTimer.Stop()
855
856 loopNum := 0
857 for {
858 loopNum++
859 select {
860 case wr := <-sc.wantWriteFrameCh:
861 if se, ok := wr.write.(StreamError); ok {
862 sc.resetStream(se)
863 break
864 }
865 sc.writeFrame(wr)
866 case res := <-sc.wroteFrameCh:
867 sc.wroteFrame(res)
868 case res := <-sc.readFrameCh:
869 if !sc.processFrameFromReader(res) {
870 return
871 }
872 res.readMore()
873 if settingsTimer != nil {
874 settingsTimer.Stop()
875 settingsTimer = nil
876 }
877 case m := <-sc.bodyReadCh:
878 sc.noteBodyRead(m.st, m.n)
879 case msg := <-sc.serveMsgCh:
880 switch v := msg.(type) {
881 case func(int):
882 v(loopNum) // for testing
883 case *serverMessage:
884 switch v {
885 case settingsTimerMsg:
886 sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
887 return
888 case idleTimerMsg:
889 sc.vlogf("connection is idle")
890 sc.goAway(ErrCodeNo)
891 case shutdownTimerMsg:
892 sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
893 return
894 case gracefulShutdownMsg:
895 sc.startGracefulShutdownInternal()
896 default:
897 panic("unknown timer")
898 }
899 case *startPushRequest:
900 sc.startPush(v)
901 default:
902 panic(fmt.Sprintf("unexpected type %T", v))
903 }
904 }
905
906 // If the peer is causing us to generate a lot of control frames,
907 // but not reading them from us, assume they are trying to make us
908 // run out of memory.
909 if sc.queuedControlFrames > sc.srv.maxQueuedControlFrames() {
910 sc.vlogf("http2: too many control frames in send queue, closing connection")
911 return
912 }
913
914 // Start the shutdown timer after sending a GOAWAY. When sending GOAWAY
915 // with no error code (graceful shutdown), don't start the timer until
916 // all open streams have been completed.
917 sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame
918 gracefulShutdownComplete := sc.goAwayCode == ErrCodeNo && sc.curOpenStreams() == 0
919 if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != ErrCodeNo || gracefulShutdownComplete) {
920 sc.shutDownIn(goAwayTimeout)
921 }
922 }
923}
924
925func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) {
926 select {
927 case <-sc.doneServing:
928 case <-sharedCh:
929 close(privateCh)
930 }
931}
932
933type serverMessage int
934
935// Message values sent to serveMsgCh.
936var (
937 settingsTimerMsg = new(serverMessage)
938 idleTimerMsg = new(serverMessage)
939 shutdownTimerMsg = new(serverMessage)
940 gracefulShutdownMsg = new(serverMessage)
941)
942
943func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
944func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) }
945func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) }
946
947func (sc *serverConn) sendServeMsg(msg interface{}) {
948 sc.serveG.checkNotOn() // NOT
949 select {
950 case sc.serveMsgCh <- msg:
951 case <-sc.doneServing:
952 }
953}
954
955var errPrefaceTimeout = errors.New("timeout waiting for client preface")
956
957// readPreface reads the ClientPreface greeting from the peer or
958// returns errPrefaceTimeout on timeout, or an error if the greeting
959// is invalid.
960func (sc *serverConn) readPreface() error {
961 errc := make(chan error, 1)
962 go func() {
963 // Read the client preface
964 buf := make([]byte, len(ClientPreface))
965 if _, err := io.ReadFull(sc.conn, buf); err != nil {
966 errc <- err
967 } else if !bytes.Equal(buf, clientPreface) {
968 errc <- fmt.Errorf("bogus greeting %q", buf)
969 } else {
970 errc <- nil
971 }
972 }()
973 timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
974 defer timer.Stop()
975 select {
976 case <-timer.C:
977 return errPrefaceTimeout
978 case err := <-errc:
979 if err == nil {
980 if VerboseLogs {
981 sc.vlogf("http2: server: client %v said hello", sc.conn.RemoteAddr())
982 }
983 }
984 return err
985 }
986}
987
988var errChanPool = sync.Pool{
989 New: func() interface{} { return make(chan error, 1) },
990}
991
992var writeDataPool = sync.Pool{
993 New: func() interface{} { return new(writeData) },
994}
995
996// writeDataFromHandler writes DATA response frames from a handler on
997// the given stream.
998func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
999 ch := errChanPool.Get().(chan error)
1000 writeArg := writeDataPool.Get().(*writeData)
1001 *writeArg = writeData{stream.id, data, endStream}
1002 err := sc.writeFrameFromHandler(FrameWriteRequest{
1003 write: writeArg,
1004 stream: stream,
1005 done: ch,
1006 })
1007 if err != nil {
1008 return err
1009 }
1010 var frameWriteDone bool // the frame write is done (successfully or not)
1011 select {
1012 case err = <-ch:
1013 frameWriteDone = true
1014 case <-sc.doneServing:
1015 return errClientDisconnected
1016 case <-stream.cw:
1017 // If both ch and stream.cw were ready (as might
1018 // happen on the final Write after an http.Handler
1019 // ends), prefer the write result. Otherwise this
1020 // might just be us successfully closing the stream.
1021 // The writeFrameAsync and serve goroutines guarantee
1022 // that the ch send will happen before the stream.cw
1023 // close.
1024 select {
1025 case err = <-ch:
1026 frameWriteDone = true
1027 default:
1028 return errStreamClosed
1029 }
1030 }
1031 errChanPool.Put(ch)
1032 if frameWriteDone {
1033 writeDataPool.Put(writeArg)
1034 }
1035 return err
1036}
1037
1038// writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts
1039// if the connection has gone away.
1040//
1041// This must not be run from the serve goroutine itself, else it might
1042// deadlock writing to sc.wantWriteFrameCh (which is only mildly
1043// buffered and is read by serve itself). If you're on the serve
1044// goroutine, call writeFrame instead.
1045func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error {
1046 sc.serveG.checkNotOn() // NOT
1047 select {
1048 case sc.wantWriteFrameCh <- wr:
1049 return nil
1050 case <-sc.doneServing:
1051 // Serve loop is gone.
1052 // Client has closed their connection to the server.
1053 return errClientDisconnected
1054 }
1055}
1056
1057// writeFrame schedules a frame to write and sends it if there's nothing
1058// already being written.
1059//
1060// There is no pushback here (the serve goroutine never blocks). It's
1061// the http.Handlers that block, waiting for their previous frames to
1062// make it onto the wire
1063//
1064// If you're not on the serve goroutine, use writeFrameFromHandler instead.
1065func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
1066 sc.serveG.check()
1067
1068 // If true, wr will not be written and wr.done will not be signaled.
1069 var ignoreWrite bool
1070
1071 // We are not allowed to write frames on closed streams. RFC 7540 Section
1072 // 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
1073 // a closed stream." Our server never sends PRIORITY, so that exception
1074 // does not apply.
1075 //
1076 // The serverConn might close an open stream while the stream's handler
1077 // is still running. For example, the server might close a stream when it
1078 // receives bad data from the client. If this happens, the handler might
1079 // attempt to write a frame after the stream has been closed (since the
1080 // handler hasn't yet been notified of the close). In this case, we simply
1081 // ignore the frame. The handler will notice that the stream is closed when
1082 // it waits for the frame to be written.
1083 //
1084 // As an exception to this rule, we allow sending RST_STREAM after close.
1085 // This allows us to immediately reject new streams without tracking any
1086 // state for those streams (except for the queued RST_STREAM frame). This
1087 // may result in duplicate RST_STREAMs in some cases, but the client should
1088 // ignore those.
1089 if wr.StreamID() != 0 {
1090 _, isReset := wr.write.(StreamError)
1091 if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset {
1092 ignoreWrite = true
1093 }
1094 }
1095
1096 // Don't send a 100-continue response if we've already sent headers.
1097 // See golang.org/issue/14030.
1098 switch wr.write.(type) {
1099 case *writeResHeaders:
1100 wr.stream.wroteHeaders = true
1101 case write100ContinueHeadersFrame:
1102 if wr.stream.wroteHeaders {
1103 // We do not need to notify wr.done because this frame is
1104 // never written with wr.done != nil.
1105 if wr.done != nil {
1106 panic("wr.done != nil for write100ContinueHeadersFrame")
1107 }
1108 ignoreWrite = true
1109 }
1110 }
1111
1112 if !ignoreWrite {
1113 if wr.isControl() {
1114 sc.queuedControlFrames++
1115 // For extra safety, detect wraparounds, which should not happen,
1116 // and pull the plug.
1117 if sc.queuedControlFrames < 0 {
1118 sc.conn.Close()
1119 }
1120 }
1121 sc.writeSched.Push(wr)
1122 }
1123 sc.scheduleFrameWrite()
1124}
1125
1126// startFrameWrite starts a goroutine to write wr (in a separate
1127// goroutine since that might block on the network), and updates the
1128// serve goroutine's state about the world, updated from info in wr.
1129func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
1130 sc.serveG.check()
1131 if sc.writingFrame {
1132 panic("internal error: can only be writing one frame at a time")
1133 }
1134
1135 st := wr.stream
1136 if st != nil {
1137 switch st.state {
1138 case stateHalfClosedLocal:
1139 switch wr.write.(type) {
1140 case StreamError, handlerPanicRST, writeWindowUpdate:
1141 // RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE
1142 // in this state. (We never send PRIORITY from the server, so that is not checked.)
1143 default:
1144 panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr))
1145 }
1146 case stateClosed:
1147 panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr))
1148 }
1149 }
1150 if wpp, ok := wr.write.(*writePushPromise); ok {
1151 var err error
1152 wpp.promisedID, err = wpp.allocatePromisedID()
1153 if err != nil {
1154 sc.writingFrameAsync = false
1155 wr.replyToWriter(err)
1156 return
1157 }
1158 }
1159
1160 sc.writingFrame = true
1161 sc.needsFrameFlush = true
1162 if wr.write.staysWithinBuffer(sc.bw.Available()) {
1163 sc.writingFrameAsync = false
1164 err := wr.write.writeFrame(sc)
Andrea Campanellaaec20bd2021-02-25 12:41:34 +01001165 sc.wroteFrame(frameWriteResult{wr: wr, err: err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001166 } else {
1167 sc.writingFrameAsync = true
1168 go sc.writeFrameAsync(wr)
1169 }
1170}
1171
1172// errHandlerPanicked is the error given to any callers blocked in a read from
1173// Request.Body when the main goroutine panics. Since most handlers read in the
1174// main ServeHTTP goroutine, this will show up rarely.
1175var errHandlerPanicked = errors.New("http2: handler panicked")
1176
1177// wroteFrame is called on the serve goroutine with the result of
1178// whatever happened on writeFrameAsync.
1179func (sc *serverConn) wroteFrame(res frameWriteResult) {
1180 sc.serveG.check()
1181 if !sc.writingFrame {
1182 panic("internal error: expected to be already writing a frame")
1183 }
1184 sc.writingFrame = false
1185 sc.writingFrameAsync = false
1186
1187 wr := res.wr
1188
1189 if writeEndsStream(wr.write) {
1190 st := wr.stream
1191 if st == nil {
1192 panic("internal error: expecting non-nil stream")
1193 }
1194 switch st.state {
1195 case stateOpen:
1196 // Here we would go to stateHalfClosedLocal in
1197 // theory, but since our handler is done and
1198 // the net/http package provides no mechanism
1199 // for closing a ResponseWriter while still
1200 // reading data (see possible TODO at top of
1201 // this file), we go into closed state here
1202 // anyway, after telling the peer we're
1203 // hanging up on them. We'll transition to
1204 // stateClosed after the RST_STREAM frame is
1205 // written.
1206 st.state = stateHalfClosedLocal
1207 // Section 8.1: a server MAY request that the client abort
1208 // transmission of a request without error by sending a
1209 // RST_STREAM with an error code of NO_ERROR after sending
1210 // a complete response.
1211 sc.resetStream(streamError(st.id, ErrCodeNo))
1212 case stateHalfClosedRemote:
1213 sc.closeStream(st, errHandlerComplete)
1214 }
1215 } else {
1216 switch v := wr.write.(type) {
1217 case StreamError:
1218 // st may be unknown if the RST_STREAM was generated to reject bad input.
1219 if st, ok := sc.streams[v.StreamID]; ok {
1220 sc.closeStream(st, v)
1221 }
1222 case handlerPanicRST:
1223 sc.closeStream(wr.stream, errHandlerPanicked)
1224 }
1225 }
1226
1227 // Reply (if requested) to unblock the ServeHTTP goroutine.
1228 wr.replyToWriter(res.err)
1229
1230 sc.scheduleFrameWrite()
1231}
1232
1233// scheduleFrameWrite tickles the frame writing scheduler.
1234//
1235// If a frame is already being written, nothing happens. This will be called again
1236// when the frame is done being written.
1237//
1238// If a frame isn't being written and we need to send one, the best frame
1239// to send is selected by writeSched.
1240//
1241// If a frame isn't being written and there's nothing else to send, we
1242// flush the write buffer.
1243func (sc *serverConn) scheduleFrameWrite() {
1244 sc.serveG.check()
1245 if sc.writingFrame || sc.inFrameScheduleLoop {
1246 return
1247 }
1248 sc.inFrameScheduleLoop = true
1249 for !sc.writingFrameAsync {
1250 if sc.needToSendGoAway {
1251 sc.needToSendGoAway = false
1252 sc.startFrameWrite(FrameWriteRequest{
1253 write: &writeGoAway{
1254 maxStreamID: sc.maxClientStreamID,
1255 code: sc.goAwayCode,
1256 },
1257 })
1258 continue
1259 }
1260 if sc.needToSendSettingsAck {
1261 sc.needToSendSettingsAck = false
1262 sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
1263 continue
1264 }
1265 if !sc.inGoAway || sc.goAwayCode == ErrCodeNo {
1266 if wr, ok := sc.writeSched.Pop(); ok {
1267 if wr.isControl() {
1268 sc.queuedControlFrames--
1269 }
1270 sc.startFrameWrite(wr)
1271 continue
1272 }
1273 }
1274 if sc.needsFrameFlush {
1275 sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
1276 sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
1277 continue
1278 }
1279 break
1280 }
1281 sc.inFrameScheduleLoop = false
1282}
1283
1284// startGracefulShutdown gracefully shuts down a connection. This
1285// sends GOAWAY with ErrCodeNo to tell the client we're gracefully
1286// shutting down. The connection isn't closed until all current
1287// streams are done.
1288//
1289// startGracefulShutdown returns immediately; it does not wait until
1290// the connection has shut down.
1291func (sc *serverConn) startGracefulShutdown() {
1292 sc.serveG.checkNotOn() // NOT
1293 sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) })
1294}
1295
1296// After sending GOAWAY, the connection will close after goAwayTimeout.
1297// If we close the connection immediately after sending GOAWAY, there may
1298// be unsent data in our kernel receive buffer, which will cause the kernel
1299// to send a TCP RST on close() instead of a FIN. This RST will abort the
1300// connection immediately, whether or not the client had received the GOAWAY.
1301//
1302// Ideally we should delay for at least 1 RTT + epsilon so the client has
1303// a chance to read the GOAWAY and stop sending messages. Measuring RTT
1304// is hard, so we approximate with 1 second. See golang.org/issue/18701.
1305//
1306// This is a var so it can be shorter in tests, where all requests uses the
1307// loopback interface making the expected RTT very small.
1308//
1309// TODO: configurable?
1310var goAwayTimeout = 1 * time.Second
1311
1312func (sc *serverConn) startGracefulShutdownInternal() {
1313 sc.goAway(ErrCodeNo)
1314}
1315
1316func (sc *serverConn) goAway(code ErrCode) {
1317 sc.serveG.check()
1318 if sc.inGoAway {
1319 return
1320 }
1321 sc.inGoAway = true
1322 sc.needToSendGoAway = true
1323 sc.goAwayCode = code
1324 sc.scheduleFrameWrite()
1325}
1326
1327func (sc *serverConn) shutDownIn(d time.Duration) {
1328 sc.serveG.check()
1329 sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
1330}
1331
1332func (sc *serverConn) resetStream(se StreamError) {
1333 sc.serveG.check()
1334 sc.writeFrame(FrameWriteRequest{write: se})
1335 if st, ok := sc.streams[se.StreamID]; ok {
1336 st.resetQueued = true
1337 }
1338}
1339
1340// processFrameFromReader processes the serve loop's read from readFrameCh from the
1341// frame-reading goroutine.
1342// processFrameFromReader returns whether the connection should be kept open.
1343func (sc *serverConn) processFrameFromReader(res readFrameResult) bool {
1344 sc.serveG.check()
1345 err := res.err
1346 if err != nil {
1347 if err == ErrFrameTooLarge {
1348 sc.goAway(ErrCodeFrameSize)
1349 return true // goAway will close the loop
1350 }
1351 clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err)
1352 if clientGone {
1353 // TODO: could we also get into this state if
1354 // the peer does a half close
1355 // (e.g. CloseWrite) because they're done
1356 // sending frames but they're still wanting
1357 // our open replies? Investigate.
1358 // TODO: add CloseWrite to crypto/tls.Conn first
1359 // so we have a way to test this? I suppose
1360 // just for testing we could have a non-TLS mode.
1361 return false
1362 }
1363 } else {
1364 f := res.f
1365 if VerboseLogs {
1366 sc.vlogf("http2: server read frame %v", summarizeFrame(f))
1367 }
1368 err = sc.processFrame(f)
1369 if err == nil {
1370 return true
1371 }
1372 }
1373
1374 switch ev := err.(type) {
1375 case StreamError:
1376 sc.resetStream(ev)
1377 return true
1378 case goAwayFlowError:
1379 sc.goAway(ErrCodeFlowControl)
1380 return true
1381 case ConnectionError:
1382 sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev)
1383 sc.goAway(ErrCode(ev))
1384 return true // goAway will handle shutdown
1385 default:
1386 if res.err != nil {
1387 sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
1388 } else {
1389 sc.logf("http2: server closing client connection: %v", err)
1390 }
1391 return false
1392 }
1393}
1394
1395func (sc *serverConn) processFrame(f Frame) error {
1396 sc.serveG.check()
1397
1398 // First frame received must be SETTINGS.
1399 if !sc.sawFirstSettings {
1400 if _, ok := f.(*SettingsFrame); !ok {
1401 return ConnectionError(ErrCodeProtocol)
1402 }
1403 sc.sawFirstSettings = true
1404 }
1405
1406 switch f := f.(type) {
1407 case *SettingsFrame:
1408 return sc.processSettings(f)
1409 case *MetaHeadersFrame:
1410 return sc.processHeaders(f)
1411 case *WindowUpdateFrame:
1412 return sc.processWindowUpdate(f)
1413 case *PingFrame:
1414 return sc.processPing(f)
1415 case *DataFrame:
1416 return sc.processData(f)
1417 case *RSTStreamFrame:
1418 return sc.processResetStream(f)
1419 case *PriorityFrame:
1420 return sc.processPriority(f)
1421 case *GoAwayFrame:
1422 return sc.processGoAway(f)
1423 case *PushPromiseFrame:
1424 // A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
1425 // frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
1426 return ConnectionError(ErrCodeProtocol)
1427 default:
1428 sc.vlogf("http2: server ignoring frame: %v", f.Header())
1429 return nil
1430 }
1431}
1432
1433func (sc *serverConn) processPing(f *PingFrame) error {
1434 sc.serveG.check()
1435 if f.IsAck() {
1436 // 6.7 PING: " An endpoint MUST NOT respond to PING frames
1437 // containing this flag."
1438 return nil
1439 }
1440 if f.StreamID != 0 {
1441 // "PING frames are not associated with any individual
1442 // stream. If a PING frame is received with a stream
1443 // identifier field value other than 0x0, the recipient MUST
1444 // respond with a connection error (Section 5.4.1) of type
1445 // PROTOCOL_ERROR."
1446 return ConnectionError(ErrCodeProtocol)
1447 }
1448 if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
1449 return nil
1450 }
1451 sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
1452 return nil
1453}
1454
1455func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
1456 sc.serveG.check()
1457 switch {
1458 case f.StreamID != 0: // stream-level flow control
1459 state, st := sc.state(f.StreamID)
1460 if state == stateIdle {
1461 // Section 5.1: "Receiving any frame other than HEADERS
1462 // or PRIORITY on a stream in this state MUST be
1463 // treated as a connection error (Section 5.4.1) of
1464 // type PROTOCOL_ERROR."
1465 return ConnectionError(ErrCodeProtocol)
1466 }
1467 if st == nil {
1468 // "WINDOW_UPDATE can be sent by a peer that has sent a
1469 // frame bearing the END_STREAM flag. This means that a
1470 // receiver could receive a WINDOW_UPDATE frame on a "half
1471 // closed (remote)" or "closed" stream. A receiver MUST
1472 // NOT treat this as an error, see Section 5.1."
1473 return nil
1474 }
1475 if !st.flow.add(int32(f.Increment)) {
1476 return streamError(f.StreamID, ErrCodeFlowControl)
1477 }
1478 default: // connection-level flow control
1479 if !sc.flow.add(int32(f.Increment)) {
1480 return goAwayFlowError{}
1481 }
1482 }
1483 sc.scheduleFrameWrite()
1484 return nil
1485}
1486
1487func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
1488 sc.serveG.check()
1489
1490 state, st := sc.state(f.StreamID)
1491 if state == stateIdle {
1492 // 6.4 "RST_STREAM frames MUST NOT be sent for a
1493 // stream in the "idle" state. If a RST_STREAM frame
1494 // identifying an idle stream is received, the
1495 // recipient MUST treat this as a connection error
1496 // (Section 5.4.1) of type PROTOCOL_ERROR.
1497 return ConnectionError(ErrCodeProtocol)
1498 }
1499 if st != nil {
1500 st.cancelCtx()
1501 sc.closeStream(st, streamError(f.StreamID, f.ErrCode))
1502 }
1503 return nil
1504}
1505
1506func (sc *serverConn) closeStream(st *stream, err error) {
1507 sc.serveG.check()
1508 if st.state == stateIdle || st.state == stateClosed {
1509 panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
1510 }
1511 st.state = stateClosed
1512 if st.writeDeadline != nil {
1513 st.writeDeadline.Stop()
1514 }
1515 if st.isPushed() {
1516 sc.curPushedStreams--
1517 } else {
1518 sc.curClientStreams--
1519 }
1520 delete(sc.streams, st.id)
1521 if len(sc.streams) == 0 {
1522 sc.setConnState(http.StateIdle)
1523 if sc.srv.IdleTimeout != 0 {
1524 sc.idleTimer.Reset(sc.srv.IdleTimeout)
1525 }
1526 if h1ServerKeepAlivesDisabled(sc.hs) {
1527 sc.startGracefulShutdownInternal()
1528 }
1529 }
1530 if p := st.body; p != nil {
1531 // Return any buffered unread bytes worth of conn-level flow control.
1532 // See golang.org/issue/16481
1533 sc.sendWindowUpdate(nil, p.Len())
1534
1535 p.CloseWithError(err)
1536 }
1537 st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
1538 sc.writeSched.CloseStream(st.id)
1539}
1540
1541func (sc *serverConn) processSettings(f *SettingsFrame) error {
1542 sc.serveG.check()
1543 if f.IsAck() {
1544 sc.unackedSettings--
1545 if sc.unackedSettings < 0 {
1546 // Why is the peer ACKing settings we never sent?
1547 // The spec doesn't mention this case, but
1548 // hang up on them anyway.
1549 return ConnectionError(ErrCodeProtocol)
1550 }
1551 return nil
1552 }
1553 if f.NumSettings() > 100 || f.HasDuplicates() {
1554 // This isn't actually in the spec, but hang up on
1555 // suspiciously large settings frames or those with
1556 // duplicate entries.
1557 return ConnectionError(ErrCodeProtocol)
1558 }
1559 if err := f.ForeachSetting(sc.processSetting); err != nil {
1560 return err
1561 }
1562 // TODO: judging by RFC 7540, Section 6.5.3 each SETTINGS frame should be
1563 // acknowledged individually, even if multiple are received before the ACK.
1564 sc.needToSendSettingsAck = true
1565 sc.scheduleFrameWrite()
1566 return nil
1567}
1568
1569func (sc *serverConn) processSetting(s Setting) error {
1570 sc.serveG.check()
1571 if err := s.Valid(); err != nil {
1572 return err
1573 }
1574 if VerboseLogs {
1575 sc.vlogf("http2: server processing setting %v", s)
1576 }
1577 switch s.ID {
1578 case SettingHeaderTableSize:
1579 sc.headerTableSize = s.Val
1580 sc.hpackEncoder.SetMaxDynamicTableSize(s.Val)
1581 case SettingEnablePush:
1582 sc.pushEnabled = s.Val != 0
1583 case SettingMaxConcurrentStreams:
1584 sc.clientMaxStreams = s.Val
1585 case SettingInitialWindowSize:
1586 return sc.processSettingInitialWindowSize(s.Val)
1587 case SettingMaxFrameSize:
1588 sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31
1589 case SettingMaxHeaderListSize:
1590 sc.peerMaxHeaderListSize = s.Val
1591 default:
1592 // Unknown setting: "An endpoint that receives a SETTINGS
1593 // frame with any unknown or unsupported identifier MUST
1594 // ignore that setting."
1595 if VerboseLogs {
1596 sc.vlogf("http2: server ignoring unknown setting %v", s)
1597 }
1598 }
1599 return nil
1600}
1601
1602func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
1603 sc.serveG.check()
1604 // Note: val already validated to be within range by
1605 // processSetting's Valid call.
1606
1607 // "A SETTINGS frame can alter the initial flow control window
1608 // size for all current streams. When the value of
1609 // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST
1610 // adjust the size of all stream flow control windows that it
1611 // maintains by the difference between the new value and the
1612 // old value."
1613 old := sc.initialStreamSendWindowSize
1614 sc.initialStreamSendWindowSize = int32(val)
1615 growth := int32(val) - old // may be negative
1616 for _, st := range sc.streams {
1617 if !st.flow.add(growth) {
1618 // 6.9.2 Initial Flow Control Window Size
1619 // "An endpoint MUST treat a change to
1620 // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow
1621 // control window to exceed the maximum size as a
1622 // connection error (Section 5.4.1) of type
1623 // FLOW_CONTROL_ERROR."
1624 return ConnectionError(ErrCodeFlowControl)
1625 }
1626 }
1627 return nil
1628}
1629
1630func (sc *serverConn) processData(f *DataFrame) error {
1631 sc.serveG.check()
1632 if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
1633 return nil
1634 }
1635 data := f.Data()
1636
1637 // "If a DATA frame is received whose stream is not in "open"
1638 // or "half closed (local)" state, the recipient MUST respond
1639 // with a stream error (Section 5.4.2) of type STREAM_CLOSED."
1640 id := f.Header().StreamID
1641 state, st := sc.state(id)
1642 if id == 0 || state == stateIdle {
1643 // Section 5.1: "Receiving any frame other than HEADERS
1644 // or PRIORITY on a stream in this state MUST be
1645 // treated as a connection error (Section 5.4.1) of
1646 // type PROTOCOL_ERROR."
1647 return ConnectionError(ErrCodeProtocol)
1648 }
1649 if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued {
1650 // This includes sending a RST_STREAM if the stream is
1651 // in stateHalfClosedLocal (which currently means that
1652 // the http.Handler returned, so it's done reading &
1653 // done writing). Try to stop the client from sending
1654 // more DATA.
1655
1656 // But still enforce their connection-level flow control,
1657 // and return any flow control bytes since we're not going
1658 // to consume them.
1659 if sc.inflow.available() < int32(f.Length) {
1660 return streamError(id, ErrCodeFlowControl)
1661 }
1662 // Deduct the flow control from inflow, since we're
1663 // going to immediately add it back in
1664 // sendWindowUpdate, which also schedules sending the
1665 // frames.
1666 sc.inflow.take(int32(f.Length))
1667 sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
1668
1669 if st != nil && st.resetQueued {
1670 // Already have a stream error in flight. Don't send another.
1671 return nil
1672 }
1673 return streamError(id, ErrCodeStreamClosed)
1674 }
1675 if st.body == nil {
1676 panic("internal error: should have a body in this state")
1677 }
1678
1679 // Sender sending more than they'd declared?
1680 if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
1681 st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
1682 // RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
1683 // value of a content-length header field does not equal the sum of the
1684 // DATA frame payload lengths that form the body.
1685 return streamError(id, ErrCodeProtocol)
1686 }
1687 if f.Length > 0 {
1688 // Check whether the client has flow control quota.
1689 if st.inflow.available() < int32(f.Length) {
1690 return streamError(id, ErrCodeFlowControl)
1691 }
1692 st.inflow.take(int32(f.Length))
1693
1694 if len(data) > 0 {
1695 wrote, err := st.body.Write(data)
1696 if err != nil {
Andrea Campanellaaec20bd2021-02-25 12:41:34 +01001697 sc.sendWindowUpdate(nil, int(f.Length)-wrote)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001698 return streamError(id, ErrCodeStreamClosed)
1699 }
1700 if wrote != len(data) {
1701 panic("internal error: bad Writer")
1702 }
1703 st.bodyBytes += int64(len(data))
1704 }
1705
1706 // Return any padded flow control now, since we won't
1707 // refund it later on body reads.
1708 if pad := int32(f.Length) - int32(len(data)); pad > 0 {
1709 sc.sendWindowUpdate32(nil, pad)
1710 sc.sendWindowUpdate32(st, pad)
1711 }
1712 }
1713 if f.StreamEnded() {
1714 st.endStream()
1715 }
1716 return nil
1717}
1718
1719func (sc *serverConn) processGoAway(f *GoAwayFrame) error {
1720 sc.serveG.check()
1721 if f.ErrCode != ErrCodeNo {
1722 sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1723 } else {
1724 sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1725 }
1726 sc.startGracefulShutdownInternal()
1727 // http://tools.ietf.org/html/rfc7540#section-6.8
1728 // We should not create any new streams, which means we should disable push.
1729 sc.pushEnabled = false
1730 return nil
1731}
1732
1733// isPushed reports whether the stream is server-initiated.
1734func (st *stream) isPushed() bool {
1735 return st.id%2 == 0
1736}
1737
1738// endStream closes a Request.Body's pipe. It is called when a DATA
1739// frame says a request body is over (or after trailers).
1740func (st *stream) endStream() {
1741 sc := st.sc
1742 sc.serveG.check()
1743
1744 if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes {
1745 st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes",
1746 st.declBodyBytes, st.bodyBytes))
1747 } else {
1748 st.body.closeWithErrorAndCode(io.EOF, st.copyTrailersToHandlerRequest)
1749 st.body.CloseWithError(io.EOF)
1750 }
1751 st.state = stateHalfClosedRemote
1752}
1753
1754// copyTrailersToHandlerRequest is run in the Handler's goroutine in
1755// its Request.Body.Read just before it gets io.EOF.
1756func (st *stream) copyTrailersToHandlerRequest() {
1757 for k, vv := range st.trailer {
1758 if _, ok := st.reqTrailer[k]; ok {
1759 // Only copy it over it was pre-declared.
1760 st.reqTrailer[k] = vv
1761 }
1762 }
1763}
1764
1765// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
1766// when the stream's WriteTimeout has fired.
1767func (st *stream) onWriteTimeout() {
1768 st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)})
1769}
1770
1771func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
1772 sc.serveG.check()
1773 id := f.StreamID
1774 if sc.inGoAway {
1775 // Ignore.
1776 return nil
1777 }
1778 // http://tools.ietf.org/html/rfc7540#section-5.1.1
1779 // Streams initiated by a client MUST use odd-numbered stream
1780 // identifiers. [...] An endpoint that receives an unexpected
1781 // stream identifier MUST respond with a connection error
1782 // (Section 5.4.1) of type PROTOCOL_ERROR.
1783 if id%2 != 1 {
1784 return ConnectionError(ErrCodeProtocol)
1785 }
1786 // A HEADERS frame can be used to create a new stream or
1787 // send a trailer for an open one. If we already have a stream
1788 // open, let it process its own HEADERS frame (trailers at this
1789 // point, if it's valid).
1790 if st := sc.streams[f.StreamID]; st != nil {
1791 if st.resetQueued {
1792 // We're sending RST_STREAM to close the stream, so don't bother
1793 // processing this frame.
1794 return nil
1795 }
1796 // RFC 7540, sec 5.1: If an endpoint receives additional frames, other than
1797 // WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in
1798 // this state, it MUST respond with a stream error (Section 5.4.2) of
1799 // type STREAM_CLOSED.
1800 if st.state == stateHalfClosedRemote {
1801 return streamError(id, ErrCodeStreamClosed)
1802 }
1803 return st.processTrailerHeaders(f)
1804 }
1805
1806 // [...] The identifier of a newly established stream MUST be
1807 // numerically greater than all streams that the initiating
1808 // endpoint has opened or reserved. [...] An endpoint that
1809 // receives an unexpected stream identifier MUST respond with
1810 // a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
1811 if id <= sc.maxClientStreamID {
1812 return ConnectionError(ErrCodeProtocol)
1813 }
1814 sc.maxClientStreamID = id
1815
1816 if sc.idleTimer != nil {
1817 sc.idleTimer.Stop()
1818 }
1819
1820 // http://tools.ietf.org/html/rfc7540#section-5.1.2
1821 // [...] Endpoints MUST NOT exceed the limit set by their peer. An
1822 // endpoint that receives a HEADERS frame that causes their
1823 // advertised concurrent stream limit to be exceeded MUST treat
1824 // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
1825 // or REFUSED_STREAM.
1826 if sc.curClientStreams+1 > sc.advMaxStreams {
1827 if sc.unackedSettings == 0 {
1828 // They should know better.
1829 return streamError(id, ErrCodeProtocol)
1830 }
1831 // Assume it's a network race, where they just haven't
1832 // received our last SETTINGS update. But actually
1833 // this can't happen yet, because we don't yet provide
1834 // a way for users to adjust server parameters at
1835 // runtime.
1836 return streamError(id, ErrCodeRefusedStream)
1837 }
1838
1839 initialState := stateOpen
1840 if f.StreamEnded() {
1841 initialState = stateHalfClosedRemote
1842 }
1843 st := sc.newStream(id, 0, initialState)
1844
1845 if f.HasPriority() {
1846 if err := checkPriority(f.StreamID, f.Priority); err != nil {
1847 return err
1848 }
1849 sc.writeSched.AdjustStream(st.id, f.Priority)
1850 }
1851
1852 rw, req, err := sc.newWriterAndRequest(st, f)
1853 if err != nil {
1854 return err
1855 }
1856 st.reqTrailer = req.Trailer
1857 if st.reqTrailer != nil {
1858 st.trailer = make(http.Header)
1859 }
1860 st.body = req.Body.(*requestBody).pipe // may be nil
1861 st.declBodyBytes = req.ContentLength
1862
1863 handler := sc.handler.ServeHTTP
1864 if f.Truncated {
1865 // Their header list was too long. Send a 431 error.
1866 handler = handleHeaderListTooLong
1867 } else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil {
1868 handler = new400Handler(err)
1869 }
1870
1871 // The net/http package sets the read deadline from the
1872 // http.Server.ReadTimeout during the TLS handshake, but then
1873 // passes the connection off to us with the deadline already
1874 // set. Disarm it here after the request headers are read,
1875 // similar to how the http1 server works. Here it's
1876 // technically more like the http1 Server's ReadHeaderTimeout
1877 // (in Go 1.8), though. That's a more sane option anyway.
1878 if sc.hs.ReadTimeout != 0 {
1879 sc.conn.SetReadDeadline(time.Time{})
1880 }
1881
1882 go sc.runHandler(rw, req, handler)
1883 return nil
1884}
1885
1886func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
1887 sc := st.sc
1888 sc.serveG.check()
1889 if st.gotTrailerHeader {
1890 return ConnectionError(ErrCodeProtocol)
1891 }
1892 st.gotTrailerHeader = true
1893 if !f.StreamEnded() {
1894 return streamError(st.id, ErrCodeProtocol)
1895 }
1896
1897 if len(f.PseudoFields()) > 0 {
1898 return streamError(st.id, ErrCodeProtocol)
1899 }
1900 if st.trailer != nil {
1901 for _, hf := range f.RegularFields() {
1902 key := sc.canonicalHeader(hf.Name)
1903 if !httpguts.ValidTrailerHeader(key) {
1904 // TODO: send more details to the peer somehow. But http2 has
1905 // no way to send debug data at a stream level. Discuss with
1906 // HTTP folk.
1907 return streamError(st.id, ErrCodeProtocol)
1908 }
1909 st.trailer[key] = append(st.trailer[key], hf.Value)
1910 }
1911 }
1912 st.endStream()
1913 return nil
1914}
1915
1916func checkPriority(streamID uint32, p PriorityParam) error {
1917 if streamID == p.StreamDep {
1918 // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
1919 // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
1920 // Section 5.3.3 says that a stream can depend on one of its dependencies,
1921 // so it's only self-dependencies that are forbidden.
1922 return streamError(streamID, ErrCodeProtocol)
1923 }
1924 return nil
1925}
1926
1927func (sc *serverConn) processPriority(f *PriorityFrame) error {
1928 if sc.inGoAway {
1929 return nil
1930 }
1931 if err := checkPriority(f.StreamID, f.PriorityParam); err != nil {
1932 return err
1933 }
1934 sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam)
1935 return nil
1936}
1937
1938func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream {
1939 sc.serveG.check()
1940 if id == 0 {
1941 panic("internal error: cannot create stream with id 0")
1942 }
1943
1944 ctx, cancelCtx := context.WithCancel(sc.baseCtx)
1945 st := &stream{
1946 sc: sc,
1947 id: id,
1948 state: state,
1949 ctx: ctx,
1950 cancelCtx: cancelCtx,
1951 }
1952 st.cw.Init()
1953 st.flow.conn = &sc.flow // link to conn-level counter
1954 st.flow.add(sc.initialStreamSendWindowSize)
1955 st.inflow.conn = &sc.inflow // link to conn-level counter
1956 st.inflow.add(sc.srv.initialStreamRecvWindowSize())
1957 if sc.hs.WriteTimeout != 0 {
1958 st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
1959 }
1960
1961 sc.streams[id] = st
1962 sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
1963 if st.isPushed() {
1964 sc.curPushedStreams++
1965 } else {
1966 sc.curClientStreams++
1967 }
1968 if sc.curOpenStreams() == 1 {
1969 sc.setConnState(http.StateActive)
1970 }
1971
1972 return st
1973}
1974
1975func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) {
1976 sc.serveG.check()
1977
1978 rp := requestParam{
1979 method: f.PseudoValue("method"),
1980 scheme: f.PseudoValue("scheme"),
1981 authority: f.PseudoValue("authority"),
1982 path: f.PseudoValue("path"),
1983 }
1984
1985 isConnect := rp.method == "CONNECT"
1986 if isConnect {
1987 if rp.path != "" || rp.scheme != "" || rp.authority == "" {
1988 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
1989 }
1990 } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
1991 // See 8.1.2.6 Malformed Requests and Responses:
1992 //
1993 // Malformed requests or responses that are detected
1994 // MUST be treated as a stream error (Section 5.4.2)
1995 // of type PROTOCOL_ERROR."
1996 //
1997 // 8.1.2.3 Request Pseudo-Header Fields
1998 // "All HTTP/2 requests MUST include exactly one valid
1999 // value for the :method, :scheme, and :path
2000 // pseudo-header fields"
2001 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
2002 }
2003
2004 bodyOpen := !f.StreamEnded()
2005 if rp.method == "HEAD" && bodyOpen {
2006 // HEAD requests can't have bodies
2007 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
2008 }
2009
2010 rp.header = make(http.Header)
2011 for _, hf := range f.RegularFields() {
2012 rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value)
2013 }
2014 if rp.authority == "" {
2015 rp.authority = rp.header.Get("Host")
2016 }
2017
2018 rw, req, err := sc.newWriterAndRequestNoBody(st, rp)
2019 if err != nil {
2020 return nil, nil, err
2021 }
2022 if bodyOpen {
2023 if vv, ok := rp.header["Content-Length"]; ok {
Andrea Campanellaaec20bd2021-02-25 12:41:34 +01002024 if cl, err := strconv.ParseUint(vv[0], 10, 63); err == nil {
2025 req.ContentLength = int64(cl)
2026 } else {
2027 req.ContentLength = 0
2028 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +00002029 } else {
2030 req.ContentLength = -1
2031 }
2032 req.Body.(*requestBody).pipe = &pipe{
2033 b: &dataBuffer{expected: req.ContentLength},
2034 }
2035 }
2036 return rw, req, nil
2037}
2038
2039type requestParam struct {
2040 method string
2041 scheme, authority, path string
2042 header http.Header
2043}
2044
2045func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) {
2046 sc.serveG.check()
2047
2048 var tlsState *tls.ConnectionState // nil if not scheme https
2049 if rp.scheme == "https" {
2050 tlsState = sc.tlsState
2051 }
2052
2053 needsContinue := rp.header.Get("Expect") == "100-continue"
2054 if needsContinue {
2055 rp.header.Del("Expect")
2056 }
2057 // Merge Cookie headers into one "; "-delimited value.
2058 if cookies := rp.header["Cookie"]; len(cookies) > 1 {
2059 rp.header.Set("Cookie", strings.Join(cookies, "; "))
2060 }
2061
2062 // Setup Trailers
2063 var trailer http.Header
2064 for _, v := range rp.header["Trailer"] {
2065 for _, key := range strings.Split(v, ",") {
Andrea Campanellaaec20bd2021-02-25 12:41:34 +01002066 key = http.CanonicalHeaderKey(textproto.TrimString(key))
Holger Hildebrandtfa074992020-03-27 15:42:06 +00002067 switch key {
2068 case "Transfer-Encoding", "Trailer", "Content-Length":
2069 // Bogus. (copy of http1 rules)
2070 // Ignore.
2071 default:
2072 if trailer == nil {
2073 trailer = make(http.Header)
2074 }
2075 trailer[key] = nil
2076 }
2077 }
2078 }
2079 delete(rp.header, "Trailer")
2080
2081 var url_ *url.URL
2082 var requestURI string
2083 if rp.method == "CONNECT" {
2084 url_ = &url.URL{Host: rp.authority}
2085 requestURI = rp.authority // mimic HTTP/1 server behavior
2086 } else {
2087 var err error
2088 url_, err = url.ParseRequestURI(rp.path)
2089 if err != nil {
2090 return nil, nil, streamError(st.id, ErrCodeProtocol)
2091 }
2092 requestURI = rp.path
2093 }
2094
2095 body := &requestBody{
2096 conn: sc,
2097 stream: st,
2098 needsContinue: needsContinue,
2099 }
2100 req := &http.Request{
2101 Method: rp.method,
2102 URL: url_,
2103 RemoteAddr: sc.remoteAddrStr,
2104 Header: rp.header,
2105 RequestURI: requestURI,
2106 Proto: "HTTP/2.0",
2107 ProtoMajor: 2,
2108 ProtoMinor: 0,
2109 TLS: tlsState,
2110 Host: rp.authority,
2111 Body: body,
2112 Trailer: trailer,
2113 }
2114 req = req.WithContext(st.ctx)
2115
2116 rws := responseWriterStatePool.Get().(*responseWriterState)
2117 bwSave := rws.bw
2118 *rws = responseWriterState{} // zero all the fields
2119 rws.conn = sc
2120 rws.bw = bwSave
2121 rws.bw.Reset(chunkWriter{rws})
2122 rws.stream = st
2123 rws.req = req
2124 rws.body = body
2125
2126 rw := &responseWriter{rws: rws}
2127 return rw, req, nil
2128}
2129
2130// Run on its own goroutine.
2131func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
2132 didPanic := true
2133 defer func() {
2134 rw.rws.stream.cancelCtx()
2135 if didPanic {
2136 e := recover()
2137 sc.writeFrameFromHandler(FrameWriteRequest{
2138 write: handlerPanicRST{rw.rws.stream.id},
2139 stream: rw.rws.stream,
2140 })
2141 // Same as net/http:
2142 if e != nil && e != http.ErrAbortHandler {
2143 const size = 64 << 10
2144 buf := make([]byte, size)
2145 buf = buf[:runtime.Stack(buf, false)]
2146 sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf)
2147 }
2148 return
2149 }
2150 rw.handlerDone()
2151 }()
2152 handler(rw, req)
2153 didPanic = false
2154}
2155
2156func handleHeaderListTooLong(w http.ResponseWriter, r *http.Request) {
2157 // 10.5.1 Limits on Header Block Size:
2158 // .. "A server that receives a larger header block than it is
2159 // willing to handle can send an HTTP 431 (Request Header Fields Too
2160 // Large) status code"
2161 const statusRequestHeaderFieldsTooLarge = 431 // only in Go 1.6+
2162 w.WriteHeader(statusRequestHeaderFieldsTooLarge)
2163 io.WriteString(w, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>")
2164}
2165
2166// called from handler goroutines.
2167// h may be nil.
2168func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) error {
2169 sc.serveG.checkNotOn() // NOT on
2170 var errc chan error
2171 if headerData.h != nil {
2172 // If there's a header map (which we don't own), so we have to block on
2173 // waiting for this frame to be written, so an http.Flush mid-handler
2174 // writes out the correct value of keys, before a handler later potentially
2175 // mutates it.
2176 errc = errChanPool.Get().(chan error)
2177 }
2178 if err := sc.writeFrameFromHandler(FrameWriteRequest{
2179 write: headerData,
2180 stream: st,
2181 done: errc,
2182 }); err != nil {
2183 return err
2184 }
2185 if errc != nil {
2186 select {
2187 case err := <-errc:
2188 errChanPool.Put(errc)
2189 return err
2190 case <-sc.doneServing:
2191 return errClientDisconnected
2192 case <-st.cw:
2193 return errStreamClosed
2194 }
2195 }
2196 return nil
2197}
2198
2199// called from handler goroutines.
2200func (sc *serverConn) write100ContinueHeaders(st *stream) {
2201 sc.writeFrameFromHandler(FrameWriteRequest{
2202 write: write100ContinueHeadersFrame{st.id},
2203 stream: st,
2204 })
2205}
2206
2207// A bodyReadMsg tells the server loop that the http.Handler read n
2208// bytes of the DATA from the client on the given stream.
2209type bodyReadMsg struct {
2210 st *stream
2211 n int
2212}
2213
2214// called from handler goroutines.
2215// Notes that the handler for the given stream ID read n bytes of its body
2216// and schedules flow control tokens to be sent.
2217func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
2218 sc.serveG.checkNotOn() // NOT on
2219 if n > 0 {
2220 select {
2221 case sc.bodyReadCh <- bodyReadMsg{st, n}:
2222 case <-sc.doneServing:
2223 }
2224 }
2225}
2226
2227func (sc *serverConn) noteBodyRead(st *stream, n int) {
2228 sc.serveG.check()
2229 sc.sendWindowUpdate(nil, n) // conn-level
2230 if st.state != stateHalfClosedRemote && st.state != stateClosed {
2231 // Don't send this WINDOW_UPDATE if the stream is closed
2232 // remotely.
2233 sc.sendWindowUpdate(st, n)
2234 }
2235}
2236
2237// st may be nil for conn-level
2238func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
2239 sc.serveG.check()
2240 // "The legal range for the increment to the flow control
2241 // window is 1 to 2^31-1 (2,147,483,647) octets."
2242 // A Go Read call on 64-bit machines could in theory read
2243 // a larger Read than this. Very unlikely, but we handle it here
2244 // rather than elsewhere for now.
2245 const maxUint31 = 1<<31 - 1
2246 for n >= maxUint31 {
2247 sc.sendWindowUpdate32(st, maxUint31)
2248 n -= maxUint31
2249 }
2250 sc.sendWindowUpdate32(st, int32(n))
2251}
2252
2253// st may be nil for conn-level
2254func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
2255 sc.serveG.check()
2256 if n == 0 {
2257 return
2258 }
2259 if n < 0 {
2260 panic("negative update")
2261 }
2262 var streamID uint32
2263 if st != nil {
2264 streamID = st.id
2265 }
2266 sc.writeFrame(FrameWriteRequest{
2267 write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
2268 stream: st,
2269 })
2270 var ok bool
2271 if st == nil {
2272 ok = sc.inflow.add(n)
2273 } else {
2274 ok = st.inflow.add(n)
2275 }
2276 if !ok {
2277 panic("internal error; sent too many window updates without decrements?")
2278 }
2279}
2280
2281// requestBody is the Handler's Request.Body type.
2282// Read and Close may be called concurrently.
2283type requestBody struct {
Andrea Campanellaaec20bd2021-02-25 12:41:34 +01002284 _ incomparable
Holger Hildebrandtfa074992020-03-27 15:42:06 +00002285 stream *stream
2286 conn *serverConn
2287 closed bool // for use by Close only
2288 sawEOF bool // for use by Read only
2289 pipe *pipe // non-nil if we have a HTTP entity message body
2290 needsContinue bool // need to send a 100-continue
2291}
2292
2293func (b *requestBody) Close() error {
2294 if b.pipe != nil && !b.closed {
2295 b.pipe.BreakWithError(errClosedBody)
2296 }
2297 b.closed = true
2298 return nil
2299}
2300
2301func (b *requestBody) Read(p []byte) (n int, err error) {
2302 if b.needsContinue {
2303 b.needsContinue = false
2304 b.conn.write100ContinueHeaders(b.stream)
2305 }
2306 if b.pipe == nil || b.sawEOF {
2307 return 0, io.EOF
2308 }
2309 n, err = b.pipe.Read(p)
2310 if err == io.EOF {
2311 b.sawEOF = true
2312 }
2313 if b.conn == nil && inTests {
2314 return
2315 }
2316 b.conn.noteBodyReadFromHandler(b.stream, n, err)
2317 return
2318}
2319
2320// responseWriter is the http.ResponseWriter implementation. It's
2321// intentionally small (1 pointer wide) to minimize garbage. The
2322// responseWriterState pointer inside is zeroed at the end of a
2323// request (in handlerDone) and calls on the responseWriter thereafter
2324// simply crash (caller's mistake), but the much larger responseWriterState
2325// and buffers are reused between multiple requests.
2326type responseWriter struct {
2327 rws *responseWriterState
2328}
2329
2330// Optional http.ResponseWriter interfaces implemented.
2331var (
2332 _ http.CloseNotifier = (*responseWriter)(nil)
2333 _ http.Flusher = (*responseWriter)(nil)
2334 _ stringWriter = (*responseWriter)(nil)
2335)
2336
2337type responseWriterState struct {
2338 // immutable within a request:
2339 stream *stream
2340 req *http.Request
2341 body *requestBody // to close at end of request, if DATA frames didn't
2342 conn *serverConn
2343
2344 // TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc
2345 bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState}
2346
2347 // mutated by http.Handler goroutine:
2348 handlerHeader http.Header // nil until called
2349 snapHeader http.Header // snapshot of handlerHeader at WriteHeader time
2350 trailers []string // set in writeChunk
2351 status int // status code passed to WriteHeader
2352 wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
2353 sentHeader bool // have we sent the header frame?
2354 handlerDone bool // handler has finished
2355 dirty bool // a Write failed; don't reuse this responseWriterState
2356
2357 sentContentLen int64 // non-zero if handler set a Content-Length header
2358 wroteBytes int64
2359
2360 closeNotifierMu sync.Mutex // guards closeNotifierCh
2361 closeNotifierCh chan bool // nil until first used
2362}
2363
2364type chunkWriter struct{ rws *responseWriterState }
2365
2366func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) }
2367
2368func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) > 0 }
2369
2370func (rws *responseWriterState) hasNonemptyTrailers() bool {
2371 for _, trailer := range rws.trailers {
2372 if _, ok := rws.handlerHeader[trailer]; ok {
2373 return true
2374 }
2375 }
2376 return false
2377}
2378
2379// declareTrailer is called for each Trailer header when the
2380// response header is written. It notes that a header will need to be
2381// written in the trailers at the end of the response.
2382func (rws *responseWriterState) declareTrailer(k string) {
2383 k = http.CanonicalHeaderKey(k)
2384 if !httpguts.ValidTrailerHeader(k) {
2385 // Forbidden by RFC 7230, section 4.1.2.
2386 rws.conn.logf("ignoring invalid trailer %q", k)
2387 return
2388 }
2389 if !strSliceContains(rws.trailers, k) {
2390 rws.trailers = append(rws.trailers, k)
2391 }
2392}
2393
2394// writeChunk writes chunks from the bufio.Writer. But because
2395// bufio.Writer may bypass its chunking, sometimes p may be
2396// arbitrarily large.
2397//
2398// writeChunk is also responsible (on the first chunk) for sending the
2399// HEADER response.
2400func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
2401 if !rws.wroteHeader {
2402 rws.writeHeader(200)
2403 }
2404
2405 isHeadResp := rws.req.Method == "HEAD"
2406 if !rws.sentHeader {
2407 rws.sentHeader = true
2408 var ctype, clen string
2409 if clen = rws.snapHeader.Get("Content-Length"); clen != "" {
2410 rws.snapHeader.Del("Content-Length")
Andrea Campanellaaec20bd2021-02-25 12:41:34 +01002411 if cl, err := strconv.ParseUint(clen, 10, 63); err == nil {
2412 rws.sentContentLen = int64(cl)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00002413 } else {
2414 clen = ""
2415 }
2416 }
2417 if clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
2418 clen = strconv.Itoa(len(p))
2419 }
2420 _, hasContentType := rws.snapHeader["Content-Type"]
2421 // If the Content-Encoding is non-blank, we shouldn't
2422 // sniff the body. See Issue golang.org/issue/31753.
2423 ce := rws.snapHeader.Get("Content-Encoding")
2424 hasCE := len(ce) > 0
2425 if !hasCE && !hasContentType && bodyAllowedForStatus(rws.status) && len(p) > 0 {
2426 ctype = http.DetectContentType(p)
2427 }
2428 var date string
2429 if _, ok := rws.snapHeader["Date"]; !ok {
2430 // TODO(bradfitz): be faster here, like net/http? measure.
2431 date = time.Now().UTC().Format(http.TimeFormat)
2432 }
2433
2434 for _, v := range rws.snapHeader["Trailer"] {
2435 foreachHeaderElement(v, rws.declareTrailer)
2436 }
2437
2438 // "Connection" headers aren't allowed in HTTP/2 (RFC 7540, 8.1.2.2),
2439 // but respect "Connection" == "close" to mean sending a GOAWAY and tearing
2440 // down the TCP connection when idle, like we do for HTTP/1.
2441 // TODO: remove more Connection-specific header fields here, in addition
2442 // to "Connection".
2443 if _, ok := rws.snapHeader["Connection"]; ok {
2444 v := rws.snapHeader.Get("Connection")
2445 delete(rws.snapHeader, "Connection")
2446 if v == "close" {
2447 rws.conn.startGracefulShutdown()
2448 }
2449 }
2450
2451 endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp
2452 err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
2453 streamID: rws.stream.id,
2454 httpResCode: rws.status,
2455 h: rws.snapHeader,
2456 endStream: endStream,
2457 contentType: ctype,
2458 contentLength: clen,
2459 date: date,
2460 })
2461 if err != nil {
2462 rws.dirty = true
2463 return 0, err
2464 }
2465 if endStream {
2466 return 0, nil
2467 }
2468 }
2469 if isHeadResp {
2470 return len(p), nil
2471 }
2472 if len(p) == 0 && !rws.handlerDone {
2473 return 0, nil
2474 }
2475
2476 if rws.handlerDone {
2477 rws.promoteUndeclaredTrailers()
2478 }
2479
2480 // only send trailers if they have actually been defined by the
2481 // server handler.
2482 hasNonemptyTrailers := rws.hasNonemptyTrailers()
2483 endStream := rws.handlerDone && !hasNonemptyTrailers
2484 if len(p) > 0 || endStream {
2485 // only send a 0 byte DATA frame if we're ending the stream.
2486 if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
2487 rws.dirty = true
2488 return 0, err
2489 }
2490 }
2491
2492 if rws.handlerDone && hasNonemptyTrailers {
2493 err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
2494 streamID: rws.stream.id,
2495 h: rws.handlerHeader,
2496 trailers: rws.trailers,
2497 endStream: true,
2498 })
2499 if err != nil {
2500 rws.dirty = true
2501 }
2502 return len(p), err
2503 }
2504 return len(p), nil
2505}
2506
2507// TrailerPrefix is a magic prefix for ResponseWriter.Header map keys
2508// that, if present, signals that the map entry is actually for
2509// the response trailers, and not the response headers. The prefix
2510// is stripped after the ServeHTTP call finishes and the values are
2511// sent in the trailers.
2512//
2513// This mechanism is intended only for trailers that are not known
2514// prior to the headers being written. If the set of trailers is fixed
2515// or known before the header is written, the normal Go trailers mechanism
2516// is preferred:
2517// https://golang.org/pkg/net/http/#ResponseWriter
2518// https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
2519const TrailerPrefix = "Trailer:"
2520
2521// promoteUndeclaredTrailers permits http.Handlers to set trailers
2522// after the header has already been flushed. Because the Go
2523// ResponseWriter interface has no way to set Trailers (only the
2524// Header), and because we didn't want to expand the ResponseWriter
2525// interface, and because nobody used trailers, and because RFC 7230
2526// says you SHOULD (but not must) predeclare any trailers in the
2527// header, the official ResponseWriter rules said trailers in Go must
2528// be predeclared, and then we reuse the same ResponseWriter.Header()
2529// map to mean both Headers and Trailers. When it's time to write the
2530// Trailers, we pick out the fields of Headers that were declared as
2531// trailers. That worked for a while, until we found the first major
2532// user of Trailers in the wild: gRPC (using them only over http2),
2533// and gRPC libraries permit setting trailers mid-stream without
2534// predeclaring them. So: change of plans. We still permit the old
2535// way, but we also permit this hack: if a Header() key begins with
2536// "Trailer:", the suffix of that key is a Trailer. Because ':' is an
2537// invalid token byte anyway, there is no ambiguity. (And it's already
2538// filtered out) It's mildly hacky, but not terrible.
2539//
2540// This method runs after the Handler is done and promotes any Header
2541// fields to be trailers.
2542func (rws *responseWriterState) promoteUndeclaredTrailers() {
2543 for k, vv := range rws.handlerHeader {
2544 if !strings.HasPrefix(k, TrailerPrefix) {
2545 continue
2546 }
2547 trailerKey := strings.TrimPrefix(k, TrailerPrefix)
2548 rws.declareTrailer(trailerKey)
2549 rws.handlerHeader[http.CanonicalHeaderKey(trailerKey)] = vv
2550 }
2551
2552 if len(rws.trailers) > 1 {
2553 sorter := sorterPool.Get().(*sorter)
2554 sorter.SortStrings(rws.trailers)
2555 sorterPool.Put(sorter)
2556 }
2557}
2558
2559func (w *responseWriter) Flush() {
2560 rws := w.rws
2561 if rws == nil {
2562 panic("Header called after Handler finished")
2563 }
2564 if rws.bw.Buffered() > 0 {
2565 if err := rws.bw.Flush(); err != nil {
2566 // Ignore the error. The frame writer already knows.
2567 return
2568 }
2569 } else {
2570 // The bufio.Writer won't call chunkWriter.Write
2571 // (writeChunk with zero bytes, so we have to do it
2572 // ourselves to force the HTTP response header and/or
2573 // final DATA frame (with END_STREAM) to be sent.
2574 rws.writeChunk(nil)
2575 }
2576}
2577
2578func (w *responseWriter) CloseNotify() <-chan bool {
2579 rws := w.rws
2580 if rws == nil {
2581 panic("CloseNotify called after Handler finished")
2582 }
2583 rws.closeNotifierMu.Lock()
2584 ch := rws.closeNotifierCh
2585 if ch == nil {
2586 ch = make(chan bool, 1)
2587 rws.closeNotifierCh = ch
2588 cw := rws.stream.cw
2589 go func() {
2590 cw.Wait() // wait for close
2591 ch <- true
2592 }()
2593 }
2594 rws.closeNotifierMu.Unlock()
2595 return ch
2596}
2597
2598func (w *responseWriter) Header() http.Header {
2599 rws := w.rws
2600 if rws == nil {
2601 panic("Header called after Handler finished")
2602 }
2603 if rws.handlerHeader == nil {
2604 rws.handlerHeader = make(http.Header)
2605 }
2606 return rws.handlerHeader
2607}
2608
2609// checkWriteHeaderCode is a copy of net/http's checkWriteHeaderCode.
2610func checkWriteHeaderCode(code int) {
2611 // Issue 22880: require valid WriteHeader status codes.
2612 // For now we only enforce that it's three digits.
2613 // In the future we might block things over 599 (600 and above aren't defined
2614 // at http://httpwg.org/specs/rfc7231.html#status.codes)
2615 // and we might block under 200 (once we have more mature 1xx support).
2616 // But for now any three digits.
2617 //
2618 // We used to send "HTTP/1.1 000 0" on the wire in responses but there's
2619 // no equivalent bogus thing we can realistically send in HTTP/2,
2620 // so we'll consistently panic instead and help people find their bugs
2621 // early. (We can't return an error from WriteHeader even if we wanted to.)
2622 if code < 100 || code > 999 {
2623 panic(fmt.Sprintf("invalid WriteHeader code %v", code))
2624 }
2625}
2626
2627func (w *responseWriter) WriteHeader(code int) {
2628 rws := w.rws
2629 if rws == nil {
2630 panic("WriteHeader called after Handler finished")
2631 }
2632 rws.writeHeader(code)
2633}
2634
2635func (rws *responseWriterState) writeHeader(code int) {
2636 if !rws.wroteHeader {
2637 checkWriteHeaderCode(code)
2638 rws.wroteHeader = true
2639 rws.status = code
2640 if len(rws.handlerHeader) > 0 {
2641 rws.snapHeader = cloneHeader(rws.handlerHeader)
2642 }
2643 }
2644}
2645
2646func cloneHeader(h http.Header) http.Header {
2647 h2 := make(http.Header, len(h))
2648 for k, vv := range h {
2649 vv2 := make([]string, len(vv))
2650 copy(vv2, vv)
2651 h2[k] = vv2
2652 }
2653 return h2
2654}
2655
2656// The Life Of A Write is like this:
2657//
2658// * Handler calls w.Write or w.WriteString ->
2659// * -> rws.bw (*bufio.Writer) ->
2660// * (Handler might call Flush)
2661// * -> chunkWriter{rws}
2662// * -> responseWriterState.writeChunk(p []byte)
2663// * -> responseWriterState.writeChunk (most of the magic; see comment there)
2664func (w *responseWriter) Write(p []byte) (n int, err error) {
2665 return w.write(len(p), p, "")
2666}
2667
2668func (w *responseWriter) WriteString(s string) (n int, err error) {
2669 return w.write(len(s), nil, s)
2670}
2671
2672// either dataB or dataS is non-zero.
2673func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) {
2674 rws := w.rws
2675 if rws == nil {
2676 panic("Write called after Handler finished")
2677 }
2678 if !rws.wroteHeader {
2679 w.WriteHeader(200)
2680 }
2681 if !bodyAllowedForStatus(rws.status) {
2682 return 0, http.ErrBodyNotAllowed
2683 }
2684 rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) // only one can be set
2685 if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen {
2686 // TODO: send a RST_STREAM
2687 return 0, errors.New("http2: handler wrote more than declared Content-Length")
2688 }
2689
2690 if dataB != nil {
2691 return rws.bw.Write(dataB)
2692 } else {
2693 return rws.bw.WriteString(dataS)
2694 }
2695}
2696
2697func (w *responseWriter) handlerDone() {
2698 rws := w.rws
2699 dirty := rws.dirty
2700 rws.handlerDone = true
2701 w.Flush()
2702 w.rws = nil
2703 if !dirty {
2704 // Only recycle the pool if all prior Write calls to
2705 // the serverConn goroutine completed successfully. If
2706 // they returned earlier due to resets from the peer
2707 // there might still be write goroutines outstanding
2708 // from the serverConn referencing the rws memory. See
2709 // issue 20704.
2710 responseWriterStatePool.Put(rws)
2711 }
2712}
2713
2714// Push errors.
2715var (
2716 ErrRecursivePush = errors.New("http2: recursive push not allowed")
2717 ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
2718)
2719
2720var _ http.Pusher = (*responseWriter)(nil)
2721
2722func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
2723 st := w.rws.stream
2724 sc := st.sc
2725 sc.serveG.checkNotOn()
2726
2727 // No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
2728 // http://tools.ietf.org/html/rfc7540#section-6.6
2729 if st.isPushed() {
2730 return ErrRecursivePush
2731 }
2732
2733 if opts == nil {
2734 opts = new(http.PushOptions)
2735 }
2736
2737 // Default options.
2738 if opts.Method == "" {
2739 opts.Method = "GET"
2740 }
2741 if opts.Header == nil {
2742 opts.Header = http.Header{}
2743 }
2744 wantScheme := "http"
2745 if w.rws.req.TLS != nil {
2746 wantScheme = "https"
2747 }
2748
2749 // Validate the request.
2750 u, err := url.Parse(target)
2751 if err != nil {
2752 return err
2753 }
2754 if u.Scheme == "" {
2755 if !strings.HasPrefix(target, "/") {
2756 return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target)
2757 }
2758 u.Scheme = wantScheme
2759 u.Host = w.rws.req.Host
2760 } else {
2761 if u.Scheme != wantScheme {
2762 return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme)
2763 }
2764 if u.Host == "" {
2765 return errors.New("URL must have a host")
2766 }
2767 }
2768 for k := range opts.Header {
2769 if strings.HasPrefix(k, ":") {
2770 return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
2771 }
2772 // These headers are meaningful only if the request has a body,
2773 // but PUSH_PROMISE requests cannot have a body.
2774 // http://tools.ietf.org/html/rfc7540#section-8.2
2775 // Also disallow Host, since the promised URL must be absolute.
2776 switch strings.ToLower(k) {
2777 case "content-length", "content-encoding", "trailer", "te", "expect", "host":
2778 return fmt.Errorf("promised request headers cannot include %q", k)
2779 }
2780 }
2781 if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil {
2782 return err
2783 }
2784
2785 // The RFC effectively limits promised requests to GET and HEAD:
2786 // "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
2787 // http://tools.ietf.org/html/rfc7540#section-8.2
2788 if opts.Method != "GET" && opts.Method != "HEAD" {
2789 return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
2790 }
2791
2792 msg := &startPushRequest{
2793 parent: st,
2794 method: opts.Method,
2795 url: u,
2796 header: cloneHeader(opts.Header),
2797 done: errChanPool.Get().(chan error),
2798 }
2799
2800 select {
2801 case <-sc.doneServing:
2802 return errClientDisconnected
2803 case <-st.cw:
2804 return errStreamClosed
2805 case sc.serveMsgCh <- msg:
2806 }
2807
2808 select {
2809 case <-sc.doneServing:
2810 return errClientDisconnected
2811 case <-st.cw:
2812 return errStreamClosed
2813 case err := <-msg.done:
2814 errChanPool.Put(msg.done)
2815 return err
2816 }
2817}
2818
2819type startPushRequest struct {
2820 parent *stream
2821 method string
2822 url *url.URL
2823 header http.Header
2824 done chan error
2825}
2826
2827func (sc *serverConn) startPush(msg *startPushRequest) {
2828 sc.serveG.check()
2829
2830 // http://tools.ietf.org/html/rfc7540#section-6.6.
2831 // PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
2832 // is in either the "open" or "half-closed (remote)" state.
2833 if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote {
2834 // responseWriter.Push checks that the stream is peer-initiated.
2835 msg.done <- errStreamClosed
2836 return
2837 }
2838
2839 // http://tools.ietf.org/html/rfc7540#section-6.6.
2840 if !sc.pushEnabled {
2841 msg.done <- http.ErrNotSupported
2842 return
2843 }
2844
2845 // PUSH_PROMISE frames must be sent in increasing order by stream ID, so
2846 // we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
2847 // is written. Once the ID is allocated, we start the request handler.
2848 allocatePromisedID := func() (uint32, error) {
2849 sc.serveG.check()
2850
2851 // Check this again, just in case. Technically, we might have received
2852 // an updated SETTINGS by the time we got around to writing this frame.
2853 if !sc.pushEnabled {
2854 return 0, http.ErrNotSupported
2855 }
2856 // http://tools.ietf.org/html/rfc7540#section-6.5.2.
2857 if sc.curPushedStreams+1 > sc.clientMaxStreams {
2858 return 0, ErrPushLimitReached
2859 }
2860
2861 // http://tools.ietf.org/html/rfc7540#section-5.1.1.
2862 // Streams initiated by the server MUST use even-numbered identifiers.
2863 // A server that is unable to establish a new stream identifier can send a GOAWAY
2864 // frame so that the client is forced to open a new connection for new streams.
2865 if sc.maxPushPromiseID+2 >= 1<<31 {
2866 sc.startGracefulShutdownInternal()
2867 return 0, ErrPushLimitReached
2868 }
2869 sc.maxPushPromiseID += 2
2870 promisedID := sc.maxPushPromiseID
2871
2872 // http://tools.ietf.org/html/rfc7540#section-8.2.
2873 // Strictly speaking, the new stream should start in "reserved (local)", then
2874 // transition to "half closed (remote)" after sending the initial HEADERS, but
2875 // we start in "half closed (remote)" for simplicity.
2876 // See further comments at the definition of stateHalfClosedRemote.
2877 promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote)
2878 rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{
2879 method: msg.method,
2880 scheme: msg.url.Scheme,
2881 authority: msg.url.Host,
2882 path: msg.url.RequestURI(),
2883 header: cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE
2884 })
2885 if err != nil {
2886 // Should not happen, since we've already validated msg.url.
2887 panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
2888 }
2889
2890 go sc.runHandler(rw, req, sc.handler.ServeHTTP)
2891 return promisedID, nil
2892 }
2893
2894 sc.writeFrame(FrameWriteRequest{
2895 write: &writePushPromise{
2896 streamID: msg.parent.id,
2897 method: msg.method,
2898 url: msg.url,
2899 h: msg.header,
2900 allocatePromisedID: allocatePromisedID,
2901 },
2902 stream: msg.parent,
2903 done: msg.done,
2904 })
2905}
2906
2907// foreachHeaderElement splits v according to the "#rule" construction
2908// in RFC 7230 section 7 and calls fn for each non-empty element.
2909func foreachHeaderElement(v string, fn func(string)) {
2910 v = textproto.TrimString(v)
2911 if v == "" {
2912 return
2913 }
2914 if !strings.Contains(v, ",") {
2915 fn(v)
2916 return
2917 }
2918 for _, f := range strings.Split(v, ",") {
2919 if f = textproto.TrimString(f); f != "" {
2920 fn(f)
2921 }
2922 }
2923}
2924
2925// From http://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2.2
2926var connHeaders = []string{
2927 "Connection",
2928 "Keep-Alive",
2929 "Proxy-Connection",
2930 "Transfer-Encoding",
2931 "Upgrade",
2932}
2933
2934// checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request,
2935// per RFC 7540 Section 8.1.2.2.
2936// The returned error is reported to users.
2937func checkValidHTTP2RequestHeaders(h http.Header) error {
2938 for _, k := range connHeaders {
2939 if _, ok := h[k]; ok {
2940 return fmt.Errorf("request header %q is not valid in HTTP/2", k)
2941 }
2942 }
2943 te := h["Te"]
2944 if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) {
2945 return errors.New(`request header "TE" may only be "trailers" in HTTP/2`)
2946 }
2947 return nil
2948}
2949
2950func new400Handler(err error) http.HandlerFunc {
2951 return func(w http.ResponseWriter, r *http.Request) {
2952 http.Error(w, err.Error(), http.StatusBadRequest)
2953 }
2954}
2955
2956// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives
2957// disabled. See comments on h1ServerShutdownChan above for why
2958// the code is written this way.
2959func h1ServerKeepAlivesDisabled(hs *http.Server) bool {
2960 var x interface{} = hs
2961 type I interface {
2962 doKeepAlives() bool
2963 }
2964 if hs, ok := x.(I); ok {
2965 return !hs.doKeepAlives()
2966 }
2967 return false
2968}