1// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// TODO: turn off the serve goroutine when idle, so
6// an idle conn only has the readFrames goroutine active. (which could
7// also be optimized probably to pin less memory in crypto/tls). This
8// would involve tracking when the serve goroutine is active (atomic
9// int32 read/CAS probably?) and starting it up when frames arrive,
10// and shutting it down when all handlers exit. the occasional PING
11// packets could use time.AfterFunc to call sc.wakeStartServeLoop()
12// (which is a no-op if already running) and then queue the PING write
13// as normal. The serve loop would then exit in most cases (if no
14// Handlers running) and not be woken up again until the PING packet
15// returns.
16
17// TODO (maybe): add a mechanism for Handlers to go into
18// half-closed-local mode (rw.(io.Closer) test?) but not exit their
19// handler, and continue to be able to read from the
20// Request.Body. This would be a somewhat semantic change from HTTP/1
21// (or at least what we expose in net/http), so I'd probably want to
22// add it there too. For now, this package says that returning from
23// the Handler ServeHTTP function means you're both done reading and
24// done writing, without a way to stop just one or the other.
25
26package http2
27
28import (
29 "bufio"
30 "bytes"
31 "context"
32 "crypto/tls"
33 "errors"
34 "fmt"
35 "io"
36 "log"
37 "math"
38 "net"
39 "net/http"
40 "net/textproto"
41 "net/url"
42 "os"
43 "reflect"
44 "runtime"
45 "strconv"
46 "strings"
47 "sync"
48 "time"
49
50 "golang.org/x/net/http/httpguts"
51 "golang.org/x/net/http2/hpack"
52)
53
54const (
55 prefaceTimeout = 10 * time.Second
56 firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
57 handlerChunkWriteSize = 4 << 10
58 defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
59 maxQueuedControlFrames = 10000
60)
61
62var (
63 errClientDisconnected = errors.New("client disconnected")
64 errClosedBody = errors.New("body closed by handler")
65 errHandlerComplete = errors.New("http2: request body closed due to handler exiting")
66 errStreamClosed = errors.New("http2: stream closed")
67)
68
69var responseWriterStatePool = sync.Pool{
70 New: func() interface{} {
71 rws := &responseWriterState{}
72 rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
73 return rws
74 },
75}
76
77// Test hooks.
78var (
79 testHookOnConn func()
80 testHookGetServerConn func(*serverConn)
81 testHookOnPanicMu *sync.Mutex // nil except in tests
82 testHookOnPanic func(sc *serverConn, panicVal interface{}) (rePanic bool)
83)
84
85// Server is an HTTP/2 server.
86type Server struct {
87 // MaxHandlers limits the number of http.Handler ServeHTTP goroutines
88 // which may run at a time over all connections.
89 // Negative or zero means no limit.
90 // TODO: implement
91 MaxHandlers int
92
93 // MaxConcurrentStreams optionally specifies the number of
94 // concurrent streams that each client may have open at a
95 // time. This is unrelated to the number of http.Handler goroutines
96 // which may be active globally, which is MaxHandlers.
97 // If zero, MaxConcurrentStreams defaults to at least 100, per
98 // the HTTP/2 spec's recommendations.
99 MaxConcurrentStreams uint32
100
101 // MaxReadFrameSize optionally specifies the largest frame
102 // this server is willing to read. A valid value is between
103 // 16k and 16M, inclusive. If zero or otherwise invalid, a
104 // default value is used.
105 MaxReadFrameSize uint32
106
107 // PermitProhibitedCipherSuites, if true, permits the use of
108 // cipher suites prohibited by the HTTP/2 spec.
109 PermitProhibitedCipherSuites bool
110
111 // IdleTimeout specifies how long until idle clients should be
112 // closed with a GOAWAY frame. PING frames are not considered
113 // activity for the purposes of IdleTimeout.
114 IdleTimeout time.Duration
115
116 // MaxUploadBufferPerConnection is the size of the initial flow
117 // control window for each connection. The HTTP/2 spec does not
118 // allow this to be smaller than 65535 or larger than 2^32-1.
119 // If the value is outside this range, a default value will be
120 // used instead.
121 MaxUploadBufferPerConnection int32
122
123 // MaxUploadBufferPerStream is the size of the initial flow control
124 // window for each stream. The HTTP/2 spec does not allow this to
125 // be larger than 2^32-1. If the value is zero or larger than the
126 // maximum, a default value will be used instead.
127 MaxUploadBufferPerStream int32
128
129 // NewWriteScheduler constructs a write scheduler for a connection.
130 // If nil, a default scheduler is chosen.
131 NewWriteScheduler func() WriteScheduler
132
133 // Internal state. This is a pointer (rather than embedded directly)
134 // so that we don't embed a Mutex in this struct, which would make the
135 // struct non-copyable and might break some callers.
136 state *serverInternalState
137}
138
139func (s *Server) initialConnRecvWindowSize() int32 {
140 if s.MaxUploadBufferPerConnection > initialWindowSize {
141 return s.MaxUploadBufferPerConnection
142 }
143 return 1 << 20
144}
145
146func (s *Server) initialStreamRecvWindowSize() int32 {
147 if s.MaxUploadBufferPerStream > 0 {
148 return s.MaxUploadBufferPerStream
149 }
150 return 1 << 20
151}
152
153func (s *Server) maxReadFrameSize() uint32 {
154 if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize {
155 return v
156 }
157 return defaultMaxReadFrameSize
158}
159
160func (s *Server) maxConcurrentStreams() uint32 {
161 if v := s.MaxConcurrentStreams; v > 0 {
162 return v
163 }
164 return defaultMaxStreams
165}
166
167// maxQueuedControlFrames is the maximum number of control frames like
168// SETTINGS, PING and RST_STREAM that will be queued for writing before
169// the connection is closed to prevent memory exhaustion attacks.
170func (s *Server) maxQueuedControlFrames() int {
171 // TODO: if anybody asks, add a Server field, and remember to define the
172 // behavior of negative values.
173 return maxQueuedControlFrames
174}
175
176type serverInternalState struct {
177 mu sync.Mutex
178 activeConns map[*serverConn]struct{}
179}
180
181func (s *serverInternalState) registerConn(sc *serverConn) {
182 if s == nil {
183 return // if the Server was used without calling ConfigureServer
184 }
185 s.mu.Lock()
186 s.activeConns[sc] = struct{}{}
187 s.mu.Unlock()
188}
189
190func (s *serverInternalState) unregisterConn(sc *serverConn) {
191 if s == nil {
192 return // if the Server was used without calling ConfigureServer
193 }
194 s.mu.Lock()
195 delete(s.activeConns, sc)
196 s.mu.Unlock()
197}
198
199func (s *serverInternalState) startGracefulShutdown() {
200 if s == nil {
201 return // if the Server was used without calling ConfigureServer
202 }
203 s.mu.Lock()
204 for sc := range s.activeConns {
205 sc.startGracefulShutdown()
206 }
207 s.mu.Unlock()
208}
209
210// ConfigureServer adds HTTP/2 support to a net/http Server.
211//
212// The configuration conf may be nil.
213//
214// ConfigureServer must be called before s begins serving.
215func ConfigureServer(s *http.Server, conf *Server) error {
216 if s == nil {
217 panic("nil *http.Server")
218 }
219 if conf == nil {
220 conf = new(Server)
221 }
222 conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})}
223 if h1, h2 := s, conf; h2.IdleTimeout == 0 {
224 if h1.IdleTimeout != 0 {
225 h2.IdleTimeout = h1.IdleTimeout
226 } else {
227 h2.IdleTimeout = h1.ReadTimeout
228 }
229 }
230 s.RegisterOnShutdown(conf.state.startGracefulShutdown)
231
232 if s.TLSConfig == nil {
233 s.TLSConfig = new(tls.Config)
234 } else if s.TLSConfig.CipherSuites != nil {
235 // If they already provided a CipherSuite list, return
236 // an error if it has a bad order or is missing
237 // ECDHE_RSA_WITH_AES_128_GCM_SHA256 or ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
238 haveRequired := false
239 sawBad := false
240 for i, cs := range s.TLSConfig.CipherSuites {
241 switch cs {
242 case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
243 // Alternative MTI cipher to not discourage ECDSA-only servers.
244 // See http://golang.org/cl/30721 for further information.
245 tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
246 haveRequired = true
247 }
248 if isBadCipher(cs) {
249 sawBad = true
250 } else if sawBad {
251 return fmt.Errorf("http2: TLSConfig.CipherSuites index %d contains an HTTP/2-approved cipher suite (%#04x), but it comes after unapproved cipher suites. With this configuration, clients that don't support previous, approved cipher suites may be given an unapproved one and reject the connection.", i, cs)
252 }
253 }
254 if !haveRequired {
255 return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher (need at least one of TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 or TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256).")
256 }
257 }
258
259 // Note: not setting MinVersion to tls.VersionTLS12,
260 // as we don't want to interfere with HTTP/1.1 traffic
261 // on the user's server. We enforce TLS 1.2 later once
262 // we accept a connection. Ideally this should be done
263 // during next-proto selection, but using TLS <1.2 with
264 // HTTP/2 is still the client's bug.
265
266 s.TLSConfig.PreferServerCipherSuites = true
267
268 haveNPN := false
269 for _, p := range s.TLSConfig.NextProtos {
270 if p == NextProtoTLS {
271 haveNPN = true
272 break
273 }
274 }
275 if !haveNPN {
276 s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS)
277 }
278
279 if s.TLSNextProto == nil {
280 s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
281 }
282 protoHandler := func(hs *http.Server, c *tls.Conn, h http.Handler) {
283 if testHookOnConn != nil {
284 testHookOnConn()
285 }
286 // The TLSNextProto interface predates contexts, so
287 // the net/http package passes down its per-connection
288 // base context via an exported but unadvertised
289 // method on the Handler. This is for internal
290 // net/http<=>http2 use only.
291 var ctx context.Context
292 type baseContexter interface {
293 BaseContext() context.Context
294 }
295 if bc, ok := h.(baseContexter); ok {
296 ctx = bc.BaseContext()
297 }
298 conf.ServeConn(c, &ServeConnOpts{
299 Context: ctx,
300 Handler: h,
301 BaseConfig: hs,
302 })
303 }
304 s.TLSNextProto[NextProtoTLS] = protoHandler
305 return nil
306}
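
// Caller-side sketch (illustrative, not part of this package): a common
// way to wire HTTP/2 into an existing net/http server. The mux, address,
// and certificate paths below are placeholders.
//
//	srv := &http.Server{Addr: ":8443", Handler: mux}
//	err := http2.ConfigureServer(srv, &http2.Server{
//		MaxConcurrentStreams: 250,
//		IdleTimeout:          5 * time.Minute,
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	log.Fatal(srv.ListenAndServeTLS("cert.pem", "key.pem"))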
307
308// ServeConnOpts are options for the Server.ServeConn method.
309type ServeConnOpts struct {
310 // Context is the base context to use.
311 // If nil, context.Background is used.
312 Context context.Context
313
314 // BaseConfig optionally sets the base configuration
315 // for values. If nil, defaults are used.
316 BaseConfig *http.Server
317
318 // Handler specifies which handler to use for processing
319 // requests. If nil, BaseConfig.Handler is used. If BaseConfig
320 // or BaseConfig.Handler is nil, http.DefaultServeMux is used.
321 Handler http.Handler
322}
323
324func (o *ServeConnOpts) context() context.Context {
325 if o != nil && o.Context != nil {
326 return o.Context
327 }
328 return context.Background()
329}
330
331func (o *ServeConnOpts) baseConfig() *http.Server {
332 if o != nil && o.BaseConfig != nil {
333 return o.BaseConfig
334 }
335 return new(http.Server)
336}
337
338func (o *ServeConnOpts) handler() http.Handler {
339 if o != nil {
340 if o.Handler != nil {
341 return o.Handler
342 }
343 if o.BaseConfig != nil && o.BaseConfig.Handler != nil {
344 return o.BaseConfig.Handler
345 }
346 }
347 return http.DefaultServeMux
348}
349
350// ServeConn serves HTTP/2 requests on the provided connection and
351// blocks until the connection is no longer readable.
352//
353// ServeConn starts speaking HTTP/2 assuming that c has not had any
354// reads or writes. It writes its initial settings frame and expects
355// to be able to read the preface and settings frame from the
356// client. If c has a ConnectionState method like a *tls.Conn, the
357// ConnectionState is used to verify the TLS ciphersuite and to set
358// the Request.TLS field in Handlers.
359//
360// ServeConn does not support h2c by itself. Any h2c support must be
361// implemented in terms of providing a suitably-behaving net.Conn.
362//
363// The opts parameter is optional. If nil, default values are used.
364func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
365 baseCtx, cancel := serverConnBaseContext(c, opts)
366 defer cancel()
367
368 sc := &serverConn{
369 srv: s,
370 hs: opts.baseConfig(),
371 conn: c,
372 baseCtx: baseCtx,
373 remoteAddrStr: c.RemoteAddr().String(),
374 bw: newBufferedWriter(c),
375 handler: opts.handler(),
376 streams: make(map[uint32]*stream),
377 readFrameCh: make(chan readFrameResult),
378 wantWriteFrameCh: make(chan FrameWriteRequest, 8),
379 serveMsgCh: make(chan interface{}, 8),
380 wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
381 bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
382 doneServing: make(chan struct{}),
383 clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
384 advMaxStreams: s.maxConcurrentStreams(),
385 initialStreamSendWindowSize: initialWindowSize,
386 maxFrameSize: initialMaxFrameSize,
387 headerTableSize: initialHeaderTableSize,
388 serveG: newGoroutineLock(),
389 pushEnabled: true,
390 }
391
392 s.state.registerConn(sc)
393 defer s.state.unregisterConn(sc)
394
395 // The net/http package sets the write deadline from the
396 // http.Server.WriteTimeout during the TLS handshake, but then
397 // passes the connection off to us with the deadline already set.
398 // Write deadlines are set per stream in serverConn.newStream.
399 // Disarm the net.Conn write deadline here.
400 if sc.hs.WriteTimeout != 0 {
401 sc.conn.SetWriteDeadline(time.Time{})
402 }
403
404 if s.NewWriteScheduler != nil {
405 sc.writeSched = s.NewWriteScheduler()
406 } else {
407 sc.writeSched = NewRandomWriteScheduler()
408 }
409
410 // These start at the RFC-specified defaults. If there is a higher
411 // configured value for inflow, that will be updated when we send a
412 // WINDOW_UPDATE shortly after sending SETTINGS.
413 sc.flow.add(initialWindowSize)
414 sc.inflow.add(initialWindowSize)
415 sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
416
417 fr := NewFramer(sc.bw, c)
418 fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
419 fr.MaxHeaderListSize = sc.maxHeaderListSize()
420 fr.SetMaxReadFrameSize(s.maxReadFrameSize())
421 sc.framer = fr
422
423 if tc, ok := c.(connectionStater); ok {
424 sc.tlsState = new(tls.ConnectionState)
425 *sc.tlsState = tc.ConnectionState()
426 // 9.2 Use of TLS Features
427 // An implementation of HTTP/2 over TLS MUST use TLS
428 // 1.2 or higher with the restrictions on feature set
429 // and cipher suite described in this section. Due to
430 // implementation limitations, it might not be
431 // possible to fail TLS negotiation. An endpoint MUST
432 // immediately terminate an HTTP/2 connection that
433 // does not meet the TLS requirements described in
434 // this section with a connection error (Section
435 // 5.4.1) of type INADEQUATE_SECURITY.
436 if sc.tlsState.Version < tls.VersionTLS12 {
437 sc.rejectConn(ErrCodeInadequateSecurity, "TLS version too low")
438 return
439 }
440
441 if sc.tlsState.ServerName == "" {
442 // Client must use SNI, but we don't enforce that anymore,
443 // since it was causing problems when connecting to bare IP
444 // addresses during development.
445 //
446 // TODO: optionally enforce? Or enforce at the time we receive
447 // a new request, and verify the ServerName matches the :authority?
448 // But that precludes proxy situations, perhaps.
449 //
450 // So for now, do nothing here again.
451 }
452
453 if !s.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) {
454 // "Endpoints MAY choose to generate a connection error
455 // (Section 5.4.1) of type INADEQUATE_SECURITY if one of
456 // the prohibited cipher suites are negotiated."
457 //
458 // We choose that. In my opinion, the spec is weak
459 // here. It also says both parties must support at least
460 // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no
461 // excuses here. If we really must, we could allow an
462 // "AllowInsecureWeakCiphers" option on the server later.
463 // Let's see how it plays out first.
464 sc.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite))
465 return
466 }
467 }
468
469 if hook := testHookGetServerConn; hook != nil {
470 hook(sc)
471 }
472 sc.serve()
473}
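
// Caller-side sketch (illustrative assumption, not part of this package):
// using ServeConn directly on accepted connections, e.g. for
// prior-knowledge cleartext HTTP/2 behind a trusted proxy. The listener
// address and handler are placeholders; full h2c support (including the
// Upgrade path) lives in the separate golang.org/x/net/http2/h2c package.
//
//	h2srv := &http2.Server{}
//	ln, err := net.Listen("tcp", ":8080")
//	if err != nil {
//		log.Fatal(err)
//	}
//	for {
//		c, err := ln.Accept()
//		if err != nil {
//			log.Fatal(err)
//		}
//		go h2srv.ServeConn(c, &http2.ServeConnOpts{Handler: handler})
//	}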
474
475func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx context.Context, cancel func()) {
476 ctx, cancel = context.WithCancel(opts.context())
477 ctx = context.WithValue(ctx, http.LocalAddrContextKey, c.LocalAddr())
478 if hs := opts.baseConfig(); hs != nil {
479 ctx = context.WithValue(ctx, http.ServerContextKey, hs)
480 }
481 return
482}
483
484func (sc *serverConn) rejectConn(err ErrCode, debug string) {
485 sc.vlogf("http2: server rejecting conn: %v, %s", err, debug)
486 // ignoring errors. hanging up anyway.
487 sc.framer.WriteGoAway(0, err, []byte(debug))
488 sc.bw.Flush()
489 sc.conn.Close()
490}
491
492type serverConn struct {
493 // Immutable:
494 srv *Server
495 hs *http.Server
496 conn net.Conn
497 bw *bufferedWriter // writing to conn
498 handler http.Handler
499 baseCtx context.Context
500 framer *Framer
501 doneServing chan struct{} // closed when serverConn.serve ends
502 readFrameCh chan readFrameResult // written by serverConn.readFrames
503 wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
504 wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
505 bodyReadCh chan bodyReadMsg // from handlers -> serve
506 serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop
507 flow flow // conn-wide (not stream-specific) outbound flow control
508 inflow flow // conn-wide inbound flow control
509 tlsState *tls.ConnectionState // shared by all handlers, like net/http
510 remoteAddrStr string
511 writeSched WriteScheduler
512
513 // Everything following is owned by the serve loop; use serveG.check():
514 serveG goroutineLock // used to verify funcs are on serve()
515 pushEnabled bool
516 sawFirstSettings bool // got the initial SETTINGS frame after the preface
517 needToSendSettingsAck bool
518 unackedSettings int // how many SETTINGS have we sent without ACKs?
519 queuedControlFrames int // control frames in the writeSched queue
520 clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
521 advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised to the client
522 curClientStreams uint32 // number of open streams initiated by the client
523 curPushedStreams uint32 // number of open streams initiated by server push
524 maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
525 maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
526 streams map[uint32]*stream
527 initialStreamSendWindowSize int32
528 maxFrameSize int32
529 headerTableSize uint32
530 peerMaxHeaderListSize uint32 // zero means unknown (default)
531 canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
532 writingFrame bool // started writing a frame (on serve goroutine or separate)
533 writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
534 needsFrameFlush bool // last frame write wasn't a flush
535 inGoAway bool // we've started to or sent GOAWAY
536 inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
537 needToSendGoAway bool // we need to schedule a GOAWAY frame write
538 goAwayCode ErrCode
539 shutdownTimer *time.Timer // nil until used
540 idleTimer *time.Timer // nil if unused
541
542 // Owned by the writeFrameAsync goroutine:
543 headerWriteBuf bytes.Buffer
544 hpackEncoder *hpack.Encoder
545
546 // Used by startGracefulShutdown.
547 shutdownOnce sync.Once
548}
549
550func (sc *serverConn) maxHeaderListSize() uint32 {
551 n := sc.hs.MaxHeaderBytes
552 if n <= 0 {
553 n = http.DefaultMaxHeaderBytes
554 }
555 // http2's count is in a slightly different unit and includes 32 bytes per pair.
556 // So, take the net/http.Server value and pad it up a bit, assuming 10 headers.
557 const perFieldOverhead = 32 // per http2 spec
558 const typicalHeaders = 10 // conservative
559 return uint32(n + typicalHeaders*perFieldOverhead)
560}
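
// Worked example (illustrative): with the net/http default of
// http.DefaultMaxHeaderBytes (1 << 20 bytes), the value advertised as
// SETTINGS_MAX_HEADER_LIST_SIZE above is 1<<20 + 10*32 = 1048896.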
561
562func (sc *serverConn) curOpenStreams() uint32 {
563 sc.serveG.check()
564 return sc.curClientStreams + sc.curPushedStreams
565}
566
567// stream represents a stream. This is the minimal metadata needed by
568// the serve goroutine. Most of the actual stream state is owned by
569// the http.Handler's goroutine in the responseWriter. Because the
570// responseWriter's responseWriterState is recycled at the end of a
571// handler, this struct intentionally has no pointer to the
572// *responseWriter{,State} itself, as the Handler ending nils out the
573// responseWriter's state field.
574type stream struct {
575 // immutable:
576 sc *serverConn
577 id uint32
578 body *pipe // non-nil if expecting DATA frames
579 cw closeWaiter // closed when the stream transitions to the closed state
580 ctx context.Context
581 cancelCtx func()
582
583 // owned by serverConn's serve loop:
584 bodyBytes int64 // body bytes seen so far
585 declBodyBytes int64 // or -1 if undeclared
586 flow flow // limits writing from Handler to client
587 inflow flow // what the client is allowed to POST/etc to us
588 state streamState
589 resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
590 gotTrailerHeader bool // HEADER frame for trailers was seen
591 wroteHeaders bool // whether we wrote headers (not status 100)
592 writeDeadline *time.Timer // nil if unused
593
594 trailer http.Header // accumulated trailers
595 reqTrailer http.Header // handler's Request.Trailer
596}
597
598func (sc *serverConn) Framer() *Framer { return sc.framer }
599func (sc *serverConn) CloseConn() error { return sc.conn.Close() }
600func (sc *serverConn) Flush() error { return sc.bw.Flush() }
601func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) {
602 return sc.hpackEncoder, &sc.headerWriteBuf
603}
604
605func (sc *serverConn) state(streamID uint32) (streamState, *stream) {
606 sc.serveG.check()
607 // http://tools.ietf.org/html/rfc7540#section-5.1
608 if st, ok := sc.streams[streamID]; ok {
609 return st.state, st
610 }
611 // "The first use of a new stream identifier implicitly closes all
612 // streams in the "idle" state that might have been initiated by
613 // that peer with a lower-valued stream identifier. For example, if
614 // a client sends a HEADERS frame on stream 7 without ever sending a
615 // frame on stream 5, then stream 5 transitions to the "closed"
616 // state when the first frame for stream 7 is sent or received."
617 if streamID%2 == 1 {
618 if streamID <= sc.maxClientStreamID {
619 return stateClosed, nil
620 }
621 } else {
622 if streamID <= sc.maxPushPromiseID {
623 return stateClosed, nil
624 }
625 }
626 return stateIdle, nil
627}
628
629// setConnState calls the net/http ConnState hook for this connection, if configured.
630// Note that the net/http package does StateNew and StateClosed for us.
631// There is currently no plan for StateHijacked or hijacking HTTP/2 connections.
632func (sc *serverConn) setConnState(state http.ConnState) {
633 if sc.hs.ConnState != nil {
634 sc.hs.ConnState(sc.conn, state)
635 }
636}
637
638func (sc *serverConn) vlogf(format string, args ...interface{}) {
639 if VerboseLogs {
640 sc.logf(format, args...)
641 }
642}
643
644func (sc *serverConn) logf(format string, args ...interface{}) {
645 if lg := sc.hs.ErrorLog; lg != nil {
646 lg.Printf(format, args...)
647 } else {
648 log.Printf(format, args...)
649 }
650}
651
652// errno returns v's underlying uintptr, else 0.
653//
654// TODO: remove this helper function once http2 can use build
655// tags. See comment in isClosedConnError.
656func errno(v error) uintptr {
657 if rv := reflect.ValueOf(v); rv.Kind() == reflect.Uintptr {
658 return uintptr(rv.Uint())
659 }
660 return 0
661}
662
663// isClosedConnError reports whether err is an error from use of a closed
664// network connection.
665func isClosedConnError(err error) bool {
666 if err == nil {
667 return false
668 }
669
670 // TODO: remove this string search and be more like the Windows
671 // case below. That might involve modifying the standard library
672 // to return better error types.
673 str := err.Error()
674 if strings.Contains(str, "use of closed network connection") {
675 return true
676 }
677
678 // TODO(bradfitz): x/tools/cmd/bundle doesn't really support
679 // build tags, so I can't make an http2_windows.go file with
680 // Windows-specific stuff. Fix that and move this, once we
681 // have a way to bundle this into std's net/http somehow.
682 if runtime.GOOS == "windows" {
683 if oe, ok := err.(*net.OpError); ok && oe.Op == "read" {
684 if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" {
685 const WSAECONNABORTED = 10053
686 const WSAECONNRESET = 10054
687 if n := errno(se.Err); n == WSAECONNRESET || n == WSAECONNABORTED {
688 return true
689 }
690 }
691 }
692 }
693 return false
694}
695
696func (sc *serverConn) condlogf(err error, format string, args ...interface{}) {
697 if err == nil {
698 return
699 }
700 if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) || err == errPrefaceTimeout {
701 // Boring, expected errors.
702 sc.vlogf(format, args...)
703 } else {
704 sc.logf(format, args...)
705 }
706}
707
708func (sc *serverConn) canonicalHeader(v string) string {
709 sc.serveG.check()
710 buildCommonHeaderMapsOnce()
711 cv, ok := commonCanonHeader[v]
712 if ok {
713 return cv
714 }
715 cv, ok = sc.canonHeader[v]
716 if ok {
717 return cv
718 }
719 if sc.canonHeader == nil {
720 sc.canonHeader = make(map[string]string)
721 }
722 cv = http.CanonicalHeaderKey(v)
723 sc.canonHeader[v] = cv
724 return cv
725}
726
727type readFrameResult struct {
728 f Frame // valid until readMore is called
729 err error
730
731 // readMore should be called once the consumer no longer needs or
732 // retains f. After readMore, f is invalid and more frames can be
733 // read.
734 readMore func()
735}
736
737// readFrames is the loop that reads incoming frames.
738// It takes care to only read one frame at a time, blocking until the
739// consumer is done with the frame.
740// It's run on its own goroutine.
741func (sc *serverConn) readFrames() {
742 gate := make(gate)
743 gateDone := gate.Done
744 for {
745 f, err := sc.framer.ReadFrame()
746 select {
747 case sc.readFrameCh <- readFrameResult{f, err, gateDone}:
748 case <-sc.doneServing:
749 return
750 }
751 select {
752 case <-gate:
753 case <-sc.doneServing:
754 return
755 }
756 if terminalReadFrameError(err) {
757 return
758 }
759 }
760}
761
762// frameWriteResult is the message passed from writeFrameAsync to the serve goroutine.
763type frameWriteResult struct {
764 _ incomparable
765 wr FrameWriteRequest // what was written (or attempted)
766 err error // result of the writeFrame call
767}
768
769// writeFrameAsync runs in its own goroutine and writes a single frame
770// and then reports when it's done.
771// At most one goroutine can be running writeFrameAsync at a time per
772// serverConn.
773func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) {
774 err := wr.write.writeFrame(sc)
775 sc.wroteFrameCh <- frameWriteResult{wr: wr, err: err}
776}
777
778func (sc *serverConn) closeAllStreamsOnConnClose() {
779 sc.serveG.check()
780 for _, st := range sc.streams {
781 sc.closeStream(st, errClientDisconnected)
782 }
783}
784
785func (sc *serverConn) stopShutdownTimer() {
786 sc.serveG.check()
787 if t := sc.shutdownTimer; t != nil {
788 t.Stop()
789 }
790}
791
792func (sc *serverConn) notePanic() {
793 // Note: this is for serverConn.serve panicking, not http.Handler code.
794 if testHookOnPanicMu != nil {
795 testHookOnPanicMu.Lock()
796 defer testHookOnPanicMu.Unlock()
797 }
798 if testHookOnPanic != nil {
799 if e := recover(); e != nil {
800 if testHookOnPanic(sc, e) {
801 panic(e)
802 }
803 }
804 }
805}
806
807func (sc *serverConn) serve() {
808 sc.serveG.check()
809 defer sc.notePanic()
810 defer sc.conn.Close()
811 defer sc.closeAllStreamsOnConnClose()
812 defer sc.stopShutdownTimer()
813 defer close(sc.doneServing) // unblocks handlers trying to send
814
815 if VerboseLogs {
816 sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
817 }
818
819 sc.writeFrame(FrameWriteRequest{
820 write: writeSettings{
821 {SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
822 {SettingMaxConcurrentStreams, sc.advMaxStreams},
823 {SettingMaxHeaderListSize, sc.maxHeaderListSize()},
824 {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
825 },
826 })
827 sc.unackedSettings++
828
829 // Each connection starts with initialWindowSize inflow tokens.
830 // If a higher value is configured, we add more tokens.
831 if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
832 sc.sendWindowUpdate(nil, int(diff))
833 }
834
835 if err := sc.readPreface(); err != nil {
836 sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
837 return
838 }
839 // Now that we've got the preface, get us out of the
840 // "StateNew" state. We can't go directly to idle, though.
841 // Active means we read some data and anticipate a request. We'll
842 // do another Active when we get a HEADERS frame.
843 sc.setConnState(http.StateActive)
844 sc.setConnState(http.StateIdle)
845
846 if sc.srv.IdleTimeout != 0 {
847 sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
848 defer sc.idleTimer.Stop()
849 }
850
851 go sc.readFrames() // closed by defer sc.conn.Close above
852
853 settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
854 defer settingsTimer.Stop()
855
856 loopNum := 0
857 for {
858 loopNum++
859 select {
860 case wr := <-sc.wantWriteFrameCh:
861 if se, ok := wr.write.(StreamError); ok {
862 sc.resetStream(se)
863 break
864 }
865 sc.writeFrame(wr)
866 case res := <-sc.wroteFrameCh:
867 sc.wroteFrame(res)
868 case res := <-sc.readFrameCh:
869 if !sc.processFrameFromReader(res) {
870 return
871 }
872 res.readMore()
873 if settingsTimer != nil {
874 settingsTimer.Stop()
875 settingsTimer = nil
876 }
877 case m := <-sc.bodyReadCh:
878 sc.noteBodyRead(m.st, m.n)
879 case msg := <-sc.serveMsgCh:
880 switch v := msg.(type) {
881 case func(int):
882 v(loopNum) // for testing
883 case *serverMessage:
884 switch v {
885 case settingsTimerMsg:
886 sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
887 return
888 case idleTimerMsg:
889 sc.vlogf("connection is idle")
890 sc.goAway(ErrCodeNo)
891 case shutdownTimerMsg:
892 sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
893 return
894 case gracefulShutdownMsg:
895 sc.startGracefulShutdownInternal()
896 default:
897 panic("unknown timer")
898 }
899 case *startPushRequest:
900 sc.startPush(v)
901 default:
902 panic(fmt.Sprintf("unexpected type %T", v))
903 }
904 }
905
906 // If the peer is causing us to generate a lot of control frames,
907 // but not reading them from us, assume they are trying to make us
908 // run out of memory.
909 if sc.queuedControlFrames > sc.srv.maxQueuedControlFrames() {
910 sc.vlogf("http2: too many control frames in send queue, closing connection")
911 return
912 }
913
914 // Start the shutdown timer after sending a GOAWAY. When sending GOAWAY
915 // with no error code (graceful shutdown), don't start the timer until
916 // all open streams have been completed.
917 sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame
918 gracefulShutdownComplete := sc.goAwayCode == ErrCodeNo && sc.curOpenStreams() == 0
919 if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != ErrCodeNo || gracefulShutdownComplete) {
920 sc.shutDownIn(goAwayTimeout)
921 }
922 }
923}
924
925func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) {
926 select {
927 case <-sc.doneServing:
928 case <-sharedCh:
929 close(privateCh)
930 }
931}
932
933type serverMessage int
934
935// Message values sent to serveMsgCh.
936var (
937 settingsTimerMsg = new(serverMessage)
938 idleTimerMsg = new(serverMessage)
939 shutdownTimerMsg = new(serverMessage)
940 gracefulShutdownMsg = new(serverMessage)
941)
942
943func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
944func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) }
945func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) }
946
947func (sc *serverConn) sendServeMsg(msg interface{}) {
948 sc.serveG.checkNotOn() // NOT
949 select {
950 case sc.serveMsgCh <- msg:
951 case <-sc.doneServing:
952 }
953}
954
955var errPrefaceTimeout = errors.New("timeout waiting for client preface")
956
957// readPreface reads the ClientPreface greeting from the peer or
958// returns errPrefaceTimeout on timeout, or an error if the greeting
959// is invalid.
960func (sc *serverConn) readPreface() error {
961 errc := make(chan error, 1)
962 go func() {
963 // Read the client preface
964 buf := make([]byte, len(ClientPreface))
965 if _, err := io.ReadFull(sc.conn, buf); err != nil {
966 errc <- err
967 } else if !bytes.Equal(buf, clientPreface) {
968 errc <- fmt.Errorf("bogus greeting %q", buf)
969 } else {
970 errc <- nil
971 }
972 }()
973 timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
974 defer timer.Stop()
975 select {
976 case <-timer.C:
977 return errPrefaceTimeout
978 case err := <-errc:
979 if err == nil {
980 if VerboseLogs {
981 sc.vlogf("http2: server: client %v said hello", sc.conn.RemoteAddr())
982 }
983 }
984 return err
985 }
986}
987
988var errChanPool = sync.Pool{
989 New: func() interface{} { return make(chan error, 1) },
990}
991
992var writeDataPool = sync.Pool{
993 New: func() interface{} { return new(writeData) },
994}
995
996// writeDataFromHandler writes DATA response frames from a handler on
997// the given stream.
998func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
999 ch := errChanPool.Get().(chan error)
1000 writeArg := writeDataPool.Get().(*writeData)
1001 *writeArg = writeData{stream.id, data, endStream}
1002 err := sc.writeFrameFromHandler(FrameWriteRequest{
1003 write: writeArg,
1004 stream: stream,
1005 done: ch,
1006 })
1007 if err != nil {
1008 return err
1009 }
1010 var frameWriteDone bool // the frame write is done (successfully or not)
1011 select {
1012 case err = <-ch:
1013 frameWriteDone = true
1014 case <-sc.doneServing:
1015 return errClientDisconnected
1016 case <-stream.cw:
1017 // If both ch and stream.cw were ready (as might
1018 // happen on the final Write after an http.Handler
1019 // ends), prefer the write result. Otherwise this
1020 // might just be us successfully closing the stream.
1021 // The writeFrameAsync and serve goroutines guarantee
1022 // that the ch send will happen before the stream.cw
1023 // close.
1024 select {
1025 case err = <-ch:
1026 frameWriteDone = true
1027 default:
1028 return errStreamClosed
1029 }
1030 }
1031 errChanPool.Put(ch)
1032 if frameWriteDone {
1033 writeDataPool.Put(writeArg)
1034 }
1035 return err
1036}
1037
1038// writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts
1039// if the connection has gone away.
1040//
1041// This must not be run from the serve goroutine itself, else it might
1042// deadlock writing to sc.wantWriteFrameCh (which is only mildly
1043// buffered and is read by serve itself). If you're on the serve
1044// goroutine, call writeFrame instead.
1045func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error {
1046 sc.serveG.checkNotOn() // NOT
1047 select {
1048 case sc.wantWriteFrameCh <- wr:
1049 return nil
1050 case <-sc.doneServing:
1051 // Serve loop is gone.
1052 // Client has closed their connection to the server.
1053 return errClientDisconnected
1054 }
1055}
1056
1057// writeFrame schedules a frame to write and sends it if there's nothing
1058// already being written.
1059//
1060// There is no pushback here (the serve goroutine never blocks). It's
1061// the http.Handlers that block, waiting for their previous frames to
1062 // make it onto the wire.
1063//
1064// If you're not on the serve goroutine, use writeFrameFromHandler instead.
1065func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
1066 sc.serveG.check()
1067
1068 // If true, wr will not be written and wr.done will not be signaled.
1069 var ignoreWrite bool
1070
1071 // We are not allowed to write frames on closed streams. RFC 7540 Section
1072 // 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
1073 // a closed stream." Our server never sends PRIORITY, so that exception
1074 // does not apply.
1075 //
1076 // The serverConn might close an open stream while the stream's handler
1077 // is still running. For example, the server might close a stream when it
1078 // receives bad data from the client. If this happens, the handler might
1079 // attempt to write a frame after the stream has been closed (since the
1080 // handler hasn't yet been notified of the close). In this case, we simply
1081 // ignore the frame. The handler will notice that the stream is closed when
1082 // it waits for the frame to be written.
1083 //
1084 // As an exception to this rule, we allow sending RST_STREAM after close.
1085 // This allows us to immediately reject new streams without tracking any
1086 // state for those streams (except for the queued RST_STREAM frame). This
1087 // may result in duplicate RST_STREAMs in some cases, but the client should
1088 // ignore those.
1089 if wr.StreamID() != 0 {
1090 _, isReset := wr.write.(StreamError)
1091 if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset {
1092 ignoreWrite = true
1093 }
1094 }
1095
1096 // Don't send a 100-continue response if we've already sent headers.
1097 // See golang.org/issue/14030.
1098 switch wr.write.(type) {
1099 case *writeResHeaders:
1100 wr.stream.wroteHeaders = true
1101 case write100ContinueHeadersFrame:
1102 if wr.stream.wroteHeaders {
1103 // We do not need to notify wr.done because this frame is
1104 // never written with wr.done != nil.
1105 if wr.done != nil {
1106 panic("wr.done != nil for write100ContinueHeadersFrame")
1107 }
1108 ignoreWrite = true
1109 }
1110 }
1111
1112 if !ignoreWrite {
1113 if wr.isControl() {
1114 sc.queuedControlFrames++
1115 // For extra safety, detect wraparounds, which should not happen,
1116 // and pull the plug.
1117 if sc.queuedControlFrames < 0 {
1118 sc.conn.Close()
1119 }
1120 }
1121 sc.writeSched.Push(wr)
1122 }
1123 sc.scheduleFrameWrite()
1124}
1125
1126 // startFrameWrite starts writing wr, on a separate goroutine when the
1127 // write might block on the network, and updates the serve goroutine's
1128 // state from the information in wr.
1129func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
1130 sc.serveG.check()
1131 if sc.writingFrame {
1132 panic("internal error: can only be writing one frame at a time")
1133 }
1134
1135 st := wr.stream
1136 if st != nil {
1137 switch st.state {
1138 case stateHalfClosedLocal:
1139 switch wr.write.(type) {
1140 case StreamError, handlerPanicRST, writeWindowUpdate:
1141 // RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE
1142 // in this state. (We never send PRIORITY from the server, so that is not checked.)
1143 default:
1144 panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr))
1145 }
1146 case stateClosed:
1147 panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr))
1148 }
1149 }
1150 if wpp, ok := wr.write.(*writePushPromise); ok {
1151 var err error
1152 wpp.promisedID, err = wpp.allocatePromisedID()
1153 if err != nil {
1154 sc.writingFrameAsync = false
1155 wr.replyToWriter(err)
1156 return
1157 }
1158 }
1159
1160 sc.writingFrame = true
1161 sc.needsFrameFlush = true
1162 if wr.write.staysWithinBuffer(sc.bw.Available()) {
1163 sc.writingFrameAsync = false
1164 err := wr.write.writeFrame(sc)
1165 sc.wroteFrame(frameWriteResult{wr: wr, err: err})
1166 } else {
1167 sc.writingFrameAsync = true
1168 go sc.writeFrameAsync(wr)
1169 }
1170}
1171
1172// errHandlerPanicked is the error given to any callers blocked in a read from
1173// Request.Body when the main goroutine panics. Since most handlers read in the
1174// main ServeHTTP goroutine, this will show up rarely.
1175var errHandlerPanicked = errors.New("http2: handler panicked")
1176
1177// wroteFrame is called on the serve goroutine with the result of
1178// whatever happened on writeFrameAsync.
1179func (sc *serverConn) wroteFrame(res frameWriteResult) {
1180 sc.serveG.check()
1181 if !sc.writingFrame {
1182 panic("internal error: expected to be already writing a frame")
1183 }
1184 sc.writingFrame = false
1185 sc.writingFrameAsync = false
1186
1187 wr := res.wr
1188
1189 if writeEndsStream(wr.write) {
1190 st := wr.stream
1191 if st == nil {
1192 panic("internal error: expecting non-nil stream")
1193 }
1194 switch st.state {
1195 case stateOpen:
1196 // Here we would go to stateHalfClosedLocal in
1197 // theory, but since our handler is done and
1198 // the net/http package provides no mechanism
1199 // for closing a ResponseWriter while still
1200 // reading data (see possible TODO at top of
1201 // this file), we go into closed state here
1202 // anyway, after telling the peer we're
1203 // hanging up on them. We'll transition to
1204 // stateClosed after the RST_STREAM frame is
1205 // written.
1206 st.state = stateHalfClosedLocal
1207 // Section 8.1: a server MAY request that the client abort
1208 // transmission of a request without error by sending a
1209 // RST_STREAM with an error code of NO_ERROR after sending
1210 // a complete response.
1211 sc.resetStream(streamError(st.id, ErrCodeNo))
1212 case stateHalfClosedRemote:
1213 sc.closeStream(st, errHandlerComplete)
1214 }
1215 } else {
1216 switch v := wr.write.(type) {
1217 case StreamError:
1218 // st may be unknown if the RST_STREAM was generated to reject bad input.
1219 if st, ok := sc.streams[v.StreamID]; ok {
1220 sc.closeStream(st, v)
1221 }
1222 case handlerPanicRST:
1223 sc.closeStream(wr.stream, errHandlerPanicked)
1224 }
1225 }
1226
1227 // Reply (if requested) to unblock the ServeHTTP goroutine.
1228 wr.replyToWriter(res.err)
1229
1230 sc.scheduleFrameWrite()
1231}
1232
1233// scheduleFrameWrite tickles the frame writing scheduler.
1234//
1235// If a frame is already being written, nothing happens. This will be called again
1236// when the frame is done being written.
1237//
1238// If a frame isn't being written and we need to send one, the best frame
1239// to send is selected by writeSched.
1240//
1241// If a frame isn't being written and there's nothing else to send, we
1242// flush the write buffer.
1243func (sc *serverConn) scheduleFrameWrite() {
1244 sc.serveG.check()
1245 if sc.writingFrame || sc.inFrameScheduleLoop {
1246 return
1247 }
1248 sc.inFrameScheduleLoop = true
1249 for !sc.writingFrameAsync {
1250 if sc.needToSendGoAway {
1251 sc.needToSendGoAway = false
1252 sc.startFrameWrite(FrameWriteRequest{
1253 write: &writeGoAway{
1254 maxStreamID: sc.maxClientStreamID,
1255 code: sc.goAwayCode,
1256 },
1257 })
1258 continue
1259 }
1260 if sc.needToSendSettingsAck {
1261 sc.needToSendSettingsAck = false
1262 sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
1263 continue
1264 }
1265 if !sc.inGoAway || sc.goAwayCode == ErrCodeNo {
1266 if wr, ok := sc.writeSched.Pop(); ok {
1267 if wr.isControl() {
1268 sc.queuedControlFrames--
1269 }
1270 sc.startFrameWrite(wr)
1271 continue
1272 }
1273 }
1274 if sc.needsFrameFlush {
1275 sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
1276 sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
1277 continue
1278 }
1279 break
1280 }
1281 sc.inFrameScheduleLoop = false
1282}
1283
1284// startGracefulShutdown gracefully shuts down a connection. This
1285// sends GOAWAY with ErrCodeNo to tell the client we're gracefully
1286// shutting down. The connection isn't closed until all current
1287// streams are done.
1288//
1289// startGracefulShutdown returns immediately; it does not wait until
1290// the connection has shut down.
1291func (sc *serverConn) startGracefulShutdown() {
1292 sc.serveG.checkNotOn() // NOT
1293 sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) })
1294}
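
// Caller-side sketch (illustrative): when the connection came from
// ConfigureServer, graceful HTTP/2 shutdown is normally reached through
// net/http's own Shutdown, which runs the RegisterOnShutdown hook
// installed there and sends GOAWAY on every active connection via
// serverInternalState.startGracefulShutdown.
//
//	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
//	defer cancel()
//	if err := srv.Shutdown(ctx); err != nil {
//		log.Printf("graceful shutdown: %v", err)
//	}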
1295
1296// After sending GOAWAY with an error code (non-graceful shutdown), the
1297// connection will close after goAwayTimeout.
1298//
1299// If we close the connection immediately after sending GOAWAY, there may
1300// be unsent data in our kernel receive buffer, which will cause the kernel
1301// to send a TCP RST on close() instead of a FIN. This RST will abort the
1302// connection immediately, whether or not the client had received the GOAWAY.
1303//
1304// Ideally we should delay for at least 1 RTT + epsilon so the client has
1305// a chance to read the GOAWAY and stop sending messages. Measuring RTT
1306// is hard, so we approximate with 1 second. See golang.org/issue/18701.
1307//
1308 // This is a var so it can be shorter in tests, where all requests use the
1309 // loopback interface, making the expected RTT very small.
1310//
1311// TODO: configurable?
1312var goAwayTimeout = 1 * time.Second
1313
1314func (sc *serverConn) startGracefulShutdownInternal() {
1315 sc.goAway(ErrCodeNo)
1316}
1317
1318func (sc *serverConn) goAway(code ErrCode) {
1319 sc.serveG.check()
1320 if sc.inGoAway {
1321 return
1322 }
1323 sc.inGoAway = true
1324 sc.needToSendGoAway = true
1325 sc.goAwayCode = code
1326 sc.scheduleFrameWrite()
1327}
1328
1329func (sc *serverConn) shutDownIn(d time.Duration) {
1330 sc.serveG.check()
1331 sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
1332}
1333
1334func (sc *serverConn) resetStream(se StreamError) {
1335 sc.serveG.check()
1336 sc.writeFrame(FrameWriteRequest{write: se})
1337 if st, ok := sc.streams[se.StreamID]; ok {
1338 st.resetQueued = true
1339 }
1340}
1341
1342// processFrameFromReader processes the serve loop's read from readFrameCh from the
1343// frame-reading goroutine.
1344// processFrameFromReader returns whether the connection should be kept open.
1345func (sc *serverConn) processFrameFromReader(res readFrameResult) bool {
1346 sc.serveG.check()
1347 err := res.err
1348 if err != nil {
1349 if err == ErrFrameTooLarge {
1350 sc.goAway(ErrCodeFrameSize)
1351 return true // goAway will close the loop
1352 }
1353 clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err)
1354 if clientGone {
1355 // TODO: could we also get into this state if
1356 // the peer does a half close
1357 // (e.g. CloseWrite) because they're done
1358 // sending frames but they're still wanting
1359 // our open replies? Investigate.
1360 // TODO: add CloseWrite to crypto/tls.Conn first
1361 // so we have a way to test this? I suppose
1362 // just for testing we could have a non-TLS mode.
1363 return false
1364 }
1365 } else {
1366 f := res.f
1367 if VerboseLogs {
1368 sc.vlogf("http2: server read frame %v", summarizeFrame(f))
1369 }
1370 err = sc.processFrame(f)
1371 if err == nil {
1372 return true
1373 }
1374 }
1375
1376 switch ev := err.(type) {
1377 case StreamError:
1378 sc.resetStream(ev)
1379 return true
1380 case goAwayFlowError:
1381 sc.goAway(ErrCodeFlowControl)
1382 return true
1383 case ConnectionError:
1384 sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev)
1385 sc.goAway(ErrCode(ev))
1386 return true // goAway will handle shutdown
1387 default:
1388 if res.err != nil {
1389 sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
1390 } else {
1391 sc.logf("http2: server closing client connection: %v", err)
1392 }
1393 return false
1394 }
1395}
1396
1397func (sc *serverConn) processFrame(f Frame) error {
1398 sc.serveG.check()
1399
1400 // First frame received must be SETTINGS.
1401 if !sc.sawFirstSettings {
1402 if _, ok := f.(*SettingsFrame); !ok {
1403 return ConnectionError(ErrCodeProtocol)
1404 }
1405 sc.sawFirstSettings = true
1406 }
1407
1408 switch f := f.(type) {
1409 case *SettingsFrame:
1410 return sc.processSettings(f)
1411 case *MetaHeadersFrame:
1412 return sc.processHeaders(f)
1413 case *WindowUpdateFrame:
1414 return sc.processWindowUpdate(f)
1415 case *PingFrame:
1416 return sc.processPing(f)
1417 case *DataFrame:
1418 return sc.processData(f)
1419 case *RSTStreamFrame:
1420 return sc.processResetStream(f)
1421 case *PriorityFrame:
1422 return sc.processPriority(f)
1423 case *GoAwayFrame:
1424 return sc.processGoAway(f)
1425 case *PushPromiseFrame:
1426 // A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
1427 // frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
1428 return ConnectionError(ErrCodeProtocol)
1429 default:
1430 sc.vlogf("http2: server ignoring frame: %v", f.Header())
1431 return nil
1432 }
1433}
1434
1435func (sc *serverConn) processPing(f *PingFrame) error {
1436 sc.serveG.check()
1437 if f.IsAck() {
1438 // 6.7 PING: " An endpoint MUST NOT respond to PING frames
1439 // containing this flag."
1440 return nil
1441 }
1442 if f.StreamID != 0 {
1443 // "PING frames are not associated with any individual
1444 // stream. If a PING frame is received with a stream
1445 // identifier field value other than 0x0, the recipient MUST
1446 // respond with a connection error (Section 5.4.1) of type
1447 // PROTOCOL_ERROR."
1448 return ConnectionError(ErrCodeProtocol)
1449 }
1450 if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
1451 return nil
1452 }
1453 sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
1454 return nil
1455}
1456
1457func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
1458 sc.serveG.check()
1459 switch {
1460 case f.StreamID != 0: // stream-level flow control
1461 state, st := sc.state(f.StreamID)
1462 if state == stateIdle {
1463 // Section 5.1: "Receiving any frame other than HEADERS
1464 // or PRIORITY on a stream in this state MUST be
1465 // treated as a connection error (Section 5.4.1) of
1466 // type PROTOCOL_ERROR."
1467 return ConnectionError(ErrCodeProtocol)
1468 }
1469 if st == nil {
1470 // "WINDOW_UPDATE can be sent by a peer that has sent a
1471 // frame bearing the END_STREAM flag. This means that a
1472 // receiver could receive a WINDOW_UPDATE frame on a "half
1473 // closed (remote)" or "closed" stream. A receiver MUST
1474 // NOT treat this as an error, see Section 5.1."
1475 return nil
1476 }
1477 if !st.flow.add(int32(f.Increment)) {
1478 return streamError(f.StreamID, ErrCodeFlowControl)
1479 }
1480 default: // connection-level flow control
1481 if !sc.flow.add(int32(f.Increment)) {
1482 return goAwayFlowError{}
1483 }
1484 }
1485 sc.scheduleFrameWrite()
1486 return nil
1487}
1488
1489func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
1490 sc.serveG.check()
1491
1492 state, st := sc.state(f.StreamID)
1493 if state == stateIdle {
1494 // 6.4 "RST_STREAM frames MUST NOT be sent for a
1495 // stream in the "idle" state. If a RST_STREAM frame
1496 // identifying an idle stream is received, the
1497 // recipient MUST treat this as a connection error
1498 // (Section 5.4.1) of type PROTOCOL_ERROR."
1499 return ConnectionError(ErrCodeProtocol)
1500 }
1501 if st != nil {
1502 st.cancelCtx()
1503 sc.closeStream(st, streamError(f.StreamID, f.ErrCode))
1504 }
1505 return nil
1506}
1507
1508func (sc *serverConn) closeStream(st *stream, err error) {
1509 sc.serveG.check()
1510 if st.state == stateIdle || st.state == stateClosed {
1511 panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
1512 }
1513 st.state = stateClosed
1514 if st.writeDeadline != nil {
1515 st.writeDeadline.Stop()
1516 }
1517 if st.isPushed() {
1518 sc.curPushedStreams--
1519 } else {
1520 sc.curClientStreams--
1521 }
1522 delete(sc.streams, st.id)
1523 if len(sc.streams) == 0 {
1524 sc.setConnState(http.StateIdle)
1525 if sc.srv.IdleTimeout != 0 {
1526 sc.idleTimer.Reset(sc.srv.IdleTimeout)
1527 }
1528 if h1ServerKeepAlivesDisabled(sc.hs) {
1529 sc.startGracefulShutdownInternal()
1530 }
1531 }
1532 if p := st.body; p != nil {
1533 // Return any buffered unread bytes worth of conn-level flow control.
1534 // See golang.org/issue/16481
1535 sc.sendWindowUpdate(nil, p.Len())
1536
1537 p.CloseWithError(err)
1538 }
1539 st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
1540 sc.writeSched.CloseStream(st.id)
1541}
1542
1543func (sc *serverConn) processSettings(f *SettingsFrame) error {
1544 sc.serveG.check()
1545 if f.IsAck() {
1546 sc.unackedSettings--
1547 if sc.unackedSettings < 0 {
1548 // Why is the peer ACKing settings we never sent?
1549 // The spec doesn't mention this case, but
1550 // hang up on them anyway.
1551 return ConnectionError(ErrCodeProtocol)
1552 }
1553 return nil
1554 }
1555 if f.NumSettings() > 100 || f.HasDuplicates() {
1556 // This isn't actually in the spec, but hang up on
1557 // suspiciously large settings frames or those with
1558 // duplicate entries.
1559 return ConnectionError(ErrCodeProtocol)
1560 }
1561 if err := f.ForeachSetting(sc.processSetting); err != nil {
1562 return err
1563 }
1564 // TODO: judging by RFC 7540, Section 6.5.3, each SETTINGS frame should be
1565 // acknowledged individually, even if multiple are received before the ACK.
1566 sc.needToSendSettingsAck = true
1567 sc.scheduleFrameWrite()
1568 return nil
1569}
1570
1571func (sc *serverConn) processSetting(s Setting) error {
1572 sc.serveG.check()
1573 if err := s.Valid(); err != nil {
1574 return err
1575 }
1576 if VerboseLogs {
1577 sc.vlogf("http2: server processing setting %v", s)
1578 }
1579 switch s.ID {
1580 case SettingHeaderTableSize:
1581 sc.headerTableSize = s.Val
1582 sc.hpackEncoder.SetMaxDynamicTableSize(s.Val)
1583 case SettingEnablePush:
1584 sc.pushEnabled = s.Val != 0
1585 case SettingMaxConcurrentStreams:
1586 sc.clientMaxStreams = s.Val
1587 case SettingInitialWindowSize:
1588 return sc.processSettingInitialWindowSize(s.Val)
1589 case SettingMaxFrameSize:
1590 sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31
1591 case SettingMaxHeaderListSize:
1592 sc.peerMaxHeaderListSize = s.Val
1593 default:
1594 // Unknown setting: "An endpoint that receives a SETTINGS
1595 // frame with any unknown or unsupported identifier MUST
1596 // ignore that setting."
1597 if VerboseLogs {
1598 sc.vlogf("http2: server ignoring unknown setting %v", s)
1599 }
1600 }
1601 return nil
1602}
1603
1604func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
1605 sc.serveG.check()
1606 // Note: val already validated to be within range by
1607 // processSetting's Valid call.
1608
1609 // "A SETTINGS frame can alter the initial flow control window
1610 // size for all current streams. When the value of
1611 // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST
1612 // adjust the size of all stream flow control windows that it
1613 // maintains by the difference between the new value and the
1614 // old value."
1615 old := sc.initialStreamSendWindowSize
1616 sc.initialStreamSendWindowSize = int32(val)
1617 growth := int32(val) - old // may be negative
1618 for _, st := range sc.streams {
1619 if !st.flow.add(growth) {
1620 // 6.9.2 Initial Flow Control Window Size
1621 // "An endpoint MUST treat a change to
1622 // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow
1623 // control window to exceed the maximum size as a
1624 // connection error (Section 5.4.1) of type
1625 // FLOW_CONTROL_ERROR."
1626 return ConnectionError(ErrCodeFlowControl)
1627 }
1628 }
1629 return nil
1630}
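
// Illustrative example: if the client raises SETTINGS_INITIAL_WINDOW_SIZE
// from the default 65535 to 131070, growth is +65535 and every existing
// stream's send window gains that much; if it lowers the value, growth is
// negative and a stream's window may go below zero, which RFC 7540
// Section 6.9.2 explicitly allows (the sender must then wait for
// WINDOW_UPDATEs before sending more data on that stream).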
1631
1632func (sc *serverConn) processData(f *DataFrame) error {
1633 sc.serveG.check()
1634 id := f.Header().StreamID
1635 if sc.inGoAway && (sc.goAwayCode != ErrCodeNo || id > sc.maxClientStreamID) {
1636 // Discard all DATA frames if the GOAWAY is due to an
1637 // error, or:
1638 //
1639 // Section 6.8: After sending a GOAWAY frame, the sender
1640 // can discard frames for streams initiated by the
1641 // receiver with identifiers higher than the identified
1642 // last stream.
1643 return nil
1644 }
1645
1646 data := f.Data()
1647 state, st := sc.state(id)
1648 if id == 0 || state == stateIdle {
1649 // Section 6.1: "DATA frames MUST be associated with a
1650 // stream. If a DATA frame is received whose stream
1651 // identifier field is 0x0, the recipient MUST respond
1652 // with a connection error (Section 5.4.1) of type
1653 // PROTOCOL_ERROR."
1654 //
1655 // Section 5.1: "Receiving any frame other than HEADERS
1656 // or PRIORITY on a stream in this state MUST be
1657 // treated as a connection error (Section 5.4.1) of
1658 // type PROTOCOL_ERROR."
1659 return ConnectionError(ErrCodeProtocol)
1660 }
1661
1662 // "If a DATA frame is received whose stream is not in "open"
1663 // or "half closed (local)" state, the recipient MUST respond
1664 // with a stream error (Section 5.4.2) of type STREAM_CLOSED."
1665 if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued {
1666 // This includes sending a RST_STREAM if the stream is
1667 // in stateHalfClosedLocal (which currently means that
1668 // the http.Handler returned, so it's done reading &
1669 // done writing). Try to stop the client from sending
1670 // more DATA.
1671
1672 // But still enforce their connection-level flow control,
1673 // and return any flow control bytes since we're not going
1674 // to consume them.
1675 if sc.inflow.available() < int32(f.Length) {
1676 return streamError(id, ErrCodeFlowControl)
1677 }
1678 // Deduct the flow control from inflow, since we're
1679 // going to immediately add it back in
1680 // sendWindowUpdate, which also schedules sending the
1681 // frames.
1682 sc.inflow.take(int32(f.Length))
1683 sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
1684
1685 if st != nil && st.resetQueued {
1686 // Already have a stream error in flight. Don't send another.
1687 return nil
1688 }
1689 return streamError(id, ErrCodeStreamClosed)
1690 }
1691 if st.body == nil {
1692 panic("internal error: should have a body in this state")
1693 }
1694
1695 // Sender sending more than they'd declared?
1696 if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
1697 st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
1698 // RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
1699 // value of a content-length header field does not equal the sum of the
1700 // DATA frame payload lengths that form the body.
1701 return streamError(id, ErrCodeProtocol)
1702 }
1703 if f.Length > 0 {
1704 // Check whether the client has flow control quota.
1705 if st.inflow.available() < int32(f.Length) {
1706 return streamError(id, ErrCodeFlowControl)
1707 }
1708 st.inflow.take(int32(f.Length))
1709
1710 if len(data) > 0 {
1711 wrote, err := st.body.Write(data)
1712 if err != nil {
1713 sc.sendWindowUpdate(nil, int(f.Length)-wrote)
1714 return streamError(id, ErrCodeStreamClosed)
1715 }
1716 if wrote != len(data) {
1717 panic("internal error: bad Writer")
1718 }
1719 st.bodyBytes += int64(len(data))
1720 }
1721
1722 // Return any padded flow control now, since we won't
1723 // refund it later on body reads.
1724 if pad := int32(f.Length) - int32(len(data)); pad > 0 {
1725 sc.sendWindowUpdate32(nil, pad)
1726 sc.sendWindowUpdate32(st, pad)
1727 }
1728 }
1729 if f.StreamEnded() {
1730 st.endStream()
1731 }
1732 return nil
1733}
1734
1735func (sc *serverConn) processGoAway(f *GoAwayFrame) error {
1736 sc.serveG.check()
1737 if f.ErrCode != ErrCodeNo {
1738 sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1739 } else {
1740 sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1741 }
1742 sc.startGracefulShutdownInternal()
1743 // http://tools.ietf.org/html/rfc7540#section-6.8
1744 // We should not create any new streams, which means we should disable push.
1745 sc.pushEnabled = false
1746 return nil
1747}
1748
1749// isPushed reports whether the stream is server-initiated.
1750func (st *stream) isPushed() bool {
1751 return st.id%2 == 0
1752}
1753
1754// endStream closes a Request.Body's pipe. It is called when a DATA
1755// frame says a request body is over (or after trailers).
1756func (st *stream) endStream() {
1757 sc := st.sc
1758 sc.serveG.check()
1759
1760 if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes {
1761 st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes",
1762 st.declBodyBytes, st.bodyBytes))
1763 } else {
1764 st.body.closeWithErrorAndCode(io.EOF, st.copyTrailersToHandlerRequest)
1765 st.body.CloseWithError(io.EOF)
1766 }
1767 st.state = stateHalfClosedRemote
1768}
1769
1770// copyTrailersToHandlerRequest is run in the Handler's goroutine in
1771// its Request.Body.Read just before it gets io.EOF.
1772func (st *stream) copyTrailersToHandlerRequest() {
1773 for k, vv := range st.trailer {
1774 if _, ok := st.reqTrailer[k]; ok {
1775			// Only copy it over if it was pre-declared.
1776 st.reqTrailer[k] = vv
1777 }
1778 }
1779}
1780
1781// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
1782// when the stream's WriteTimeout has fired.
1783func (st *stream) onWriteTimeout() {
1784 st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)})
1785}
1786
1787func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
1788 sc.serveG.check()
1789 id := f.StreamID
1790 if sc.inGoAway {
1791 // Ignore.
1792 return nil
1793 }
1794 // http://tools.ietf.org/html/rfc7540#section-5.1.1
1795 // Streams initiated by a client MUST use odd-numbered stream
1796 // identifiers. [...] An endpoint that receives an unexpected
1797 // stream identifier MUST respond with a connection error
1798 // (Section 5.4.1) of type PROTOCOL_ERROR.
1799 if id%2 != 1 {
1800 return ConnectionError(ErrCodeProtocol)
1801 }
1802 // A HEADERS frame can be used to create a new stream or
1803 // send a trailer for an open one. If we already have a stream
1804 // open, let it process its own HEADERS frame (trailers at this
1805 // point, if it's valid).
1806 if st := sc.streams[f.StreamID]; st != nil {
1807 if st.resetQueued {
1808 // We're sending RST_STREAM to close the stream, so don't bother
1809 // processing this frame.
1810 return nil
1811 }
1812 // RFC 7540, sec 5.1: If an endpoint receives additional frames, other than
1813 // WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in
1814 // this state, it MUST respond with a stream error (Section 5.4.2) of
1815 // type STREAM_CLOSED.
1816 if st.state == stateHalfClosedRemote {
1817 return streamError(id, ErrCodeStreamClosed)
1818 }
1819 return st.processTrailerHeaders(f)
1820 }
1821
1822 // [...] The identifier of a newly established stream MUST be
1823 // numerically greater than all streams that the initiating
1824 // endpoint has opened or reserved. [...] An endpoint that
1825 // receives an unexpected stream identifier MUST respond with
1826 // a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
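	// For example, once the client has opened stream 7, a later HEADERS frame
	// that tries to open stream 5 is rejected with a connection error.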
1827 if id <= sc.maxClientStreamID {
1828 return ConnectionError(ErrCodeProtocol)
1829 }
1830 sc.maxClientStreamID = id
1831
1832 if sc.idleTimer != nil {
1833 sc.idleTimer.Stop()
1834 }
1835
1836 // http://tools.ietf.org/html/rfc7540#section-5.1.2
1837 // [...] Endpoints MUST NOT exceed the limit set by their peer. An
1838 // endpoint that receives a HEADERS frame that causes their
1839 // advertised concurrent stream limit to be exceeded MUST treat
1840 // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
1841 // or REFUSED_STREAM.
1842 if sc.curClientStreams+1 > sc.advMaxStreams {
1843 if sc.unackedSettings == 0 {
1844 // They should know better.
1845 return streamError(id, ErrCodeProtocol)
1846 }
1847 // Assume it's a network race, where they just haven't
1848 // received our last SETTINGS update. But actually
1849 // this can't happen yet, because we don't yet provide
1850 // a way for users to adjust server parameters at
1851 // runtime.
1852 return streamError(id, ErrCodeRefusedStream)
1853 }
1854
1855 initialState := stateOpen
1856 if f.StreamEnded() {
1857 initialState = stateHalfClosedRemote
1858 }
1859 st := sc.newStream(id, 0, initialState)
1860
1861 if f.HasPriority() {
1862 if err := checkPriority(f.StreamID, f.Priority); err != nil {
1863 return err
1864 }
1865 sc.writeSched.AdjustStream(st.id, f.Priority)
1866 }
1867
1868 rw, req, err := sc.newWriterAndRequest(st, f)
1869 if err != nil {
1870 return err
1871 }
1872 st.reqTrailer = req.Trailer
1873 if st.reqTrailer != nil {
1874 st.trailer = make(http.Header)
1875 }
1876 st.body = req.Body.(*requestBody).pipe // may be nil
1877 st.declBodyBytes = req.ContentLength
1878
1879 handler := sc.handler.ServeHTTP
1880 if f.Truncated {
1881 // Their header list was too long. Send a 431 error.
1882 handler = handleHeaderListTooLong
1883 } else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil {
1884 handler = new400Handler(err)
1885 }
1886
1887 // The net/http package sets the read deadline from the
1888 // http.Server.ReadTimeout during the TLS handshake, but then
1889 // passes the connection off to us with the deadline already
1890 // set. Disarm it here after the request headers are read,
1891 // similar to how the http1 server works. Here it's
1892 // technically more like the http1 Server's ReadHeaderTimeout
1893 // (in Go 1.8), though. That's a more sane option anyway.
1894 if sc.hs.ReadTimeout != 0 {
1895 sc.conn.SetReadDeadline(time.Time{})
1896 }
1897
1898 go sc.runHandler(rw, req, handler)
1899 return nil
1900}
1901
1902func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
1903 sc := st.sc
1904 sc.serveG.check()
1905 if st.gotTrailerHeader {
1906 return ConnectionError(ErrCodeProtocol)
1907 }
1908 st.gotTrailerHeader = true
1909 if !f.StreamEnded() {
1910 return streamError(st.id, ErrCodeProtocol)
1911 }
1912
1913 if len(f.PseudoFields()) > 0 {
1914 return streamError(st.id, ErrCodeProtocol)
1915 }
1916 if st.trailer != nil {
1917 for _, hf := range f.RegularFields() {
1918 key := sc.canonicalHeader(hf.Name)
1919 if !httpguts.ValidTrailerHeader(key) {
1920 // TODO: send more details to the peer somehow. But http2 has
1921 // no way to send debug data at a stream level. Discuss with
1922 // HTTP folk.
1923 return streamError(st.id, ErrCodeProtocol)
1924 }
1925 st.trailer[key] = append(st.trailer[key], hf.Value)
1926 }
1927 }
1928 st.endStream()
1929 return nil
1930}
1931
1932func checkPriority(streamID uint32, p PriorityParam) error {
1933 if streamID == p.StreamDep {
1934 // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
1935 // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
1936 // Section 5.3.3 says that a stream can depend on one of its dependencies,
1937 // so it's only self-dependencies that are forbidden.
1938 return streamError(streamID, ErrCodeProtocol)
1939 }
1940 return nil
1941}
1942
1943func (sc *serverConn) processPriority(f *PriorityFrame) error {
1944 if sc.inGoAway {
1945 return nil
1946 }
1947 if err := checkPriority(f.StreamID, f.PriorityParam); err != nil {
1948 return err
1949 }
1950 sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam)
1951 return nil
1952}
1953
1954func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream {
1955 sc.serveG.check()
1956 if id == 0 {
1957 panic("internal error: cannot create stream with id 0")
1958 }
1959
1960 ctx, cancelCtx := context.WithCancel(sc.baseCtx)
1961 st := &stream{
1962 sc: sc,
1963 id: id,
1964 state: state,
1965 ctx: ctx,
1966 cancelCtx: cancelCtx,
1967 }
1968 st.cw.Init()
1969 st.flow.conn = &sc.flow // link to conn-level counter
1970 st.flow.add(sc.initialStreamSendWindowSize)
1971 st.inflow.conn = &sc.inflow // link to conn-level counter
1972 st.inflow.add(sc.srv.initialStreamRecvWindowSize())
1973 if sc.hs.WriteTimeout != 0 {
1974 st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
1975 }
1976
1977 sc.streams[id] = st
1978 sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
1979 if st.isPushed() {
1980 sc.curPushedStreams++
1981 } else {
1982 sc.curClientStreams++
1983 }
1984 if sc.curOpenStreams() == 1 {
1985 sc.setConnState(http.StateActive)
1986 }
1987
1988 return st
1989}
1990
1991func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) {
1992 sc.serveG.check()
1993
1994 rp := requestParam{
1995 method: f.PseudoValue("method"),
1996 scheme: f.PseudoValue("scheme"),
1997 authority: f.PseudoValue("authority"),
1998 path: f.PseudoValue("path"),
1999 }
2000
2001 isConnect := rp.method == "CONNECT"
2002 if isConnect {
2003 if rp.path != "" || rp.scheme != "" || rp.authority == "" {
2004 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
2005 }
2006 } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
2007 // See 8.1.2.6 Malformed Requests and Responses:
2008 //
2009 // Malformed requests or responses that are detected
2010 // MUST be treated as a stream error (Section 5.4.2)
2011 // of type PROTOCOL_ERROR."
2012 //
2013 // 8.1.2.3 Request Pseudo-Header Fields
2014 // "All HTTP/2 requests MUST include exactly one valid
2015 // value for the :method, :scheme, and :path
2016 // pseudo-header fields"
2017 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
2018 }
2019
2020 bodyOpen := !f.StreamEnded()
2021 if rp.method == "HEAD" && bodyOpen {
2022 // HEAD requests can't have bodies
2023 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
2024 }
2025
2026 rp.header = make(http.Header)
2027 for _, hf := range f.RegularFields() {
2028 rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value)
2029 }
2030 if rp.authority == "" {
2031 rp.authority = rp.header.Get("Host")
2032 }
2033
2034 rw, req, err := sc.newWriterAndRequestNoBody(st, rp)
2035 if err != nil {
2036 return nil, nil, err
2037 }
2038 if bodyOpen {
2039 if vv, ok := rp.header["Content-Length"]; ok {
2040 if cl, err := strconv.ParseUint(vv[0], 10, 63); err == nil {
2041 req.ContentLength = int64(cl)
2042 } else {
2043 req.ContentLength = 0
2044 }
2045 } else {
2046 req.ContentLength = -1
2047 }
2048 req.Body.(*requestBody).pipe = &pipe{
2049 b: &dataBuffer{expected: req.ContentLength},
2050 }
2051 }
2052 return rw, req, nil
2053}
2054
2055type requestParam struct {
2056 method string
2057 scheme, authority, path string
2058 header http.Header
2059}
2060
2061func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) {
2062 sc.serveG.check()
2063
2064 var tlsState *tls.ConnectionState // nil if not scheme https
2065 if rp.scheme == "https" {
2066 tlsState = sc.tlsState
2067 }
2068
2069 needsContinue := rp.header.Get("Expect") == "100-continue"
2070 if needsContinue {
2071 rp.header.Del("Expect")
2072 }
2073 // Merge Cookie headers into one "; "-delimited value.
2074 if cookies := rp.header["Cookie"]; len(cookies) > 1 {
2075 rp.header.Set("Cookie", strings.Join(cookies, "; "))
2076 }
2077
2078	// Set up trailers.
2079 var trailer http.Header
2080 for _, v := range rp.header["Trailer"] {
2081 for _, key := range strings.Split(v, ",") {
2082 key = http.CanonicalHeaderKey(textproto.TrimString(key))
2083 switch key {
2084 case "Transfer-Encoding", "Trailer", "Content-Length":
2085 // Bogus. (copy of http1 rules)
2086 // Ignore.
2087 default:
2088 if trailer == nil {
2089 trailer = make(http.Header)
2090 }
2091 trailer[key] = nil
2092 }
2093 }
2094 }
2095 delete(rp.header, "Trailer")
2096
2097 var url_ *url.URL
2098 var requestURI string
2099 if rp.method == "CONNECT" {
2100 url_ = &url.URL{Host: rp.authority}
2101 requestURI = rp.authority // mimic HTTP/1 server behavior
2102 } else {
2103 var err error
2104 url_, err = url.ParseRequestURI(rp.path)
2105 if err != nil {
2106 return nil, nil, streamError(st.id, ErrCodeProtocol)
2107 }
2108 requestURI = rp.path
2109 }
2110
2111 body := &requestBody{
2112 conn: sc,
2113 stream: st,
2114 needsContinue: needsContinue,
2115 }
2116 req := &http.Request{
2117 Method: rp.method,
2118 URL: url_,
2119 RemoteAddr: sc.remoteAddrStr,
2120 Header: rp.header,
2121 RequestURI: requestURI,
2122 Proto: "HTTP/2.0",
2123 ProtoMajor: 2,
2124 ProtoMinor: 0,
2125 TLS: tlsState,
2126 Host: rp.authority,
2127 Body: body,
2128 Trailer: trailer,
2129 }
2130 req = req.WithContext(st.ctx)
2131
2132 rws := responseWriterStatePool.Get().(*responseWriterState)
2133 bwSave := rws.bw
2134 *rws = responseWriterState{} // zero all the fields
2135 rws.conn = sc
2136 rws.bw = bwSave
2137 rws.bw.Reset(chunkWriter{rws})
2138 rws.stream = st
2139 rws.req = req
2140 rws.body = body
2141
2142 rw := &responseWriter{rws: rws}
2143 return rw, req, nil
2144}
2145
2146// Run on its own goroutine.
2147func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
2148 didPanic := true
2149 defer func() {
2150 rw.rws.stream.cancelCtx()
2151 if didPanic {
2152 e := recover()
2153 sc.writeFrameFromHandler(FrameWriteRequest{
2154 write: handlerPanicRST{rw.rws.stream.id},
2155 stream: rw.rws.stream,
2156 })
2157 // Same as net/http:
2158 if e != nil && e != http.ErrAbortHandler {
2159 const size = 64 << 10
2160 buf := make([]byte, size)
2161 buf = buf[:runtime.Stack(buf, false)]
2162 sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf)
2163 }
2164 return
2165 }
2166 rw.handlerDone()
2167 }()
2168 handler(rw, req)
2169 didPanic = false
2170}
2171
2172func handleHeaderListTooLong(w http.ResponseWriter, r *http.Request) {
2173 // 10.5.1 Limits on Header Block Size:
2174 // .. "A server that receives a larger header block than it is
2175 // willing to handle can send an HTTP 431 (Request Header Fields Too
2176 // Large) status code"
2177	const statusRequestHeaderFieldsTooLarge = 431 // http.StatusRequestHeaderFieldsTooLarge; only in Go 1.6+
2178 w.WriteHeader(statusRequestHeaderFieldsTooLarge)
2179 io.WriteString(w, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>")
2180}
2181
2182// called from handler goroutines.
2183// h may be nil.
2184func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) error {
2185 sc.serveG.checkNotOn() // NOT on
2186 var errc chan error
2187 if headerData.h != nil {
2188		// If there's a header map (which we don't own), we have to block
2189		// until this frame is written, so that an http.Flush mid-handler
2190		// writes out the correct value of keys before a handler later
2191		// potentially mutates it.
2192 errc = errChanPool.Get().(chan error)
2193 }
2194 if err := sc.writeFrameFromHandler(FrameWriteRequest{
2195 write: headerData,
2196 stream: st,
2197 done: errc,
2198 }); err != nil {
2199 return err
2200 }
2201 if errc != nil {
2202 select {
2203 case err := <-errc:
2204 errChanPool.Put(errc)
2205 return err
2206 case <-sc.doneServing:
2207 return errClientDisconnected
2208 case <-st.cw:
2209 return errStreamClosed
2210 }
2211 }
2212 return nil
2213}
2214
2215// called from handler goroutines.
2216func (sc *serverConn) write100ContinueHeaders(st *stream) {
2217 sc.writeFrameFromHandler(FrameWriteRequest{
2218 write: write100ContinueHeadersFrame{st.id},
2219 stream: st,
2220 })
2221}
2222
2223// A bodyReadMsg tells the server loop that the http.Handler read n
2224// bytes of the DATA from the client on the given stream.
2225type bodyReadMsg struct {
2226 st *stream
2227 n int
2228}
2229
2230// called from handler goroutines.
2231// Notes that the handler for the given stream ID read n bytes of its body
2232// and schedules flow control tokens to be sent.
2233func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
2234 sc.serveG.checkNotOn() // NOT on
2235 if n > 0 {
2236 select {
2237 case sc.bodyReadCh <- bodyReadMsg{st, n}:
2238 case <-sc.doneServing:
2239 }
2240 }
2241}
2242
2243func (sc *serverConn) noteBodyRead(st *stream, n int) {
2244 sc.serveG.check()
2245 sc.sendWindowUpdate(nil, n) // conn-level
2246 if st.state != stateHalfClosedRemote && st.state != stateClosed {
2247 // Don't send this WINDOW_UPDATE if the stream is closed
2248 // remotely.
2249 sc.sendWindowUpdate(st, n)
2250 }
2251}
2252
2253// st may be nil for conn-level
2254func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
2255 sc.serveG.check()
2256 // "The legal range for the increment to the flow control
2257 // window is 1 to 2^31-1 (2,147,483,647) octets."
2258	// A Go Read call on 64-bit machines could in theory read
2259	// more than this in a single call. Very unlikely, but we handle it here
2260 // rather than elsewhere for now.
2261 const maxUint31 = 1<<31 - 1
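	// Split anything larger into multiple updates, e.g. a 3 GiB read becomes
	// one maxUint31-sized WINDOW_UPDATE plus one for the remainder.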
2262 for n >= maxUint31 {
2263 sc.sendWindowUpdate32(st, maxUint31)
2264 n -= maxUint31
2265 }
2266 sc.sendWindowUpdate32(st, int32(n))
2267}
2268
2269// st may be nil for conn-level
2270func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
2271 sc.serveG.check()
2272 if n == 0 {
2273 return
2274 }
2275 if n < 0 {
2276 panic("negative update")
2277 }
2278 var streamID uint32
2279 if st != nil {
2280 streamID = st.id
2281 }
2282 sc.writeFrame(FrameWriteRequest{
2283 write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
2284 stream: st,
2285 })
2286 var ok bool
2287 if st == nil {
2288 ok = sc.inflow.add(n)
2289 } else {
2290 ok = st.inflow.add(n)
2291 }
2292 if !ok {
2293 panic("internal error; sent too many window updates without decrements?")
2294 }
2295}
2296
2297// requestBody is the Handler's Request.Body type.
2298// Read and Close may be called concurrently.
2299type requestBody struct {
2300 _ incomparable
2301 stream *stream
2302 conn *serverConn
2303 closed bool // for use by Close only
2304 sawEOF bool // for use by Read only
2305	pipe          *pipe // non-nil if we have an HTTP entity message body
2306 needsContinue bool // need to send a 100-continue
2307}
2308
2309func (b *requestBody) Close() error {
2310 if b.pipe != nil && !b.closed {
2311 b.pipe.BreakWithError(errClosedBody)
2312 }
2313 b.closed = true
2314 return nil
2315}
2316
2317func (b *requestBody) Read(p []byte) (n int, err error) {
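	// If the client sent "Expect: 100-continue", release the deferred
	// 100 Continue response the first time the handler reads the body.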
2318 if b.needsContinue {
2319 b.needsContinue = false
2320 b.conn.write100ContinueHeaders(b.stream)
2321 }
2322 if b.pipe == nil || b.sawEOF {
2323 return 0, io.EOF
2324 }
2325 n, err = b.pipe.Read(p)
2326 if err == io.EOF {
2327 b.sawEOF = true
2328 }
2329 if b.conn == nil && inTests {
2330 return
2331 }
2332 b.conn.noteBodyReadFromHandler(b.stream, n, err)
2333 return
2334}
2335
2336// responseWriter is the http.ResponseWriter implementation. It's
2337// intentionally small (1 pointer wide) to minimize garbage. The
2338// responseWriterState pointer inside is zeroed at the end of a
2339// request (in handlerDone) and calls on the responseWriter thereafter
2340// simply crash (caller's mistake), but the much larger responseWriterState
2341// and buffers are reused between multiple requests.
2342type responseWriter struct {
2343 rws *responseWriterState
2344}
2345
2346// Optional http.ResponseWriter interfaces implemented.
2347var (
2348 _ http.CloseNotifier = (*responseWriter)(nil)
2349 _ http.Flusher = (*responseWriter)(nil)
2350 _ stringWriter = (*responseWriter)(nil)
2351)
2352
2353type responseWriterState struct {
2354 // immutable within a request:
2355 stream *stream
2356 req *http.Request
2357 body *requestBody // to close at end of request, if DATA frames didn't
2358 conn *serverConn
2359
2360 // TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc
2361 bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState}
2362
2363 // mutated by http.Handler goroutine:
2364 handlerHeader http.Header // nil until called
2365 snapHeader http.Header // snapshot of handlerHeader at WriteHeader time
2366 trailers []string // set in writeChunk
2367 status int // status code passed to WriteHeader
2368 wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
2369 sentHeader bool // have we sent the header frame?
2370 handlerDone bool // handler has finished
2371 dirty bool // a Write failed; don't reuse this responseWriterState
2372
2373 sentContentLen int64 // non-zero if handler set a Content-Length header
2374 wroteBytes int64
2375
2376 closeNotifierMu sync.Mutex // guards closeNotifierCh
2377 closeNotifierCh chan bool // nil until first used
2378}
2379
2380type chunkWriter struct{ rws *responseWriterState }
2381
2382func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) }
2383
2384func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) > 0 }
2385
2386func (rws *responseWriterState) hasNonemptyTrailers() bool {
2387 for _, trailer := range rws.trailers {
2388 if _, ok := rws.handlerHeader[trailer]; ok {
2389 return true
2390 }
2391 }
2392 return false
2393}
2394
2395// declareTrailer is called for each Trailer header when the
2396// response header is written. It notes that a header will need to be
2397// written in the trailers at the end of the response.
2398func (rws *responseWriterState) declareTrailer(k string) {
2399 k = http.CanonicalHeaderKey(k)
2400 if !httpguts.ValidTrailerHeader(k) {
2401 // Forbidden by RFC 7230, section 4.1.2.
2402 rws.conn.logf("ignoring invalid trailer %q", k)
2403 return
2404 }
2405 if !strSliceContains(rws.trailers, k) {
2406 rws.trailers = append(rws.trailers, k)
2407 }
2408}
2409
2410// writeChunk writes chunks from the bufio.Writer. But because
2411// bufio.Writer may bypass its chunking, sometimes p may be
2412// arbitrarily large.
2413//
2414// writeChunk is also responsible (on the first chunk) for sending the
2415// HEADER response.
2416func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
2417 if !rws.wroteHeader {
2418 rws.writeHeader(200)
2419 }
2420
2421 isHeadResp := rws.req.Method == "HEAD"
2422 if !rws.sentHeader {
2423 rws.sentHeader = true
2424 var ctype, clen string
2425 if clen = rws.snapHeader.Get("Content-Length"); clen != "" {
2426 rws.snapHeader.Del("Content-Length")
2427 if cl, err := strconv.ParseUint(clen, 10, 63); err == nil {
2428 rws.sentContentLen = int64(cl)
2429 } else {
2430 clen = ""
2431 }
2432 }
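		// If the handler has already finished and didn't declare a
		// Content-Length, this first chunk is the entire body (the
		// bufio.Writer buffered it all), so declare its length ourselves
		// when a body is allowed.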
2433 if clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
2434 clen = strconv.Itoa(len(p))
2435 }
2436 _, hasContentType := rws.snapHeader["Content-Type"]
2437 // If the Content-Encoding is non-blank, we shouldn't
2438 // sniff the body. See Issue golang.org/issue/31753.
2439 ce := rws.snapHeader.Get("Content-Encoding")
2440 hasCE := len(ce) > 0
2441 if !hasCE && !hasContentType && bodyAllowedForStatus(rws.status) && len(p) > 0 {
2442 ctype = http.DetectContentType(p)
2443 }
2444 var date string
2445 if _, ok := rws.snapHeader["Date"]; !ok {
2446 // TODO(bradfitz): be faster here, like net/http? measure.
2447 date = time.Now().UTC().Format(http.TimeFormat)
2448 }
2449
2450 for _, v := range rws.snapHeader["Trailer"] {
2451 foreachHeaderElement(v, rws.declareTrailer)
2452 }
2453
2454 // "Connection" headers aren't allowed in HTTP/2 (RFC 7540, 8.1.2.2),
2455 // but respect "Connection" == "close" to mean sending a GOAWAY and tearing
2456 // down the TCP connection when idle, like we do for HTTP/1.
2457 // TODO: remove more Connection-specific header fields here, in addition
2458 // to "Connection".
2459 if _, ok := rws.snapHeader["Connection"]; ok {
2460 v := rws.snapHeader.Get("Connection")
2461 delete(rws.snapHeader, "Connection")
2462 if v == "close" {
2463 rws.conn.startGracefulShutdown()
2464 }
2465 }
2466
2467 endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp
2468 err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
2469 streamID: rws.stream.id,
2470 httpResCode: rws.status,
2471 h: rws.snapHeader,
2472 endStream: endStream,
2473 contentType: ctype,
2474 contentLength: clen,
2475 date: date,
2476 })
2477 if err != nil {
2478 rws.dirty = true
2479 return 0, err
2480 }
2481 if endStream {
2482 return 0, nil
2483 }
2484 }
2485 if isHeadResp {
2486 return len(p), nil
2487 }
2488 if len(p) == 0 && !rws.handlerDone {
2489 return 0, nil
2490 }
2491
2492 if rws.handlerDone {
2493 rws.promoteUndeclaredTrailers()
2494 }
2495
2496 // only send trailers if they have actually been defined by the
2497 // server handler.
2498 hasNonemptyTrailers := rws.hasNonemptyTrailers()
2499 endStream := rws.handlerDone && !hasNonemptyTrailers
2500 if len(p) > 0 || endStream {
2501 // only send a 0 byte DATA frame if we're ending the stream.
2502 if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
2503 rws.dirty = true
2504 return 0, err
2505 }
2506 }
2507
2508 if rws.handlerDone && hasNonemptyTrailers {
2509 err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
2510 streamID: rws.stream.id,
2511 h: rws.handlerHeader,
2512 trailers: rws.trailers,
2513 endStream: true,
2514 })
2515 if err != nil {
2516 rws.dirty = true
2517 }
2518 return len(p), err
2519 }
2520 return len(p), nil
2521}
2522
2523// TrailerPrefix is a magic prefix for ResponseWriter.Header map keys
2524// that, if present, signals that the map entry is actually for
2525// the response trailers, and not the response headers. The prefix
2526// is stripped after the ServeHTTP call finishes and the values are
2527// sent in the trailers.
2528//
2529// This mechanism is intended only for trailers that are not known
2530// prior to the headers being written. If the set of trailers is fixed
2531// or known before the header is written, the normal Go trailers mechanism
2532// is preferred:
2533//
2534// https://golang.org/pkg/net/http/#ResponseWriter
2535// https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
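//
// As an illustrative sketch (the "Grpc-Status" field name is just an example),
// a handler that only learns a trailer value mid-stream could write:
//
//	w.Header().Set("Trailer:Grpc-Status", "0")
//
// and "Grpc-Status: 0" is then sent in the trailers after ServeHTTP returns.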
2536const TrailerPrefix = "Trailer:"
2537
2538// promoteUndeclaredTrailers permits http.Handlers to set trailers
2539// after the header has already been flushed. Because the Go
2540// ResponseWriter interface has no way to set Trailers (only the
2541// Header), and because we didn't want to expand the ResponseWriter
2542// interface, and because nobody used trailers, and because RFC 7230
2543// says you SHOULD (but not must) predeclare any trailers in the
2544// header, the official ResponseWriter rules said trailers in Go must
2545// be predeclared, and then we reuse the same ResponseWriter.Header()
2546// map to mean both Headers and Trailers. When it's time to write the
2547// Trailers, we pick out the fields of Headers that were declared as
2548// trailers. That worked for a while, until we found the first major
2549// user of Trailers in the wild: gRPC (using them only over http2),
2550// and gRPC libraries permit setting trailers mid-stream without
2551// predeclaring them. So: change of plans. We still permit the old
2552// way, but we also permit this hack: if a Header() key begins with
2553// "Trailer:", the suffix of that key is a Trailer. Because ':' is an
2554// invalid token byte anyway, there is no ambiguity. (And it's already
2555// filtered out.) It's mildly hacky, but not terrible.
2556//
2557// This method runs after the Handler is done and promotes any Header
2558// fields to be trailers.
2559func (rws *responseWriterState) promoteUndeclaredTrailers() {
2560 for k, vv := range rws.handlerHeader {
2561 if !strings.HasPrefix(k, TrailerPrefix) {
2562 continue
2563 }
2564 trailerKey := strings.TrimPrefix(k, TrailerPrefix)
2565 rws.declareTrailer(trailerKey)
2566 rws.handlerHeader[http.CanonicalHeaderKey(trailerKey)] = vv
2567 }
2568
2569 if len(rws.trailers) > 1 {
2570 sorter := sorterPool.Get().(*sorter)
2571 sorter.SortStrings(rws.trailers)
2572 sorterPool.Put(sorter)
2573 }
2574}
2575
2576func (w *responseWriter) Flush() {
2577 rws := w.rws
2578 if rws == nil {
2579		panic("Flush called after Handler finished")
2580 }
2581 if rws.bw.Buffered() > 0 {
2582 if err := rws.bw.Flush(); err != nil {
2583 // Ignore the error. The frame writer already knows.
2584 return
2585 }
2586 } else {
2587 // The bufio.Writer won't call chunkWriter.Write
2588		// (writeChunk) with zero bytes, so we have to do it
2589 // ourselves to force the HTTP response header and/or
2590 // final DATA frame (with END_STREAM) to be sent.
2591 rws.writeChunk(nil)
2592 }
2593}
2594
2595func (w *responseWriter) CloseNotify() <-chan bool {
2596 rws := w.rws
2597 if rws == nil {
2598 panic("CloseNotify called after Handler finished")
2599 }
2600 rws.closeNotifierMu.Lock()
2601 ch := rws.closeNotifierCh
2602 if ch == nil {
2603 ch = make(chan bool, 1)
2604 rws.closeNotifierCh = ch
2605 cw := rws.stream.cw
2606 go func() {
2607 cw.Wait() // wait for close
2608 ch <- true
2609 }()
2610 }
2611 rws.closeNotifierMu.Unlock()
2612 return ch
2613}
2614
2615func (w *responseWriter) Header() http.Header {
2616 rws := w.rws
2617 if rws == nil {
2618 panic("Header called after Handler finished")
2619 }
2620 if rws.handlerHeader == nil {
2621 rws.handlerHeader = make(http.Header)
2622 }
2623 return rws.handlerHeader
2624}
2625
2626// checkWriteHeaderCode is a copy of net/http's checkWriteHeaderCode.
2627func checkWriteHeaderCode(code int) {
2628 // Issue 22880: require valid WriteHeader status codes.
2629 // For now we only enforce that it's three digits.
2630 // In the future we might block things over 599 (600 and above aren't defined
2631 // at http://httpwg.org/specs/rfc7231.html#status.codes)
2632 // and we might block under 200 (once we have more mature 1xx support).
2633 // But for now any three digits.
2634 //
2635 // We used to send "HTTP/1.1 000 0" on the wire in responses but there's
2636 // no equivalent bogus thing we can realistically send in HTTP/2,
2637 // so we'll consistently panic instead and help people find their bugs
2638 // early. (We can't return an error from WriteHeader even if we wanted to.)
2639 if code < 100 || code > 999 {
2640 panic(fmt.Sprintf("invalid WriteHeader code %v", code))
2641 }
2642}
2643
2644func (w *responseWriter) WriteHeader(code int) {
2645 rws := w.rws
2646 if rws == nil {
2647 panic("WriteHeader called after Handler finished")
2648 }
2649 rws.writeHeader(code)
2650}
2651
2652func (rws *responseWriterState) writeHeader(code int) {
2653 if !rws.wroteHeader {
2654 checkWriteHeaderCode(code)
2655 rws.wroteHeader = true
2656 rws.status = code
2657 if len(rws.handlerHeader) > 0 {
2658 rws.snapHeader = cloneHeader(rws.handlerHeader)
2659 }
2660 }
2661}
2662
2663func cloneHeader(h http.Header) http.Header {
2664 h2 := make(http.Header, len(h))
2665 for k, vv := range h {
2666 vv2 := make([]string, len(vv))
2667 copy(vv2, vv)
2668 h2[k] = vv2
2669 }
2670 return h2
2671}
2672
2673// The Life Of A Write is like this:
2674//
2675// * Handler calls w.Write or w.WriteString ->
2676// * -> rws.bw (*bufio.Writer) ->
2677// * (Handler might call Flush)
2678// * -> chunkWriter{rws}
2679// * -> responseWriterState.writeChunk(p []byte)
2680// * -> responseWriterState.writeChunk (most of the magic; see comment there)
2681func (w *responseWriter) Write(p []byte) (n int, err error) {
2682 return w.write(len(p), p, "")
2683}
2684
2685func (w *responseWriter) WriteString(s string) (n int, err error) {
2686 return w.write(len(s), nil, s)
2687}
2688
2689// Exactly one of dataB or dataS is used; the other is its zero value.
2690func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) {
2691 rws := w.rws
2692 if rws == nil {
2693 panic("Write called after Handler finished")
2694 }
2695 if !rws.wroteHeader {
2696 w.WriteHeader(200)
2697 }
2698 if !bodyAllowedForStatus(rws.status) {
2699 return 0, http.ErrBodyNotAllowed
2700 }
2701 rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) // only one can be set
2702 if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen {
2703 // TODO: send a RST_STREAM
2704 return 0, errors.New("http2: handler wrote more than declared Content-Length")
2705 }
2706
2707 if dataB != nil {
2708 return rws.bw.Write(dataB)
2709 } else {
2710 return rws.bw.WriteString(dataS)
2711 }
2712}
2713
2714func (w *responseWriter) handlerDone() {
2715 rws := w.rws
2716 dirty := rws.dirty
2717 rws.handlerDone = true
2718 w.Flush()
2719 w.rws = nil
2720 if !dirty {
2721 // Only recycle the pool if all prior Write calls to
2722 // the serverConn goroutine completed successfully. If
2723 // they returned earlier due to resets from the peer
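		// For example, a DATA frame with Length 100 that carries 90 bytes of
		// data refunds the remaining 10 bytes (pad-length byte plus padding)
		// at both the connection and stream level right away.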
2724 // there might still be write goroutines outstanding
2725 // from the serverConn referencing the rws memory. See
2726 // issue 20704.
2727 responseWriterStatePool.Put(rws)
2728 }
2729}
2730
2731// Push errors.
2732var (
2733 ErrRecursivePush = errors.New("http2: recursive push not allowed")
2734 ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
2735)
2736
2737var _ http.Pusher = (*responseWriter)(nil)
2738
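// Push implements http.Pusher. As an illustrative sketch (the pushed path and
// the surrounding handler are hypothetical), a handler might push a resource
// before writing its own response:
//
//	func handler(w http.ResponseWriter, r *http.Request) {
//		if p, ok := w.(http.Pusher); ok {
//			_ = p.Push("/static/app.css", nil) // best effort; ignore errors
//		}
//		io.WriteString(w, "<html>...</html>")
//	}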
2739func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
2740 st := w.rws.stream
2741 sc := st.sc
2742 sc.serveG.checkNotOn()
2743
2744 // No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
2745 // http://tools.ietf.org/html/rfc7540#section-6.6
2746 if st.isPushed() {
2747 return ErrRecursivePush
2748 }
2749
2750 if opts == nil {
2751 opts = new(http.PushOptions)
2752 }
2753
2754 // Default options.
2755 if opts.Method == "" {
2756 opts.Method = "GET"
2757 }
2758 if opts.Header == nil {
2759 opts.Header = http.Header{}
2760 }
2761 wantScheme := "http"
2762 if w.rws.req.TLS != nil {
2763 wantScheme = "https"
2764 }
2765
2766 // Validate the request.
2767 u, err := url.Parse(target)
2768 if err != nil {
2769 return err
2770 }
2771 if u.Scheme == "" {
2772 if !strings.HasPrefix(target, "/") {
2773 return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target)
2774 }
2775 u.Scheme = wantScheme
2776 u.Host = w.rws.req.Host
2777 } else {
2778 if u.Scheme != wantScheme {
2779 return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme)
2780 }
2781 if u.Host == "" {
2782 return errors.New("URL must have a host")
2783 }
2784 }
2785 for k := range opts.Header {
2786 if strings.HasPrefix(k, ":") {
2787 return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
2788 }
2789 // These headers are meaningful only if the request has a body,
2790 // but PUSH_PROMISE requests cannot have a body.
2791 // http://tools.ietf.org/html/rfc7540#section-8.2
2792 // Also disallow Host, since the promised URL must be absolute.
2793 switch strings.ToLower(k) {
2794 case "content-length", "content-encoding", "trailer", "te", "expect", "host":
2795 return fmt.Errorf("promised request headers cannot include %q", k)
2796 }
2797 }
2798 if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil {
2799 return err
2800 }
2801
2802 // The RFC effectively limits promised requests to GET and HEAD:
2803 // "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
2804 // http://tools.ietf.org/html/rfc7540#section-8.2
2805 if opts.Method != "GET" && opts.Method != "HEAD" {
2806 return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
2807 }
2808
2809 msg := &startPushRequest{
2810 parent: st,
2811 method: opts.Method,
2812 url: u,
2813 header: cloneHeader(opts.Header),
2814 done: errChanPool.Get().(chan error),
2815 }
2816
2817 select {
2818 case <-sc.doneServing:
2819 return errClientDisconnected
2820 case <-st.cw:
2821 return errStreamClosed
2822 case sc.serveMsgCh <- msg:
2823 }
2824
2825 select {
2826 case <-sc.doneServing:
2827 return errClientDisconnected
2828 case <-st.cw:
2829 return errStreamClosed
2830 case err := <-msg.done:
2831 errChanPool.Put(msg.done)
2832 return err
2833 }
2834}
2835
2836type startPushRequest struct {
2837 parent *stream
2838 method string
2839 url *url.URL
2840 header http.Header
2841 done chan error
2842}
2843
2844func (sc *serverConn) startPush(msg *startPushRequest) {
2845 sc.serveG.check()
2846
2847 // http://tools.ietf.org/html/rfc7540#section-6.6.
2848 // PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
2849 // is in either the "open" or "half-closed (remote)" state.
2850 if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote {
2851 // responseWriter.Push checks that the stream is peer-initiated.
2852 msg.done <- errStreamClosed
2853 return
2854 }
2855
2856 // http://tools.ietf.org/html/rfc7540#section-6.6.
2857 if !sc.pushEnabled {
2858 msg.done <- http.ErrNotSupported
2859 return
2860 }
2861
2862 // PUSH_PROMISE frames must be sent in increasing order by stream ID, so
2863 // we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
2864 // is written. Once the ID is allocated, we start the request handler.
2865 allocatePromisedID := func() (uint32, error) {
2866 sc.serveG.check()
2867
2868 // Check this again, just in case. Technically, we might have received
2869 // an updated SETTINGS by the time we got around to writing this frame.
2870 if !sc.pushEnabled {
2871 return 0, http.ErrNotSupported
2872 }
2873 // http://tools.ietf.org/html/rfc7540#section-6.5.2.
2874 if sc.curPushedStreams+1 > sc.clientMaxStreams {
2875 return 0, ErrPushLimitReached
2876 }
2877
2878 // http://tools.ietf.org/html/rfc7540#section-5.1.1.
2879 // Streams initiated by the server MUST use even-numbered identifiers.
2880 // A server that is unable to establish a new stream identifier can send a GOAWAY
2881 // frame so that the client is forced to open a new connection for new streams.
2882 if sc.maxPushPromiseID+2 >= 1<<31 {
2883 sc.startGracefulShutdownInternal()
2884 return 0, ErrPushLimitReached
2885 }
2886 sc.maxPushPromiseID += 2
2887 promisedID := sc.maxPushPromiseID
2888
2889 // http://tools.ietf.org/html/rfc7540#section-8.2.
2890 // Strictly speaking, the new stream should start in "reserved (local)", then
2891 // transition to "half closed (remote)" after sending the initial HEADERS, but
2892 // we start in "half closed (remote)" for simplicity.
2893 // See further comments at the definition of stateHalfClosedRemote.
2894 promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote)
2895 rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{
2896 method: msg.method,
2897 scheme: msg.url.Scheme,
2898 authority: msg.url.Host,
2899 path: msg.url.RequestURI(),
2900 header: cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE
2901 })
2902 if err != nil {
2903 // Should not happen, since we've already validated msg.url.
2904 panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
2905 }
2906
2907 go sc.runHandler(rw, req, sc.handler.ServeHTTP)
2908 return promisedID, nil
2909 }
2910
2911 sc.writeFrame(FrameWriteRequest{
2912 write: &writePushPromise{
2913 streamID: msg.parent.id,
2914 method: msg.method,
2915 url: msg.url,
2916 h: msg.header,
2917 allocatePromisedID: allocatePromisedID,
2918 },
2919 stream: msg.parent,
2920 done: msg.done,
2921 })
2922}
2923
2924// foreachHeaderElement splits v according to the "#rule" construction
2925// in RFC 7230 section 7 and calls fn for each non-empty element.
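// For example, foreachHeaderElement("a, b ,,c", fn) calls fn("a"), fn("b"),
// and fn("c").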
2926func foreachHeaderElement(v string, fn func(string)) {
2927 v = textproto.TrimString(v)
2928 if v == "" {
2929 return
2930 }
2931 if !strings.Contains(v, ",") {
2932 fn(v)
2933 return
2934 }
2935 for _, f := range strings.Split(v, ",") {
2936 if f = textproto.TrimString(f); f != "" {
2937 fn(f)
2938 }
2939 }
2940}
2941
2942// From http://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2.2
2943var connHeaders = []string{
2944 "Connection",
2945 "Keep-Alive",
2946 "Proxy-Connection",
2947 "Transfer-Encoding",
2948 "Upgrade",
2949}
2950
2951// checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request,
2952// per RFC 7540 Section 8.1.2.2.
2953// The returned error is reported to users.
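// For example, a request carrying "TE: gzip" or any connection-specific
// header such as "Transfer-Encoding" is rejected, while "TE: trailers" is
// allowed.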
2954func checkValidHTTP2RequestHeaders(h http.Header) error {
2955 for _, k := range connHeaders {
2956 if _, ok := h[k]; ok {
2957 return fmt.Errorf("request header %q is not valid in HTTP/2", k)
2958 }
2959 }
2960 te := h["Te"]
2961 if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) {
2962 return errors.New(`request header "TE" may only be "trailers" in HTTP/2`)
2963 }
2964 return nil
2965}
2966
2967func new400Handler(err error) http.HandlerFunc {
2968 return func(w http.ResponseWriter, r *http.Request) {
2969 http.Error(w, err.Error(), http.StatusBadRequest)
2970 }
2971}
2972
2973// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives
2974// disabled. See comments on h1ServerShutdownChan above for why
2975// the code is written this way.
2976func h1ServerKeepAlivesDisabled(hs *http.Server) bool {
2977 var x interface{} = hs
2978 type I interface {
2979 doKeepAlives() bool
2980 }
2981 if hs, ok := x.(I); ok {
2982 return !hs.doKeepAlives()
2983 }
2984 return false
2985}