blob: 345b7cd85dbf6b4d167b4e4073a6684e939e152a [file] [log] [blame]
Matteo Scandoloa4285862020-12-01 18:10:10 -08001// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// TODO: turn off the serve goroutine when idle, so
6// an idle conn only has the readFrames goroutine active. (which could
7// also be optimized probably to pin less memory in crypto/tls). This
8// would involve tracking when the serve goroutine is active (atomic
9// int32 read/CAS probably?) and starting it up when frames arrive,
10// and shutting it down when all handlers exit. the occasional PING
11// packets could use time.AfterFunc to call sc.wakeStartServeLoop()
12// (which is a no-op if already running) and then queue the PING write
13// as normal. The serve loop would then exit in most cases (if no
14// Handlers running) and not be woken up again until the PING packet
15// returns.
16
17// TODO (maybe): add a mechanism for Handlers to going into
18// half-closed-local mode (rw.(io.Closer) test?) but not exit their
19// handler, and continue to be able to read from the
20// Request.Body. This would be a somewhat semantic change from HTTP/1
21// (or at least what we expose in net/http), so I'd probably want to
22// add it there too. For now, this package says that returning from
23// the Handler ServeHTTP function means you're both done reading and
24// done writing, without a way to stop just one or the other.
25
26package http2
27
28import (
29 "bufio"
30 "bytes"
31 "context"
32 "crypto/tls"
33 "errors"
34 "fmt"
35 "io"
36 "log"
37 "math"
38 "net"
39 "net/http"
40 "net/textproto"
41 "net/url"
42 "os"
43 "reflect"
44 "runtime"
45 "strconv"
46 "strings"
47 "sync"
48 "time"
49
50 "golang.org/x/net/http/httpguts"
51 "golang.org/x/net/http2/hpack"
52)
53
54const (
55 prefaceTimeout = 10 * time.Second
56 firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
57 handlerChunkWriteSize = 4 << 10
58 defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
59 maxQueuedControlFrames = 10000
60)
61
62var (
63 errClientDisconnected = errors.New("client disconnected")
64 errClosedBody = errors.New("body closed by handler")
65 errHandlerComplete = errors.New("http2: request body closed due to handler exiting")
66 errStreamClosed = errors.New("http2: stream closed")
67)
68
69var responseWriterStatePool = sync.Pool{
70 New: func() interface{} {
71 rws := &responseWriterState{}
72 rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
73 return rws
74 },
75}
76
77// Test hooks.
78var (
79 testHookOnConn func()
80 testHookGetServerConn func(*serverConn)
81 testHookOnPanicMu *sync.Mutex // nil except in tests
82 testHookOnPanic func(sc *serverConn, panicVal interface{}) (rePanic bool)
83)
84
85// Server is an HTTP/2 server.
86type Server struct {
87 // MaxHandlers limits the number of http.Handler ServeHTTP goroutines
88 // which may run at a time over all connections.
89 // Negative or zero no limit.
90 // TODO: implement
91 MaxHandlers int
92
93 // MaxConcurrentStreams optionally specifies the number of
94 // concurrent streams that each client may have open at a
95 // time. This is unrelated to the number of http.Handler goroutines
96 // which may be active globally, which is MaxHandlers.
97 // If zero, MaxConcurrentStreams defaults to at least 100, per
98 // the HTTP/2 spec's recommendations.
99 MaxConcurrentStreams uint32
100
101 // MaxReadFrameSize optionally specifies the largest frame
102 // this server is willing to read. A valid value is between
103 // 16k and 16M, inclusive. If zero or otherwise invalid, a
104 // default value is used.
105 MaxReadFrameSize uint32
106
107 // PermitProhibitedCipherSuites, if true, permits the use of
108 // cipher suites prohibited by the HTTP/2 spec.
109 PermitProhibitedCipherSuites bool
110
111 // IdleTimeout specifies how long until idle clients should be
112 // closed with a GOAWAY frame. PING frames are not considered
113 // activity for the purposes of IdleTimeout.
114 IdleTimeout time.Duration
115
116 // MaxUploadBufferPerConnection is the size of the initial flow
117 // control window for each connections. The HTTP/2 spec does not
118 // allow this to be smaller than 65535 or larger than 2^32-1.
119 // If the value is outside this range, a default value will be
120 // used instead.
121 MaxUploadBufferPerConnection int32
122
123 // MaxUploadBufferPerStream is the size of the initial flow control
124 // window for each stream. The HTTP/2 spec does not allow this to
125 // be larger than 2^32-1. If the value is zero or larger than the
126 // maximum, a default value will be used instead.
127 MaxUploadBufferPerStream int32
128
129 // NewWriteScheduler constructs a write scheduler for a connection.
130 // If nil, a default scheduler is chosen.
131 NewWriteScheduler func() WriteScheduler
132
133 // Internal state. This is a pointer (rather than embedded directly)
134 // so that we don't embed a Mutex in this struct, which will make the
135 // struct non-copyable, which might break some callers.
136 state *serverInternalState
137}
138
139func (s *Server) initialConnRecvWindowSize() int32 {
140 if s.MaxUploadBufferPerConnection > initialWindowSize {
141 return s.MaxUploadBufferPerConnection
142 }
143 return 1 << 20
144}
145
146func (s *Server) initialStreamRecvWindowSize() int32 {
147 if s.MaxUploadBufferPerStream > 0 {
148 return s.MaxUploadBufferPerStream
149 }
150 return 1 << 20
151}
152
153func (s *Server) maxReadFrameSize() uint32 {
154 if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize {
155 return v
156 }
157 return defaultMaxReadFrameSize
158}
159
160func (s *Server) maxConcurrentStreams() uint32 {
161 if v := s.MaxConcurrentStreams; v > 0 {
162 return v
163 }
164 return defaultMaxStreams
165}
166
167// maxQueuedControlFrames is the maximum number of control frames like
168// SETTINGS, PING and RST_STREAM that will be queued for writing before
169// the connection is closed to prevent memory exhaustion attacks.
170func (s *Server) maxQueuedControlFrames() int {
171 // TODO: if anybody asks, add a Server field, and remember to define the
172 // behavior of negative values.
173 return maxQueuedControlFrames
174}
175
176type serverInternalState struct {
177 mu sync.Mutex
178 activeConns map[*serverConn]struct{}
179}
180
181func (s *serverInternalState) registerConn(sc *serverConn) {
182 if s == nil {
183 return // if the Server was used without calling ConfigureServer
184 }
185 s.mu.Lock()
186 s.activeConns[sc] = struct{}{}
187 s.mu.Unlock()
188}
189
190func (s *serverInternalState) unregisterConn(sc *serverConn) {
191 if s == nil {
192 return // if the Server was used without calling ConfigureServer
193 }
194 s.mu.Lock()
195 delete(s.activeConns, sc)
196 s.mu.Unlock()
197}
198
199func (s *serverInternalState) startGracefulShutdown() {
200 if s == nil {
201 return // if the Server was used without calling ConfigureServer
202 }
203 s.mu.Lock()
204 for sc := range s.activeConns {
205 sc.startGracefulShutdown()
206 }
207 s.mu.Unlock()
208}
209
210// ConfigureServer adds HTTP/2 support to a net/http Server.
211//
212// The configuration conf may be nil.
213//
214// ConfigureServer must be called before s begins serving.
215func ConfigureServer(s *http.Server, conf *Server) error {
216 if s == nil {
217 panic("nil *http.Server")
218 }
219 if conf == nil {
220 conf = new(Server)
221 }
222 conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})}
223 if h1, h2 := s, conf; h2.IdleTimeout == 0 {
224 if h1.IdleTimeout != 0 {
225 h2.IdleTimeout = h1.IdleTimeout
226 } else {
227 h2.IdleTimeout = h1.ReadTimeout
228 }
229 }
230 s.RegisterOnShutdown(conf.state.startGracefulShutdown)
231
232 if s.TLSConfig == nil {
233 s.TLSConfig = new(tls.Config)
234 } else if s.TLSConfig.CipherSuites != nil {
235 // If they already provided a CipherSuite list, return
236 // an error if it has a bad order or is missing
237 // ECDHE_RSA_WITH_AES_128_GCM_SHA256 or ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
238 haveRequired := false
239 sawBad := false
240 for i, cs := range s.TLSConfig.CipherSuites {
241 switch cs {
242 case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
243 // Alternative MTI cipher to not discourage ECDSA-only servers.
244 // See http://golang.org/cl/30721 for further information.
245 tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
246 haveRequired = true
247 }
248 if isBadCipher(cs) {
249 sawBad = true
250 } else if sawBad {
251 return fmt.Errorf("http2: TLSConfig.CipherSuites index %d contains an HTTP/2-approved cipher suite (%#04x), but it comes after unapproved cipher suites. With this configuration, clients that don't support previous, approved cipher suites may be given an unapproved one and reject the connection.", i, cs)
252 }
253 }
254 if !haveRequired {
255 return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher (need at least one of TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 or TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256).")
256 }
257 }
258
259 // Note: not setting MinVersion to tls.VersionTLS12,
260 // as we don't want to interfere with HTTP/1.1 traffic
261 // on the user's server. We enforce TLS 1.2 later once
262 // we accept a connection. Ideally this should be done
263 // during next-proto selection, but using TLS <1.2 with
264 // HTTP/2 is still the client's bug.
265
266 s.TLSConfig.PreferServerCipherSuites = true
267
268 haveNPN := false
269 for _, p := range s.TLSConfig.NextProtos {
270 if p == NextProtoTLS {
271 haveNPN = true
272 break
273 }
274 }
275 if !haveNPN {
276 s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS)
277 }
278
279 if s.TLSNextProto == nil {
280 s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
281 }
282 protoHandler := func(hs *http.Server, c *tls.Conn, h http.Handler) {
283 if testHookOnConn != nil {
284 testHookOnConn()
285 }
286 // The TLSNextProto interface predates contexts, so
287 // the net/http package passes down its per-connection
288 // base context via an exported but unadvertised
289 // method on the Handler. This is for internal
290 // net/http<=>http2 use only.
291 var ctx context.Context
292 type baseContexter interface {
293 BaseContext() context.Context
294 }
295 if bc, ok := h.(baseContexter); ok {
296 ctx = bc.BaseContext()
297 }
298 conf.ServeConn(c, &ServeConnOpts{
299 Context: ctx,
300 Handler: h,
301 BaseConfig: hs,
302 })
303 }
304 s.TLSNextProto[NextProtoTLS] = protoHandler
305 return nil
306}
307
308// ServeConnOpts are options for the Server.ServeConn method.
309type ServeConnOpts struct {
310 // Context is the base context to use.
311 // If nil, context.Background is used.
312 Context context.Context
313
314 // BaseConfig optionally sets the base configuration
315 // for values. If nil, defaults are used.
316 BaseConfig *http.Server
317
318 // Handler specifies which handler to use for processing
319 // requests. If nil, BaseConfig.Handler is used. If BaseConfig
320 // or BaseConfig.Handler is nil, http.DefaultServeMux is used.
321 Handler http.Handler
322}
323
324func (o *ServeConnOpts) context() context.Context {
325 if o != nil && o.Context != nil {
326 return o.Context
327 }
328 return context.Background()
329}
330
331func (o *ServeConnOpts) baseConfig() *http.Server {
332 if o != nil && o.BaseConfig != nil {
333 return o.BaseConfig
334 }
335 return new(http.Server)
336}
337
338func (o *ServeConnOpts) handler() http.Handler {
339 if o != nil {
340 if o.Handler != nil {
341 return o.Handler
342 }
343 if o.BaseConfig != nil && o.BaseConfig.Handler != nil {
344 return o.BaseConfig.Handler
345 }
346 }
347 return http.DefaultServeMux
348}
349
350// ServeConn serves HTTP/2 requests on the provided connection and
351// blocks until the connection is no longer readable.
352//
353// ServeConn starts speaking HTTP/2 assuming that c has not had any
354// reads or writes. It writes its initial settings frame and expects
355// to be able to read the preface and settings frame from the
356// client. If c has a ConnectionState method like a *tls.Conn, the
357// ConnectionState is used to verify the TLS ciphersuite and to set
358// the Request.TLS field in Handlers.
359//
360// ServeConn does not support h2c by itself. Any h2c support must be
361// implemented in terms of providing a suitably-behaving net.Conn.
362//
363// The opts parameter is optional. If nil, default values are used.
364func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
365 baseCtx, cancel := serverConnBaseContext(c, opts)
366 defer cancel()
367
368 sc := &serverConn{
369 srv: s,
370 hs: opts.baseConfig(),
371 conn: c,
372 baseCtx: baseCtx,
373 remoteAddrStr: c.RemoteAddr().String(),
374 bw: newBufferedWriter(c),
375 handler: opts.handler(),
376 streams: make(map[uint32]*stream),
377 readFrameCh: make(chan readFrameResult),
378 wantWriteFrameCh: make(chan FrameWriteRequest, 8),
379 serveMsgCh: make(chan interface{}, 8),
380 wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
381 bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
382 doneServing: make(chan struct{}),
383 clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
384 advMaxStreams: s.maxConcurrentStreams(),
385 initialStreamSendWindowSize: initialWindowSize,
386 maxFrameSize: initialMaxFrameSize,
387 headerTableSize: initialHeaderTableSize,
388 serveG: newGoroutineLock(),
389 pushEnabled: true,
390 }
391
392 s.state.registerConn(sc)
393 defer s.state.unregisterConn(sc)
394
395 // The net/http package sets the write deadline from the
396 // http.Server.WriteTimeout during the TLS handshake, but then
397 // passes the connection off to us with the deadline already set.
398 // Write deadlines are set per stream in serverConn.newStream.
399 // Disarm the net.Conn write deadline here.
400 if sc.hs.WriteTimeout != 0 {
401 sc.conn.SetWriteDeadline(time.Time{})
402 }
403
404 if s.NewWriteScheduler != nil {
405 sc.writeSched = s.NewWriteScheduler()
406 } else {
407 sc.writeSched = NewRandomWriteScheduler()
408 }
409
410 // These start at the RFC-specified defaults. If there is a higher
411 // configured value for inflow, that will be updated when we send a
412 // WINDOW_UPDATE shortly after sending SETTINGS.
413 sc.flow.add(initialWindowSize)
414 sc.inflow.add(initialWindowSize)
415 sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
416
417 fr := NewFramer(sc.bw, c)
418 fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
419 fr.MaxHeaderListSize = sc.maxHeaderListSize()
420 fr.SetMaxReadFrameSize(s.maxReadFrameSize())
421 sc.framer = fr
422
423 if tc, ok := c.(connectionStater); ok {
424 sc.tlsState = new(tls.ConnectionState)
425 *sc.tlsState = tc.ConnectionState()
426 // 9.2 Use of TLS Features
427 // An implementation of HTTP/2 over TLS MUST use TLS
428 // 1.2 or higher with the restrictions on feature set
429 // and cipher suite described in this section. Due to
430 // implementation limitations, it might not be
431 // possible to fail TLS negotiation. An endpoint MUST
432 // immediately terminate an HTTP/2 connection that
433 // does not meet the TLS requirements described in
434 // this section with a connection error (Section
435 // 5.4.1) of type INADEQUATE_SECURITY.
436 if sc.tlsState.Version < tls.VersionTLS12 {
437 sc.rejectConn(ErrCodeInadequateSecurity, "TLS version too low")
438 return
439 }
440
441 if sc.tlsState.ServerName == "" {
442 // Client must use SNI, but we don't enforce that anymore,
443 // since it was causing problems when connecting to bare IP
444 // addresses during development.
445 //
446 // TODO: optionally enforce? Or enforce at the time we receive
447 // a new request, and verify the ServerName matches the :authority?
448 // But that precludes proxy situations, perhaps.
449 //
450 // So for now, do nothing here again.
451 }
452
453 if !s.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) {
454 // "Endpoints MAY choose to generate a connection error
455 // (Section 5.4.1) of type INADEQUATE_SECURITY if one of
456 // the prohibited cipher suites are negotiated."
457 //
458 // We choose that. In my opinion, the spec is weak
459 // here. It also says both parties must support at least
460 // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no
461 // excuses here. If we really must, we could allow an
462 // "AllowInsecureWeakCiphers" option on the server later.
463 // Let's see how it plays out first.
464 sc.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite))
465 return
466 }
467 }
468
469 if hook := testHookGetServerConn; hook != nil {
470 hook(sc)
471 }
472 sc.serve()
473}
474
475func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx context.Context, cancel func()) {
476 ctx, cancel = context.WithCancel(opts.context())
477 ctx = context.WithValue(ctx, http.LocalAddrContextKey, c.LocalAddr())
478 if hs := opts.baseConfig(); hs != nil {
479 ctx = context.WithValue(ctx, http.ServerContextKey, hs)
480 }
481 return
482}
483
484func (sc *serverConn) rejectConn(err ErrCode, debug string) {
485 sc.vlogf("http2: server rejecting conn: %v, %s", err, debug)
486 // ignoring errors. hanging up anyway.
487 sc.framer.WriteGoAway(0, err, []byte(debug))
488 sc.bw.Flush()
489 sc.conn.Close()
490}
491
492type serverConn struct {
493 // Immutable:
494 srv *Server
495 hs *http.Server
496 conn net.Conn
497 bw *bufferedWriter // writing to conn
498 handler http.Handler
499 baseCtx context.Context
500 framer *Framer
501 doneServing chan struct{} // closed when serverConn.serve ends
502 readFrameCh chan readFrameResult // written by serverConn.readFrames
503 wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
504 wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
505 bodyReadCh chan bodyReadMsg // from handlers -> serve
506 serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop
507 flow flow // conn-wide (not stream-specific) outbound flow control
508 inflow flow // conn-wide inbound flow control
509 tlsState *tls.ConnectionState // shared by all handlers, like net/http
510 remoteAddrStr string
511 writeSched WriteScheduler
512
513 // Everything following is owned by the serve loop; use serveG.check():
514 serveG goroutineLock // used to verify funcs are on serve()
515 pushEnabled bool
516 sawFirstSettings bool // got the initial SETTINGS frame after the preface
517 needToSendSettingsAck bool
518 unackedSettings int // how many SETTINGS have we sent without ACKs?
519 queuedControlFrames int // control frames in the writeSched queue
520 clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
521 advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
522 curClientStreams uint32 // number of open streams initiated by the client
523 curPushedStreams uint32 // number of open streams initiated by server push
524 maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
525 maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
526 streams map[uint32]*stream
527 initialStreamSendWindowSize int32
528 maxFrameSize int32
529 headerTableSize uint32
530 peerMaxHeaderListSize uint32 // zero means unknown (default)
531 canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
532 writingFrame bool // started writing a frame (on serve goroutine or separate)
533 writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
534 needsFrameFlush bool // last frame write wasn't a flush
535 inGoAway bool // we've started to or sent GOAWAY
536 inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
537 needToSendGoAway bool // we need to schedule a GOAWAY frame write
538 goAwayCode ErrCode
539 shutdownTimer *time.Timer // nil until used
540 idleTimer *time.Timer // nil if unused
541
542 // Owned by the writeFrameAsync goroutine:
543 headerWriteBuf bytes.Buffer
544 hpackEncoder *hpack.Encoder
545
546 // Used by startGracefulShutdown.
547 shutdownOnce sync.Once
548}
549
550func (sc *serverConn) maxHeaderListSize() uint32 {
551 n := sc.hs.MaxHeaderBytes
552 if n <= 0 {
553 n = http.DefaultMaxHeaderBytes
554 }
555 // http2's count is in a slightly different unit and includes 32 bytes per pair.
556 // So, take the net/http.Server value and pad it up a bit, assuming 10 headers.
557 const perFieldOverhead = 32 // per http2 spec
558 const typicalHeaders = 10 // conservative
559 return uint32(n + typicalHeaders*perFieldOverhead)
560}
561
562func (sc *serverConn) curOpenStreams() uint32 {
563 sc.serveG.check()
564 return sc.curClientStreams + sc.curPushedStreams
565}
566
567// stream represents a stream. This is the minimal metadata needed by
568// the serve goroutine. Most of the actual stream state is owned by
569// the http.Handler's goroutine in the responseWriter. Because the
570// responseWriter's responseWriterState is recycled at the end of a
571// handler, this struct intentionally has no pointer to the
572// *responseWriter{,State} itself, as the Handler ending nils out the
573// responseWriter's state field.
574type stream struct {
575 // immutable:
576 sc *serverConn
577 id uint32
578 body *pipe // non-nil if expecting DATA frames
579 cw closeWaiter // closed wait stream transitions to closed state
580 ctx context.Context
581 cancelCtx func()
582
583 // owned by serverConn's serve loop:
584 bodyBytes int64 // body bytes seen so far
585 declBodyBytes int64 // or -1 if undeclared
586 flow flow // limits writing from Handler to client
587 inflow flow // what the client is allowed to POST/etc to us
588 state streamState
589 resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
590 gotTrailerHeader bool // HEADER frame for trailers was seen
591 wroteHeaders bool // whether we wrote headers (not status 100)
592 writeDeadline *time.Timer // nil if unused
593
594 trailer http.Header // accumulated trailers
595 reqTrailer http.Header // handler's Request.Trailer
596}
597
598func (sc *serverConn) Framer() *Framer { return sc.framer }
599func (sc *serverConn) CloseConn() error { return sc.conn.Close() }
600func (sc *serverConn) Flush() error { return sc.bw.Flush() }
601func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) {
602 return sc.hpackEncoder, &sc.headerWriteBuf
603}
604
605func (sc *serverConn) state(streamID uint32) (streamState, *stream) {
606 sc.serveG.check()
607 // http://tools.ietf.org/html/rfc7540#section-5.1
608 if st, ok := sc.streams[streamID]; ok {
609 return st.state, st
610 }
611 // "The first use of a new stream identifier implicitly closes all
612 // streams in the "idle" state that might have been initiated by
613 // that peer with a lower-valued stream identifier. For example, if
614 // a client sends a HEADERS frame on stream 7 without ever sending a
615 // frame on stream 5, then stream 5 transitions to the "closed"
616 // state when the first frame for stream 7 is sent or received."
617 if streamID%2 == 1 {
618 if streamID <= sc.maxClientStreamID {
619 return stateClosed, nil
620 }
621 } else {
622 if streamID <= sc.maxPushPromiseID {
623 return stateClosed, nil
624 }
625 }
626 return stateIdle, nil
627}
628
629// setConnState calls the net/http ConnState hook for this connection, if configured.
630// Note that the net/http package does StateNew and StateClosed for us.
631// There is currently no plan for StateHijacked or hijacking HTTP/2 connections.
632func (sc *serverConn) setConnState(state http.ConnState) {
633 if sc.hs.ConnState != nil {
634 sc.hs.ConnState(sc.conn, state)
635 }
636}
637
638func (sc *serverConn) vlogf(format string, args ...interface{}) {
639 if VerboseLogs {
640 sc.logf(format, args...)
641 }
642}
643
644func (sc *serverConn) logf(format string, args ...interface{}) {
645 if lg := sc.hs.ErrorLog; lg != nil {
646 lg.Printf(format, args...)
647 } else {
648 log.Printf(format, args...)
649 }
650}
651
652// errno returns v's underlying uintptr, else 0.
653//
654// TODO: remove this helper function once http2 can use build
655// tags. See comment in isClosedConnError.
656func errno(v error) uintptr {
657 if rv := reflect.ValueOf(v); rv.Kind() == reflect.Uintptr {
658 return uintptr(rv.Uint())
659 }
660 return 0
661}
662
663// isClosedConnError reports whether err is an error from use of a closed
664// network connection.
665func isClosedConnError(err error) bool {
666 if err == nil {
667 return false
668 }
669
670 // TODO: remove this string search and be more like the Windows
671 // case below. That might involve modifying the standard library
672 // to return better error types.
673 str := err.Error()
674 if strings.Contains(str, "use of closed network connection") {
675 return true
676 }
677
678 // TODO(bradfitz): x/tools/cmd/bundle doesn't really support
679 // build tags, so I can't make an http2_windows.go file with
680 // Windows-specific stuff. Fix that and move this, once we
681 // have a way to bundle this into std's net/http somehow.
682 if runtime.GOOS == "windows" {
683 if oe, ok := err.(*net.OpError); ok && oe.Op == "read" {
684 if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" {
685 const WSAECONNABORTED = 10053
686 const WSAECONNRESET = 10054
687 if n := errno(se.Err); n == WSAECONNRESET || n == WSAECONNABORTED {
688 return true
689 }
690 }
691 }
692 }
693 return false
694}
695
696func (sc *serverConn) condlogf(err error, format string, args ...interface{}) {
697 if err == nil {
698 return
699 }
700 if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) || err == errPrefaceTimeout {
701 // Boring, expected errors.
702 sc.vlogf(format, args...)
703 } else {
704 sc.logf(format, args...)
705 }
706}
707
708func (sc *serverConn) canonicalHeader(v string) string {
709 sc.serveG.check()
710 buildCommonHeaderMapsOnce()
711 cv, ok := commonCanonHeader[v]
712 if ok {
713 return cv
714 }
715 cv, ok = sc.canonHeader[v]
716 if ok {
717 return cv
718 }
719 if sc.canonHeader == nil {
720 sc.canonHeader = make(map[string]string)
721 }
722 cv = http.CanonicalHeaderKey(v)
723 sc.canonHeader[v] = cv
724 return cv
725}
726
727type readFrameResult struct {
728 f Frame // valid until readMore is called
729 err error
730
731 // readMore should be called once the consumer no longer needs or
732 // retains f. After readMore, f is invalid and more frames can be
733 // read.
734 readMore func()
735}
736
737// readFrames is the loop that reads incoming frames.
738// It takes care to only read one frame at a time, blocking until the
739// consumer is done with the frame.
740// It's run on its own goroutine.
741func (sc *serverConn) readFrames() {
742 gate := make(gate)
743 gateDone := gate.Done
744 for {
745 f, err := sc.framer.ReadFrame()
746 select {
747 case sc.readFrameCh <- readFrameResult{f, err, gateDone}:
748 case <-sc.doneServing:
749 return
750 }
751 select {
752 case <-gate:
753 case <-sc.doneServing:
754 return
755 }
756 if terminalReadFrameError(err) {
757 return
758 }
759 }
760}
761
762// frameWriteResult is the message passed from writeFrameAsync to the serve goroutine.
763type frameWriteResult struct {
764 _ incomparable
765 wr FrameWriteRequest // what was written (or attempted)
766 err error // result of the writeFrame call
767}
768
769// writeFrameAsync runs in its own goroutine and writes a single frame
770// and then reports when it's done.
771// At most one goroutine can be running writeFrameAsync at a time per
772// serverConn.
773func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) {
774 err := wr.write.writeFrame(sc)
775 sc.wroteFrameCh <- frameWriteResult{wr: wr, err: err}
776}
777
778func (sc *serverConn) closeAllStreamsOnConnClose() {
779 sc.serveG.check()
780 for _, st := range sc.streams {
781 sc.closeStream(st, errClientDisconnected)
782 }
783}
784
785func (sc *serverConn) stopShutdownTimer() {
786 sc.serveG.check()
787 if t := sc.shutdownTimer; t != nil {
788 t.Stop()
789 }
790}
791
792func (sc *serverConn) notePanic() {
793 // Note: this is for serverConn.serve panicking, not http.Handler code.
794 if testHookOnPanicMu != nil {
795 testHookOnPanicMu.Lock()
796 defer testHookOnPanicMu.Unlock()
797 }
798 if testHookOnPanic != nil {
799 if e := recover(); e != nil {
800 if testHookOnPanic(sc, e) {
801 panic(e)
802 }
803 }
804 }
805}
806
807func (sc *serverConn) serve() {
808 sc.serveG.check()
809 defer sc.notePanic()
810 defer sc.conn.Close()
811 defer sc.closeAllStreamsOnConnClose()
812 defer sc.stopShutdownTimer()
813 defer close(sc.doneServing) // unblocks handlers trying to send
814
815 if VerboseLogs {
816 sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
817 }
818
819 sc.writeFrame(FrameWriteRequest{
820 write: writeSettings{
821 {SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
822 {SettingMaxConcurrentStreams, sc.advMaxStreams},
823 {SettingMaxHeaderListSize, sc.maxHeaderListSize()},
824 {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
825 },
826 })
827 sc.unackedSettings++
828
829 // Each connection starts with intialWindowSize inflow tokens.
830 // If a higher value is configured, we add more tokens.
831 if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
832 sc.sendWindowUpdate(nil, int(diff))
833 }
834
835 if err := sc.readPreface(); err != nil {
836 sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
837 return
838 }
839 // Now that we've got the preface, get us out of the
840 // "StateNew" state. We can't go directly to idle, though.
841 // Active means we read some data and anticipate a request. We'll
842 // do another Active when we get a HEADERS frame.
843 sc.setConnState(http.StateActive)
844 sc.setConnState(http.StateIdle)
845
846 if sc.srv.IdleTimeout != 0 {
847 sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
848 defer sc.idleTimer.Stop()
849 }
850
851 go sc.readFrames() // closed by defer sc.conn.Close above
852
853 settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
854 defer settingsTimer.Stop()
855
856 loopNum := 0
857 for {
858 loopNum++
859 select {
860 case wr := <-sc.wantWriteFrameCh:
861 if se, ok := wr.write.(StreamError); ok {
862 sc.resetStream(se)
863 break
864 }
865 sc.writeFrame(wr)
866 case res := <-sc.wroteFrameCh:
867 sc.wroteFrame(res)
868 case res := <-sc.readFrameCh:
869 if !sc.processFrameFromReader(res) {
870 return
871 }
872 res.readMore()
873 if settingsTimer != nil {
874 settingsTimer.Stop()
875 settingsTimer = nil
876 }
877 case m := <-sc.bodyReadCh:
878 sc.noteBodyRead(m.st, m.n)
879 case msg := <-sc.serveMsgCh:
880 switch v := msg.(type) {
881 case func(int):
882 v(loopNum) // for testing
883 case *serverMessage:
884 switch v {
885 case settingsTimerMsg:
886 sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
887 return
888 case idleTimerMsg:
889 sc.vlogf("connection is idle")
890 sc.goAway(ErrCodeNo)
891 case shutdownTimerMsg:
892 sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
893 return
894 case gracefulShutdownMsg:
895 sc.startGracefulShutdownInternal()
896 default:
897 panic("unknown timer")
898 }
899 case *startPushRequest:
900 sc.startPush(v)
901 default:
902 panic(fmt.Sprintf("unexpected type %T", v))
903 }
904 }
905
906 // If the peer is causing us to generate a lot of control frames,
907 // but not reading them from us, assume they are trying to make us
908 // run out of memory.
909 if sc.queuedControlFrames > sc.srv.maxQueuedControlFrames() {
910 sc.vlogf("http2: too many control frames in send queue, closing connection")
911 return
912 }
913
914 // Start the shutdown timer after sending a GOAWAY. When sending GOAWAY
915 // with no error code (graceful shutdown), don't start the timer until
916 // all open streams have been completed.
917 sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame
918 gracefulShutdownComplete := sc.goAwayCode == ErrCodeNo && sc.curOpenStreams() == 0
919 if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != ErrCodeNo || gracefulShutdownComplete) {
920 sc.shutDownIn(goAwayTimeout)
921 }
922 }
923}
924
925func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) {
926 select {
927 case <-sc.doneServing:
928 case <-sharedCh:
929 close(privateCh)
930 }
931}
932
933type serverMessage int
934
935// Message values sent to serveMsgCh.
936var (
937 settingsTimerMsg = new(serverMessage)
938 idleTimerMsg = new(serverMessage)
939 shutdownTimerMsg = new(serverMessage)
940 gracefulShutdownMsg = new(serverMessage)
941)
942
943func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
944func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) }
945func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) }
946
947func (sc *serverConn) sendServeMsg(msg interface{}) {
948 sc.serveG.checkNotOn() // NOT
949 select {
950 case sc.serveMsgCh <- msg:
951 case <-sc.doneServing:
952 }
953}
954
955var errPrefaceTimeout = errors.New("timeout waiting for client preface")
956
957// readPreface reads the ClientPreface greeting from the peer or
958// returns errPrefaceTimeout on timeout, or an error if the greeting
959// is invalid.
960func (sc *serverConn) readPreface() error {
961 errc := make(chan error, 1)
962 go func() {
963 // Read the client preface
964 buf := make([]byte, len(ClientPreface))
965 if _, err := io.ReadFull(sc.conn, buf); err != nil {
966 errc <- err
967 } else if !bytes.Equal(buf, clientPreface) {
968 errc <- fmt.Errorf("bogus greeting %q", buf)
969 } else {
970 errc <- nil
971 }
972 }()
973 timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
974 defer timer.Stop()
975 select {
976 case <-timer.C:
977 return errPrefaceTimeout
978 case err := <-errc:
979 if err == nil {
980 if VerboseLogs {
981 sc.vlogf("http2: server: client %v said hello", sc.conn.RemoteAddr())
982 }
983 }
984 return err
985 }
986}
987
988var errChanPool = sync.Pool{
989 New: func() interface{} { return make(chan error, 1) },
990}
991
992var writeDataPool = sync.Pool{
993 New: func() interface{} { return new(writeData) },
994}
995
996// writeDataFromHandler writes DATA response frames from a handler on
997// the given stream.
998func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
999 ch := errChanPool.Get().(chan error)
1000 writeArg := writeDataPool.Get().(*writeData)
1001 *writeArg = writeData{stream.id, data, endStream}
1002 err := sc.writeFrameFromHandler(FrameWriteRequest{
1003 write: writeArg,
1004 stream: stream,
1005 done: ch,
1006 })
1007 if err != nil {
1008 return err
1009 }
1010 var frameWriteDone bool // the frame write is done (successfully or not)
1011 select {
1012 case err = <-ch:
1013 frameWriteDone = true
1014 case <-sc.doneServing:
1015 return errClientDisconnected
1016 case <-stream.cw:
1017 // If both ch and stream.cw were ready (as might
1018 // happen on the final Write after an http.Handler
1019 // ends), prefer the write result. Otherwise this
1020 // might just be us successfully closing the stream.
1021 // The writeFrameAsync and serve goroutines guarantee
1022 // that the ch send will happen before the stream.cw
1023 // close.
1024 select {
1025 case err = <-ch:
1026 frameWriteDone = true
1027 default:
1028 return errStreamClosed
1029 }
1030 }
1031 errChanPool.Put(ch)
1032 if frameWriteDone {
1033 writeDataPool.Put(writeArg)
1034 }
1035 return err
1036}
1037
1038// writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts
1039// if the connection has gone away.
1040//
1041// This must not be run from the serve goroutine itself, else it might
1042// deadlock writing to sc.wantWriteFrameCh (which is only mildly
1043// buffered and is read by serve itself). If you're on the serve
1044// goroutine, call writeFrame instead.
1045func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error {
1046 sc.serveG.checkNotOn() // NOT
1047 select {
1048 case sc.wantWriteFrameCh <- wr:
1049 return nil
1050 case <-sc.doneServing:
1051 // Serve loop is gone.
1052 // Client has closed their connection to the server.
1053 return errClientDisconnected
1054 }
1055}
1056
1057// writeFrame schedules a frame to write and sends it if there's nothing
1058// already being written.
1059//
1060// There is no pushback here (the serve goroutine never blocks). It's
1061// the http.Handlers that block, waiting for their previous frames to
1062// make it onto the wire
1063//
1064// If you're not on the serve goroutine, use writeFrameFromHandler instead.
1065func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
1066 sc.serveG.check()
1067
1068 // If true, wr will not be written and wr.done will not be signaled.
1069 var ignoreWrite bool
1070
1071 // We are not allowed to write frames on closed streams. RFC 7540 Section
1072 // 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
1073 // a closed stream." Our server never sends PRIORITY, so that exception
1074 // does not apply.
1075 //
1076 // The serverConn might close an open stream while the stream's handler
1077 // is still running. For example, the server might close a stream when it
1078 // receives bad data from the client. If this happens, the handler might
1079 // attempt to write a frame after the stream has been closed (since the
1080 // handler hasn't yet been notified of the close). In this case, we simply
1081 // ignore the frame. The handler will notice that the stream is closed when
1082 // it waits for the frame to be written.
1083 //
1084 // As an exception to this rule, we allow sending RST_STREAM after close.
1085 // This allows us to immediately reject new streams without tracking any
1086 // state for those streams (except for the queued RST_STREAM frame). This
1087 // may result in duplicate RST_STREAMs in some cases, but the client should
1088 // ignore those.
1089 if wr.StreamID() != 0 {
1090 _, isReset := wr.write.(StreamError)
1091 if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset {
1092 ignoreWrite = true
1093 }
1094 }
1095
1096 // Don't send a 100-continue response if we've already sent headers.
1097 // See golang.org/issue/14030.
1098 switch wr.write.(type) {
1099 case *writeResHeaders:
1100 wr.stream.wroteHeaders = true
1101 case write100ContinueHeadersFrame:
1102 if wr.stream.wroteHeaders {
1103 // We do not need to notify wr.done because this frame is
1104 // never written with wr.done != nil.
1105 if wr.done != nil {
1106 panic("wr.done != nil for write100ContinueHeadersFrame")
1107 }
1108 ignoreWrite = true
1109 }
1110 }
1111
1112 if !ignoreWrite {
1113 if wr.isControl() {
1114 sc.queuedControlFrames++
1115 // For extra safety, detect wraparounds, which should not happen,
1116 // and pull the plug.
1117 if sc.queuedControlFrames < 0 {
1118 sc.conn.Close()
1119 }
1120 }
1121 sc.writeSched.Push(wr)
1122 }
1123 sc.scheduleFrameWrite()
1124}
1125
1126// startFrameWrite starts a goroutine to write wr (in a separate
1127// goroutine since that might block on the network), and updates the
1128// serve goroutine's state about the world, updated from info in wr.
1129func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
1130 sc.serveG.check()
1131 if sc.writingFrame {
1132 panic("internal error: can only be writing one frame at a time")
1133 }
1134
1135 st := wr.stream
1136 if st != nil {
1137 switch st.state {
1138 case stateHalfClosedLocal:
1139 switch wr.write.(type) {
1140 case StreamError, handlerPanicRST, writeWindowUpdate:
1141 // RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE
1142 // in this state. (We never send PRIORITY from the server, so that is not checked.)
1143 default:
1144 panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr))
1145 }
1146 case stateClosed:
1147 panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr))
1148 }
1149 }
1150 if wpp, ok := wr.write.(*writePushPromise); ok {
1151 var err error
1152 wpp.promisedID, err = wpp.allocatePromisedID()
1153 if err != nil {
1154 sc.writingFrameAsync = false
1155 wr.replyToWriter(err)
1156 return
1157 }
1158 }
1159
1160 sc.writingFrame = true
1161 sc.needsFrameFlush = true
1162 if wr.write.staysWithinBuffer(sc.bw.Available()) {
1163 sc.writingFrameAsync = false
1164 err := wr.write.writeFrame(sc)
1165 sc.wroteFrame(frameWriteResult{wr: wr, err: err})
1166 } else {
1167 sc.writingFrameAsync = true
1168 go sc.writeFrameAsync(wr)
1169 }
1170}
1171
1172// errHandlerPanicked is the error given to any callers blocked in a read from
1173// Request.Body when the main goroutine panics. Since most handlers read in the
1174// main ServeHTTP goroutine, this will show up rarely.
1175var errHandlerPanicked = errors.New("http2: handler panicked")
1176
1177// wroteFrame is called on the serve goroutine with the result of
1178// whatever happened on writeFrameAsync.
1179func (sc *serverConn) wroteFrame(res frameWriteResult) {
1180 sc.serveG.check()
1181 if !sc.writingFrame {
1182 panic("internal error: expected to be already writing a frame")
1183 }
1184 sc.writingFrame = false
1185 sc.writingFrameAsync = false
1186
1187 wr := res.wr
1188
1189 if writeEndsStream(wr.write) {
1190 st := wr.stream
1191 if st == nil {
1192 panic("internal error: expecting non-nil stream")
1193 }
1194 switch st.state {
1195 case stateOpen:
1196 // Here we would go to stateHalfClosedLocal in
1197 // theory, but since our handler is done and
1198 // the net/http package provides no mechanism
1199 // for closing a ResponseWriter while still
1200 // reading data (see possible TODO at top of
1201 // this file), we go into closed state here
1202 // anyway, after telling the peer we're
1203 // hanging up on them. We'll transition to
1204 // stateClosed after the RST_STREAM frame is
1205 // written.
1206 st.state = stateHalfClosedLocal
1207 // Section 8.1: a server MAY request that the client abort
1208 // transmission of a request without error by sending a
1209 // RST_STREAM with an error code of NO_ERROR after sending
1210 // a complete response.
1211 sc.resetStream(streamError(st.id, ErrCodeNo))
1212 case stateHalfClosedRemote:
1213 sc.closeStream(st, errHandlerComplete)
1214 }
1215 } else {
1216 switch v := wr.write.(type) {
1217 case StreamError:
1218 // st may be unknown if the RST_STREAM was generated to reject bad input.
1219 if st, ok := sc.streams[v.StreamID]; ok {
1220 sc.closeStream(st, v)
1221 }
1222 case handlerPanicRST:
1223 sc.closeStream(wr.stream, errHandlerPanicked)
1224 }
1225 }
1226
1227 // Reply (if requested) to unblock the ServeHTTP goroutine.
1228 wr.replyToWriter(res.err)
1229
1230 sc.scheduleFrameWrite()
1231}
1232
1233// scheduleFrameWrite tickles the frame writing scheduler.
1234//
1235// If a frame is already being written, nothing happens. This will be called again
1236// when the frame is done being written.
1237//
1238// If a frame isn't being written and we need to send one, the best frame
1239// to send is selected by writeSched.
1240//
1241// If a frame isn't being written and there's nothing else to send, we
1242// flush the write buffer.
1243func (sc *serverConn) scheduleFrameWrite() {
1244 sc.serveG.check()
1245 if sc.writingFrame || sc.inFrameScheduleLoop {
1246 return
1247 }
1248 sc.inFrameScheduleLoop = true
1249 for !sc.writingFrameAsync {
1250 if sc.needToSendGoAway {
1251 sc.needToSendGoAway = false
1252 sc.startFrameWrite(FrameWriteRequest{
1253 write: &writeGoAway{
1254 maxStreamID: sc.maxClientStreamID,
1255 code: sc.goAwayCode,
1256 },
1257 })
1258 continue
1259 }
1260 if sc.needToSendSettingsAck {
1261 sc.needToSendSettingsAck = false
1262 sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
1263 continue
1264 }
1265 if !sc.inGoAway || sc.goAwayCode == ErrCodeNo {
1266 if wr, ok := sc.writeSched.Pop(); ok {
1267 if wr.isControl() {
1268 sc.queuedControlFrames--
1269 }
1270 sc.startFrameWrite(wr)
1271 continue
1272 }
1273 }
1274 if sc.needsFrameFlush {
1275 sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
1276 sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
1277 continue
1278 }
1279 break
1280 }
1281 sc.inFrameScheduleLoop = false
1282}
1283
1284// startGracefulShutdown gracefully shuts down a connection. This
1285// sends GOAWAY with ErrCodeNo to tell the client we're gracefully
1286// shutting down. The connection isn't closed until all current
1287// streams are done.
1288//
1289// startGracefulShutdown returns immediately; it does not wait until
1290// the connection has shut down.
1291func (sc *serverConn) startGracefulShutdown() {
1292 sc.serveG.checkNotOn() // NOT
1293 sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) })
1294}
1295
1296// After sending GOAWAY, the connection will close after goAwayTimeout.
1297// If we close the connection immediately after sending GOAWAY, there may
1298// be unsent data in our kernel receive buffer, which will cause the kernel
1299// to send a TCP RST on close() instead of a FIN. This RST will abort the
1300// connection immediately, whether or not the client had received the GOAWAY.
1301//
1302// Ideally we should delay for at least 1 RTT + epsilon so the client has
1303// a chance to read the GOAWAY and stop sending messages. Measuring RTT
1304// is hard, so we approximate with 1 second. See golang.org/issue/18701.
1305//
1306// This is a var so it can be shorter in tests, where all requests uses the
1307// loopback interface making the expected RTT very small.
1308//
1309// TODO: configurable?
1310var goAwayTimeout = 1 * time.Second
1311
1312func (sc *serverConn) startGracefulShutdownInternal() {
1313 sc.goAway(ErrCodeNo)
1314}
1315
1316func (sc *serverConn) goAway(code ErrCode) {
1317 sc.serveG.check()
1318 if sc.inGoAway {
1319 return
1320 }
1321 sc.inGoAway = true
1322 sc.needToSendGoAway = true
1323 sc.goAwayCode = code
1324 sc.scheduleFrameWrite()
1325}
1326
1327func (sc *serverConn) shutDownIn(d time.Duration) {
1328 sc.serveG.check()
1329 sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
1330}
1331
1332func (sc *serverConn) resetStream(se StreamError) {
1333 sc.serveG.check()
1334 sc.writeFrame(FrameWriteRequest{write: se})
1335 if st, ok := sc.streams[se.StreamID]; ok {
1336 st.resetQueued = true
1337 }
1338}
1339
1340// processFrameFromReader processes the serve loop's read from readFrameCh from the
1341// frame-reading goroutine.
1342// processFrameFromReader returns whether the connection should be kept open.
1343func (sc *serverConn) processFrameFromReader(res readFrameResult) bool {
1344 sc.serveG.check()
1345 err := res.err
1346 if err != nil {
1347 if err == ErrFrameTooLarge {
1348 sc.goAway(ErrCodeFrameSize)
1349 return true // goAway will close the loop
1350 }
1351 clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err)
1352 if clientGone {
1353 // TODO: could we also get into this state if
1354 // the peer does a half close
1355 // (e.g. CloseWrite) because they're done
1356 // sending frames but they're still wanting
1357 // our open replies? Investigate.
1358 // TODO: add CloseWrite to crypto/tls.Conn first
1359 // so we have a way to test this? I suppose
1360 // just for testing we could have a non-TLS mode.
1361 return false
1362 }
1363 } else {
1364 f := res.f
1365 if VerboseLogs {
1366 sc.vlogf("http2: server read frame %v", summarizeFrame(f))
1367 }
1368 err = sc.processFrame(f)
1369 if err == nil {
1370 return true
1371 }
1372 }
1373
1374 switch ev := err.(type) {
1375 case StreamError:
1376 sc.resetStream(ev)
1377 return true
1378 case goAwayFlowError:
1379 sc.goAway(ErrCodeFlowControl)
1380 return true
1381 case ConnectionError:
1382 sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev)
1383 sc.goAway(ErrCode(ev))
1384 return true // goAway will handle shutdown
1385 default:
1386 if res.err != nil {
1387 sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
1388 } else {
1389 sc.logf("http2: server closing client connection: %v", err)
1390 }
1391 return false
1392 }
1393}
1394
1395func (sc *serverConn) processFrame(f Frame) error {
1396 sc.serveG.check()
1397
1398 // First frame received must be SETTINGS.
1399 if !sc.sawFirstSettings {
1400 if _, ok := f.(*SettingsFrame); !ok {
1401 return ConnectionError(ErrCodeProtocol)
1402 }
1403 sc.sawFirstSettings = true
1404 }
1405
1406 switch f := f.(type) {
1407 case *SettingsFrame:
1408 return sc.processSettings(f)
1409 case *MetaHeadersFrame:
1410 return sc.processHeaders(f)
1411 case *WindowUpdateFrame:
1412 return sc.processWindowUpdate(f)
1413 case *PingFrame:
1414 return sc.processPing(f)
1415 case *DataFrame:
1416 return sc.processData(f)
1417 case *RSTStreamFrame:
1418 return sc.processResetStream(f)
1419 case *PriorityFrame:
1420 return sc.processPriority(f)
1421 case *GoAwayFrame:
1422 return sc.processGoAway(f)
1423 case *PushPromiseFrame:
1424 // A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
1425 // frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
1426 return ConnectionError(ErrCodeProtocol)
1427 default:
1428 sc.vlogf("http2: server ignoring frame: %v", f.Header())
1429 return nil
1430 }
1431}
1432
1433func (sc *serverConn) processPing(f *PingFrame) error {
1434 sc.serveG.check()
1435 if f.IsAck() {
1436 // 6.7 PING: " An endpoint MUST NOT respond to PING frames
1437 // containing this flag."
1438 return nil
1439 }
1440 if f.StreamID != 0 {
1441 // "PING frames are not associated with any individual
1442 // stream. If a PING frame is received with a stream
1443 // identifier field value other than 0x0, the recipient MUST
1444 // respond with a connection error (Section 5.4.1) of type
1445 // PROTOCOL_ERROR."
1446 return ConnectionError(ErrCodeProtocol)
1447 }
1448 if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
1449 return nil
1450 }
1451 sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
1452 return nil
1453}
1454
1455func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
1456 sc.serveG.check()
1457 switch {
1458 case f.StreamID != 0: // stream-level flow control
1459 state, st := sc.state(f.StreamID)
1460 if state == stateIdle {
1461 // Section 5.1: "Receiving any frame other than HEADERS
1462 // or PRIORITY on a stream in this state MUST be
1463 // treated as a connection error (Section 5.4.1) of
1464 // type PROTOCOL_ERROR."
1465 return ConnectionError(ErrCodeProtocol)
1466 }
1467 if st == nil {
1468 // "WINDOW_UPDATE can be sent by a peer that has sent a
1469 // frame bearing the END_STREAM flag. This means that a
1470 // receiver could receive a WINDOW_UPDATE frame on a "half
1471 // closed (remote)" or "closed" stream. A receiver MUST
1472 // NOT treat this as an error, see Section 5.1."
1473 return nil
1474 }
1475 if !st.flow.add(int32(f.Increment)) {
1476 return streamError(f.StreamID, ErrCodeFlowControl)
1477 }
1478 default: // connection-level flow control
1479 if !sc.flow.add(int32(f.Increment)) {
1480 return goAwayFlowError{}
1481 }
1482 }
1483 sc.scheduleFrameWrite()
1484 return nil
1485}
1486
1487func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
1488 sc.serveG.check()
1489
1490 state, st := sc.state(f.StreamID)
1491 if state == stateIdle {
1492 // 6.4 "RST_STREAM frames MUST NOT be sent for a
1493 // stream in the "idle" state. If a RST_STREAM frame
1494 // identifying an idle stream is received, the
1495 // recipient MUST treat this as a connection error
1496 // (Section 5.4.1) of type PROTOCOL_ERROR.
1497 return ConnectionError(ErrCodeProtocol)
1498 }
1499 if st != nil {
1500 st.cancelCtx()
1501 sc.closeStream(st, streamError(f.StreamID, f.ErrCode))
1502 }
1503 return nil
1504}
1505
1506func (sc *serverConn) closeStream(st *stream, err error) {
1507 sc.serveG.check()
1508 if st.state == stateIdle || st.state == stateClosed {
1509 panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
1510 }
1511 st.state = stateClosed
1512 if st.writeDeadline != nil {
1513 st.writeDeadline.Stop()
1514 }
1515 if st.isPushed() {
1516 sc.curPushedStreams--
1517 } else {
1518 sc.curClientStreams--
1519 }
1520 delete(sc.streams, st.id)
1521 if len(sc.streams) == 0 {
1522 sc.setConnState(http.StateIdle)
1523 if sc.srv.IdleTimeout != 0 {
1524 sc.idleTimer.Reset(sc.srv.IdleTimeout)
1525 }
1526 if h1ServerKeepAlivesDisabled(sc.hs) {
1527 sc.startGracefulShutdownInternal()
1528 }
1529 }
1530 if p := st.body; p != nil {
1531 // Return any buffered unread bytes worth of conn-level flow control.
1532 // See golang.org/issue/16481
1533 sc.sendWindowUpdate(nil, p.Len())
1534
1535 p.CloseWithError(err)
1536 }
1537 st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
1538 sc.writeSched.CloseStream(st.id)
1539}
1540
1541func (sc *serverConn) processSettings(f *SettingsFrame) error {
1542 sc.serveG.check()
1543 if f.IsAck() {
1544 sc.unackedSettings--
1545 if sc.unackedSettings < 0 {
1546 // Why is the peer ACKing settings we never sent?
1547 // The spec doesn't mention this case, but
1548 // hang up on them anyway.
1549 return ConnectionError(ErrCodeProtocol)
1550 }
1551 return nil
1552 }
1553 if f.NumSettings() > 100 || f.HasDuplicates() {
1554 // This isn't actually in the spec, but hang up on
1555 // suspiciously large settings frames or those with
1556 // duplicate entries.
1557 return ConnectionError(ErrCodeProtocol)
1558 }
1559 if err := f.ForeachSetting(sc.processSetting); err != nil {
1560 return err
1561 }
1562 // TODO: judging by RFC 7540, Section 6.5.3 each SETTINGS frame should be
1563 // acknowledged individually, even if multiple are received before the ACK.
1564 sc.needToSendSettingsAck = true
1565 sc.scheduleFrameWrite()
1566 return nil
1567}
1568
1569func (sc *serverConn) processSetting(s Setting) error {
1570 sc.serveG.check()
1571 if err := s.Valid(); err != nil {
1572 return err
1573 }
1574 if VerboseLogs {
1575 sc.vlogf("http2: server processing setting %v", s)
1576 }
1577 switch s.ID {
1578 case SettingHeaderTableSize:
1579 sc.headerTableSize = s.Val
1580 sc.hpackEncoder.SetMaxDynamicTableSize(s.Val)
1581 case SettingEnablePush:
1582 sc.pushEnabled = s.Val != 0
1583 case SettingMaxConcurrentStreams:
1584 sc.clientMaxStreams = s.Val
1585 case SettingInitialWindowSize:
1586 return sc.processSettingInitialWindowSize(s.Val)
1587 case SettingMaxFrameSize:
1588 sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31
1589 case SettingMaxHeaderListSize:
1590 sc.peerMaxHeaderListSize = s.Val
1591 default:
1592 // Unknown setting: "An endpoint that receives a SETTINGS
1593 // frame with any unknown or unsupported identifier MUST
1594 // ignore that setting."
1595 if VerboseLogs {
1596 sc.vlogf("http2: server ignoring unknown setting %v", s)
1597 }
1598 }
1599 return nil
1600}
1601
1602func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
1603 sc.serveG.check()
1604 // Note: val already validated to be within range by
1605 // processSetting's Valid call.
1606
1607 // "A SETTINGS frame can alter the initial flow control window
1608 // size for all current streams. When the value of
1609 // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST
1610 // adjust the size of all stream flow control windows that it
1611 // maintains by the difference between the new value and the
1612 // old value."
1613 old := sc.initialStreamSendWindowSize
1614 sc.initialStreamSendWindowSize = int32(val)
1615 growth := int32(val) - old // may be negative
1616 for _, st := range sc.streams {
1617 if !st.flow.add(growth) {
1618 // 6.9.2 Initial Flow Control Window Size
1619 // "An endpoint MUST treat a change to
1620 // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow
1621 // control window to exceed the maximum size as a
1622 // connection error (Section 5.4.1) of type
1623 // FLOW_CONTROL_ERROR."
1624 return ConnectionError(ErrCodeFlowControl)
1625 }
1626 }
1627 return nil
1628}
1629
1630func (sc *serverConn) processData(f *DataFrame) error {
1631 sc.serveG.check()
1632 if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
1633 return nil
1634 }
1635 data := f.Data()
1636
1637 // "If a DATA frame is received whose stream is not in "open"
1638 // or "half closed (local)" state, the recipient MUST respond
1639 // with a stream error (Section 5.4.2) of type STREAM_CLOSED."
1640 id := f.Header().StreamID
1641 state, st := sc.state(id)
1642 if id == 0 || state == stateIdle {
1643 // Section 5.1: "Receiving any frame other than HEADERS
1644 // or PRIORITY on a stream in this state MUST be
1645 // treated as a connection error (Section 5.4.1) of
1646 // type PROTOCOL_ERROR."
1647 return ConnectionError(ErrCodeProtocol)
1648 }
1649 if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued {
1650 // This includes sending a RST_STREAM if the stream is
1651 // in stateHalfClosedLocal (which currently means that
1652 // the http.Handler returned, so it's done reading &
1653 // done writing). Try to stop the client from sending
1654 // more DATA.
1655
1656 // But still enforce their connection-level flow control,
1657 // and return any flow control bytes since we're not going
1658 // to consume them.
1659 if sc.inflow.available() < int32(f.Length) {
1660 return streamError(id, ErrCodeFlowControl)
1661 }
1662 // Deduct the flow control from inflow, since we're
1663 // going to immediately add it back in
1664 // sendWindowUpdate, which also schedules sending the
1665 // frames.
1666 sc.inflow.take(int32(f.Length))
1667 sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
1668
1669 if st != nil && st.resetQueued {
1670 // Already have a stream error in flight. Don't send another.
1671 return nil
1672 }
1673 return streamError(id, ErrCodeStreamClosed)
1674 }
1675 if st.body == nil {
1676 panic("internal error: should have a body in this state")
1677 }
1678
1679 // Sender sending more than they'd declared?
1680 if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
1681 st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
1682 // RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
1683 // value of a content-length header field does not equal the sum of the
1684 // DATA frame payload lengths that form the body.
1685 return streamError(id, ErrCodeProtocol)
1686 }
1687 if f.Length > 0 {
1688 // Check whether the client has flow control quota.
1689 if st.inflow.available() < int32(f.Length) {
1690 return streamError(id, ErrCodeFlowControl)
1691 }
1692 st.inflow.take(int32(f.Length))
1693
1694 if len(data) > 0 {
1695 wrote, err := st.body.Write(data)
1696 if err != nil {
1697 return streamError(id, ErrCodeStreamClosed)
1698 }
1699 if wrote != len(data) {
1700 panic("internal error: bad Writer")
1701 }
1702 st.bodyBytes += int64(len(data))
1703 }
1704
1705 // Return any padded flow control now, since we won't
1706 // refund it later on body reads.
1707 if pad := int32(f.Length) - int32(len(data)); pad > 0 {
1708 sc.sendWindowUpdate32(nil, pad)
1709 sc.sendWindowUpdate32(st, pad)
1710 }
1711 }
1712 if f.StreamEnded() {
1713 st.endStream()
1714 }
1715 return nil
1716}
1717
1718func (sc *serverConn) processGoAway(f *GoAwayFrame) error {
1719 sc.serveG.check()
1720 if f.ErrCode != ErrCodeNo {
1721 sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1722 } else {
1723 sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1724 }
1725 sc.startGracefulShutdownInternal()
1726 // http://tools.ietf.org/html/rfc7540#section-6.8
1727 // We should not create any new streams, which means we should disable push.
1728 sc.pushEnabled = false
1729 return nil
1730}
1731
1732// isPushed reports whether the stream is server-initiated.
1733func (st *stream) isPushed() bool {
1734 return st.id%2 == 0
1735}
1736
1737// endStream closes a Request.Body's pipe. It is called when a DATA
1738// frame says a request body is over (or after trailers).
1739func (st *stream) endStream() {
1740 sc := st.sc
1741 sc.serveG.check()
1742
1743 if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes {
1744 st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes",
1745 st.declBodyBytes, st.bodyBytes))
1746 } else {
1747 st.body.closeWithErrorAndCode(io.EOF, st.copyTrailersToHandlerRequest)
1748 st.body.CloseWithError(io.EOF)
1749 }
1750 st.state = stateHalfClosedRemote
1751}
1752
1753// copyTrailersToHandlerRequest is run in the Handler's goroutine in
1754// its Request.Body.Read just before it gets io.EOF.
1755func (st *stream) copyTrailersToHandlerRequest() {
1756 for k, vv := range st.trailer {
1757 if _, ok := st.reqTrailer[k]; ok {
1758 // Only copy it over it was pre-declared.
1759 st.reqTrailer[k] = vv
1760 }
1761 }
1762}
1763
1764// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
1765// when the stream's WriteTimeout has fired.
1766func (st *stream) onWriteTimeout() {
1767 st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)})
1768}
1769
1770func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
1771 sc.serveG.check()
1772 id := f.StreamID
1773 if sc.inGoAway {
1774 // Ignore.
1775 return nil
1776 }
1777 // http://tools.ietf.org/html/rfc7540#section-5.1.1
1778 // Streams initiated by a client MUST use odd-numbered stream
1779 // identifiers. [...] An endpoint that receives an unexpected
1780 // stream identifier MUST respond with a connection error
1781 // (Section 5.4.1) of type PROTOCOL_ERROR.
1782 if id%2 != 1 {
1783 return ConnectionError(ErrCodeProtocol)
1784 }
1785 // A HEADERS frame can be used to create a new stream or
1786 // send a trailer for an open one. If we already have a stream
1787 // open, let it process its own HEADERS frame (trailers at this
1788 // point, if it's valid).
1789 if st := sc.streams[f.StreamID]; st != nil {
1790 if st.resetQueued {
1791 // We're sending RST_STREAM to close the stream, so don't bother
1792 // processing this frame.
1793 return nil
1794 }
1795 // RFC 7540, sec 5.1: If an endpoint receives additional frames, other than
1796 // WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in
1797 // this state, it MUST respond with a stream error (Section 5.4.2) of
1798 // type STREAM_CLOSED.
1799 if st.state == stateHalfClosedRemote {
1800 return streamError(id, ErrCodeStreamClosed)
1801 }
1802 return st.processTrailerHeaders(f)
1803 }
1804
1805 // [...] The identifier of a newly established stream MUST be
1806 // numerically greater than all streams that the initiating
1807 // endpoint has opened or reserved. [...] An endpoint that
1808 // receives an unexpected stream identifier MUST respond with
1809 // a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
1810 if id <= sc.maxClientStreamID {
1811 return ConnectionError(ErrCodeProtocol)
1812 }
1813 sc.maxClientStreamID = id
1814
1815 if sc.idleTimer != nil {
1816 sc.idleTimer.Stop()
1817 }
1818
1819 // http://tools.ietf.org/html/rfc7540#section-5.1.2
1820 // [...] Endpoints MUST NOT exceed the limit set by their peer. An
1821 // endpoint that receives a HEADERS frame that causes their
1822 // advertised concurrent stream limit to be exceeded MUST treat
1823 // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
1824 // or REFUSED_STREAM.
1825 if sc.curClientStreams+1 > sc.advMaxStreams {
1826 if sc.unackedSettings == 0 {
1827 // They should know better.
1828 return streamError(id, ErrCodeProtocol)
1829 }
1830 // Assume it's a network race, where they just haven't
1831 // received our last SETTINGS update. But actually
1832 // this can't happen yet, because we don't yet provide
1833 // a way for users to adjust server parameters at
1834 // runtime.
1835 return streamError(id, ErrCodeRefusedStream)
1836 }
1837
1838 initialState := stateOpen
1839 if f.StreamEnded() {
1840 initialState = stateHalfClosedRemote
1841 }
1842 st := sc.newStream(id, 0, initialState)
1843
1844 if f.HasPriority() {
1845 if err := checkPriority(f.StreamID, f.Priority); err != nil {
1846 return err
1847 }
1848 sc.writeSched.AdjustStream(st.id, f.Priority)
1849 }
1850
1851 rw, req, err := sc.newWriterAndRequest(st, f)
1852 if err != nil {
1853 return err
1854 }
1855 st.reqTrailer = req.Trailer
1856 if st.reqTrailer != nil {
1857 st.trailer = make(http.Header)
1858 }
1859 st.body = req.Body.(*requestBody).pipe // may be nil
1860 st.declBodyBytes = req.ContentLength
1861
1862 handler := sc.handler.ServeHTTP
1863 if f.Truncated {
1864 // Their header list was too long. Send a 431 error.
1865 handler = handleHeaderListTooLong
1866 } else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil {
1867 handler = new400Handler(err)
1868 }
1869
1870 // The net/http package sets the read deadline from the
1871 // http.Server.ReadTimeout during the TLS handshake, but then
1872 // passes the connection off to us with the deadline already
1873 // set. Disarm it here after the request headers are read,
1874 // similar to how the http1 server works. Here it's
1875 // technically more like the http1 Server's ReadHeaderTimeout
1876 // (in Go 1.8), though. That's a more sane option anyway.
1877 if sc.hs.ReadTimeout != 0 {
1878 sc.conn.SetReadDeadline(time.Time{})
1879 }
1880
1881 go sc.runHandler(rw, req, handler)
1882 return nil
1883}
1884
1885func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
1886 sc := st.sc
1887 sc.serveG.check()
1888 if st.gotTrailerHeader {
1889 return ConnectionError(ErrCodeProtocol)
1890 }
1891 st.gotTrailerHeader = true
1892 if !f.StreamEnded() {
1893 return streamError(st.id, ErrCodeProtocol)
1894 }
1895
1896 if len(f.PseudoFields()) > 0 {
1897 return streamError(st.id, ErrCodeProtocol)
1898 }
1899 if st.trailer != nil {
1900 for _, hf := range f.RegularFields() {
1901 key := sc.canonicalHeader(hf.Name)
1902 if !httpguts.ValidTrailerHeader(key) {
1903 // TODO: send more details to the peer somehow. But http2 has
1904 // no way to send debug data at a stream level. Discuss with
1905 // HTTP folk.
1906 return streamError(st.id, ErrCodeProtocol)
1907 }
1908 st.trailer[key] = append(st.trailer[key], hf.Value)
1909 }
1910 }
1911 st.endStream()
1912 return nil
1913}
1914
1915func checkPriority(streamID uint32, p PriorityParam) error {
1916 if streamID == p.StreamDep {
1917 // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
1918 // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
1919 // Section 5.3.3 says that a stream can depend on one of its dependencies,
1920 // so it's only self-dependencies that are forbidden.
1921 return streamError(streamID, ErrCodeProtocol)
1922 }
1923 return nil
1924}
1925
1926func (sc *serverConn) processPriority(f *PriorityFrame) error {
1927 if sc.inGoAway {
1928 return nil
1929 }
1930 if err := checkPriority(f.StreamID, f.PriorityParam); err != nil {
1931 return err
1932 }
1933 sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam)
1934 return nil
1935}
1936
1937func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream {
1938 sc.serveG.check()
1939 if id == 0 {
1940 panic("internal error: cannot create stream with id 0")
1941 }
1942
1943 ctx, cancelCtx := context.WithCancel(sc.baseCtx)
1944 st := &stream{
1945 sc: sc,
1946 id: id,
1947 state: state,
1948 ctx: ctx,
1949 cancelCtx: cancelCtx,
1950 }
1951 st.cw.Init()
1952 st.flow.conn = &sc.flow // link to conn-level counter
1953 st.flow.add(sc.initialStreamSendWindowSize)
1954 st.inflow.conn = &sc.inflow // link to conn-level counter
1955 st.inflow.add(sc.srv.initialStreamRecvWindowSize())
1956 if sc.hs.WriteTimeout != 0 {
1957 st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
1958 }
1959
1960 sc.streams[id] = st
1961 sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
1962 if st.isPushed() {
1963 sc.curPushedStreams++
1964 } else {
1965 sc.curClientStreams++
1966 }
1967 if sc.curOpenStreams() == 1 {
1968 sc.setConnState(http.StateActive)
1969 }
1970
1971 return st
1972}
1973
1974func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) {
1975 sc.serveG.check()
1976
1977 rp := requestParam{
1978 method: f.PseudoValue("method"),
1979 scheme: f.PseudoValue("scheme"),
1980 authority: f.PseudoValue("authority"),
1981 path: f.PseudoValue("path"),
1982 }
1983
1984 isConnect := rp.method == "CONNECT"
1985 if isConnect {
1986 if rp.path != "" || rp.scheme != "" || rp.authority == "" {
1987 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
1988 }
1989 } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
1990 // See 8.1.2.6 Malformed Requests and Responses:
1991 //
1992 // Malformed requests or responses that are detected
1993 // MUST be treated as a stream error (Section 5.4.2)
1994 // of type PROTOCOL_ERROR."
1995 //
1996 // 8.1.2.3 Request Pseudo-Header Fields
1997 // "All HTTP/2 requests MUST include exactly one valid
1998 // value for the :method, :scheme, and :path
1999 // pseudo-header fields"
2000 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
2001 }
2002
2003 bodyOpen := !f.StreamEnded()
2004 if rp.method == "HEAD" && bodyOpen {
2005 // HEAD requests can't have bodies
2006 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
2007 }
2008
2009 rp.header = make(http.Header)
2010 for _, hf := range f.RegularFields() {
2011 rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value)
2012 }
2013 if rp.authority == "" {
2014 rp.authority = rp.header.Get("Host")
2015 }
2016
2017 rw, req, err := sc.newWriterAndRequestNoBody(st, rp)
2018 if err != nil {
2019 return nil, nil, err
2020 }
2021 if bodyOpen {
2022 if vv, ok := rp.header["Content-Length"]; ok {
2023 req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
2024 } else {
2025 req.ContentLength = -1
2026 }
2027 req.Body.(*requestBody).pipe = &pipe{
2028 b: &dataBuffer{expected: req.ContentLength},
2029 }
2030 }
2031 return rw, req, nil
2032}
2033
2034type requestParam struct {
2035 method string
2036 scheme, authority, path string
2037 header http.Header
2038}
2039
2040func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) {
2041 sc.serveG.check()
2042
2043 var tlsState *tls.ConnectionState // nil if not scheme https
2044 if rp.scheme == "https" {
2045 tlsState = sc.tlsState
2046 }
2047
2048 needsContinue := rp.header.Get("Expect") == "100-continue"
2049 if needsContinue {
2050 rp.header.Del("Expect")
2051 }
2052 // Merge Cookie headers into one "; "-delimited value.
2053 if cookies := rp.header["Cookie"]; len(cookies) > 1 {
2054 rp.header.Set("Cookie", strings.Join(cookies, "; "))
2055 }
2056
2057 // Setup Trailers
2058 var trailer http.Header
2059 for _, v := range rp.header["Trailer"] {
2060 for _, key := range strings.Split(v, ",") {
2061 key = http.CanonicalHeaderKey(textproto.TrimString(key))
2062 switch key {
2063 case "Transfer-Encoding", "Trailer", "Content-Length":
2064 // Bogus. (copy of http1 rules)
2065 // Ignore.
2066 default:
2067 if trailer == nil {
2068 trailer = make(http.Header)
2069 }
2070 trailer[key] = nil
2071 }
2072 }
2073 }
2074 delete(rp.header, "Trailer")
2075
2076 var url_ *url.URL
2077 var requestURI string
2078 if rp.method == "CONNECT" {
2079 url_ = &url.URL{Host: rp.authority}
2080 requestURI = rp.authority // mimic HTTP/1 server behavior
2081 } else {
2082 var err error
2083 url_, err = url.ParseRequestURI(rp.path)
2084 if err != nil {
2085 return nil, nil, streamError(st.id, ErrCodeProtocol)
2086 }
2087 requestURI = rp.path
2088 }
2089
2090 body := &requestBody{
2091 conn: sc,
2092 stream: st,
2093 needsContinue: needsContinue,
2094 }
2095 req := &http.Request{
2096 Method: rp.method,
2097 URL: url_,
2098 RemoteAddr: sc.remoteAddrStr,
2099 Header: rp.header,
2100 RequestURI: requestURI,
2101 Proto: "HTTP/2.0",
2102 ProtoMajor: 2,
2103 ProtoMinor: 0,
2104 TLS: tlsState,
2105 Host: rp.authority,
2106 Body: body,
2107 Trailer: trailer,
2108 }
2109 req = req.WithContext(st.ctx)
2110
2111 rws := responseWriterStatePool.Get().(*responseWriterState)
2112 bwSave := rws.bw
2113 *rws = responseWriterState{} // zero all the fields
2114 rws.conn = sc
2115 rws.bw = bwSave
2116 rws.bw.Reset(chunkWriter{rws})
2117 rws.stream = st
2118 rws.req = req
2119 rws.body = body
2120
2121 rw := &responseWriter{rws: rws}
2122 return rw, req, nil
2123}
2124
2125// Run on its own goroutine.
2126func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
2127 didPanic := true
2128 defer func() {
2129 rw.rws.stream.cancelCtx()
2130 if didPanic {
2131 e := recover()
2132 sc.writeFrameFromHandler(FrameWriteRequest{
2133 write: handlerPanicRST{rw.rws.stream.id},
2134 stream: rw.rws.stream,
2135 })
2136 // Same as net/http:
2137 if e != nil && e != http.ErrAbortHandler {
2138 const size = 64 << 10
2139 buf := make([]byte, size)
2140 buf = buf[:runtime.Stack(buf, false)]
2141 sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf)
2142 }
2143 return
2144 }
2145 rw.handlerDone()
2146 }()
2147 handler(rw, req)
2148 didPanic = false
2149}
2150
2151func handleHeaderListTooLong(w http.ResponseWriter, r *http.Request) {
2152 // 10.5.1 Limits on Header Block Size:
2153 // .. "A server that receives a larger header block than it is
2154 // willing to handle can send an HTTP 431 (Request Header Fields Too
2155 // Large) status code"
2156 const statusRequestHeaderFieldsTooLarge = 431 // only in Go 1.6+
2157 w.WriteHeader(statusRequestHeaderFieldsTooLarge)
2158 io.WriteString(w, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>")
2159}
2160
2161// called from handler goroutines.
2162// h may be nil.
2163func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) error {
2164 sc.serveG.checkNotOn() // NOT on
2165 var errc chan error
2166 if headerData.h != nil {
2167 // If there's a header map (which we don't own), so we have to block on
2168 // waiting for this frame to be written, so an http.Flush mid-handler
2169 // writes out the correct value of keys, before a handler later potentially
2170 // mutates it.
2171 errc = errChanPool.Get().(chan error)
2172 }
2173 if err := sc.writeFrameFromHandler(FrameWriteRequest{
2174 write: headerData,
2175 stream: st,
2176 done: errc,
2177 }); err != nil {
2178 return err
2179 }
2180 if errc != nil {
2181 select {
2182 case err := <-errc:
2183 errChanPool.Put(errc)
2184 return err
2185 case <-sc.doneServing:
2186 return errClientDisconnected
2187 case <-st.cw:
2188 return errStreamClosed
2189 }
2190 }
2191 return nil
2192}
2193
2194// called from handler goroutines.
2195func (sc *serverConn) write100ContinueHeaders(st *stream) {
2196 sc.writeFrameFromHandler(FrameWriteRequest{
2197 write: write100ContinueHeadersFrame{st.id},
2198 stream: st,
2199 })
2200}
2201
2202// A bodyReadMsg tells the server loop that the http.Handler read n
2203// bytes of the DATA from the client on the given stream.
2204type bodyReadMsg struct {
2205 st *stream
2206 n int
2207}
2208
2209// called from handler goroutines.
2210// Notes that the handler for the given stream ID read n bytes of its body
2211// and schedules flow control tokens to be sent.
2212func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
2213 sc.serveG.checkNotOn() // NOT on
2214 if n > 0 {
2215 select {
2216 case sc.bodyReadCh <- bodyReadMsg{st, n}:
2217 case <-sc.doneServing:
2218 }
2219 }
2220}
2221
2222func (sc *serverConn) noteBodyRead(st *stream, n int) {
2223 sc.serveG.check()
2224 sc.sendWindowUpdate(nil, n) // conn-level
2225 if st.state != stateHalfClosedRemote && st.state != stateClosed {
2226 // Don't send this WINDOW_UPDATE if the stream is closed
2227 // remotely.
2228 sc.sendWindowUpdate(st, n)
2229 }
2230}
2231
2232// st may be nil for conn-level
2233func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
2234 sc.serveG.check()
2235 // "The legal range for the increment to the flow control
2236 // window is 1 to 2^31-1 (2,147,483,647) octets."
2237 // A Go Read call on 64-bit machines could in theory read
2238 // a larger Read than this. Very unlikely, but we handle it here
2239 // rather than elsewhere for now.
2240 const maxUint31 = 1<<31 - 1
2241 for n >= maxUint31 {
2242 sc.sendWindowUpdate32(st, maxUint31)
2243 n -= maxUint31
2244 }
2245 sc.sendWindowUpdate32(st, int32(n))
2246}
2247
2248// st may be nil for conn-level
2249func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
2250 sc.serveG.check()
2251 if n == 0 {
2252 return
2253 }
2254 if n < 0 {
2255 panic("negative update")
2256 }
2257 var streamID uint32
2258 if st != nil {
2259 streamID = st.id
2260 }
2261 sc.writeFrame(FrameWriteRequest{
2262 write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
2263 stream: st,
2264 })
2265 var ok bool
2266 if st == nil {
2267 ok = sc.inflow.add(n)
2268 } else {
2269 ok = st.inflow.add(n)
2270 }
2271 if !ok {
2272 panic("internal error; sent too many window updates without decrements?")
2273 }
2274}
2275
2276// requestBody is the Handler's Request.Body type.
2277// Read and Close may be called concurrently.
2278type requestBody struct {
2279 _ incomparable
2280 stream *stream
2281 conn *serverConn
2282 closed bool // for use by Close only
2283 sawEOF bool // for use by Read only
2284 pipe *pipe // non-nil if we have a HTTP entity message body
2285 needsContinue bool // need to send a 100-continue
2286}
2287
2288func (b *requestBody) Close() error {
2289 if b.pipe != nil && !b.closed {
2290 b.pipe.BreakWithError(errClosedBody)
2291 }
2292 b.closed = true
2293 return nil
2294}
2295
2296func (b *requestBody) Read(p []byte) (n int, err error) {
2297 if b.needsContinue {
2298 b.needsContinue = false
2299 b.conn.write100ContinueHeaders(b.stream)
2300 }
2301 if b.pipe == nil || b.sawEOF {
2302 return 0, io.EOF
2303 }
2304 n, err = b.pipe.Read(p)
2305 if err == io.EOF {
2306 b.sawEOF = true
2307 }
2308 if b.conn == nil && inTests {
2309 return
2310 }
2311 b.conn.noteBodyReadFromHandler(b.stream, n, err)
2312 return
2313}
2314
2315// responseWriter is the http.ResponseWriter implementation. It's
2316// intentionally small (1 pointer wide) to minimize garbage. The
2317// responseWriterState pointer inside is zeroed at the end of a
2318// request (in handlerDone) and calls on the responseWriter thereafter
2319// simply crash (caller's mistake), but the much larger responseWriterState
2320// and buffers are reused between multiple requests.
2321type responseWriter struct {
2322 rws *responseWriterState
2323}
2324
2325// Optional http.ResponseWriter interfaces implemented.
2326var (
2327 _ http.CloseNotifier = (*responseWriter)(nil)
2328 _ http.Flusher = (*responseWriter)(nil)
2329 _ stringWriter = (*responseWriter)(nil)
2330)
2331
2332type responseWriterState struct {
2333 // immutable within a request:
2334 stream *stream
2335 req *http.Request
2336 body *requestBody // to close at end of request, if DATA frames didn't
2337 conn *serverConn
2338
2339 // TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc
2340 bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState}
2341
2342 // mutated by http.Handler goroutine:
2343 handlerHeader http.Header // nil until called
2344 snapHeader http.Header // snapshot of handlerHeader at WriteHeader time
2345 trailers []string // set in writeChunk
2346 status int // status code passed to WriteHeader
2347 wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
2348 sentHeader bool // have we sent the header frame?
2349 handlerDone bool // handler has finished
2350 dirty bool // a Write failed; don't reuse this responseWriterState
2351
2352 sentContentLen int64 // non-zero if handler set a Content-Length header
2353 wroteBytes int64
2354
2355 closeNotifierMu sync.Mutex // guards closeNotifierCh
2356 closeNotifierCh chan bool // nil until first used
2357}
2358
2359type chunkWriter struct{ rws *responseWriterState }
2360
2361func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) }
2362
2363func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) > 0 }
2364
2365func (rws *responseWriterState) hasNonemptyTrailers() bool {
2366 for _, trailer := range rws.trailers {
2367 if _, ok := rws.handlerHeader[trailer]; ok {
2368 return true
2369 }
2370 }
2371 return false
2372}
2373
2374// declareTrailer is called for each Trailer header when the
2375// response header is written. It notes that a header will need to be
2376// written in the trailers at the end of the response.
2377func (rws *responseWriterState) declareTrailer(k string) {
2378 k = http.CanonicalHeaderKey(k)
2379 if !httpguts.ValidTrailerHeader(k) {
2380 // Forbidden by RFC 7230, section 4.1.2.
2381 rws.conn.logf("ignoring invalid trailer %q", k)
2382 return
2383 }
2384 if !strSliceContains(rws.trailers, k) {
2385 rws.trailers = append(rws.trailers, k)
2386 }
2387}
2388
2389// writeChunk writes chunks from the bufio.Writer. But because
2390// bufio.Writer may bypass its chunking, sometimes p may be
2391// arbitrarily large.
2392//
2393// writeChunk is also responsible (on the first chunk) for sending the
2394// HEADER response.
2395func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
2396 if !rws.wroteHeader {
2397 rws.writeHeader(200)
2398 }
2399
2400 isHeadResp := rws.req.Method == "HEAD"
2401 if !rws.sentHeader {
2402 rws.sentHeader = true
2403 var ctype, clen string
2404 if clen = rws.snapHeader.Get("Content-Length"); clen != "" {
2405 rws.snapHeader.Del("Content-Length")
2406 clen64, err := strconv.ParseInt(clen, 10, 64)
2407 if err == nil && clen64 >= 0 {
2408 rws.sentContentLen = clen64
2409 } else {
2410 clen = ""
2411 }
2412 }
2413 if clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
2414 clen = strconv.Itoa(len(p))
2415 }
2416 _, hasContentType := rws.snapHeader["Content-Type"]
2417 // If the Content-Encoding is non-blank, we shouldn't
2418 // sniff the body. See Issue golang.org/issue/31753.
2419 ce := rws.snapHeader.Get("Content-Encoding")
2420 hasCE := len(ce) > 0
2421 if !hasCE && !hasContentType && bodyAllowedForStatus(rws.status) && len(p) > 0 {
2422 ctype = http.DetectContentType(p)
2423 }
2424 var date string
2425 if _, ok := rws.snapHeader["Date"]; !ok {
2426 // TODO(bradfitz): be faster here, like net/http? measure.
2427 date = time.Now().UTC().Format(http.TimeFormat)
2428 }
2429
2430 for _, v := range rws.snapHeader["Trailer"] {
2431 foreachHeaderElement(v, rws.declareTrailer)
2432 }
2433
2434 // "Connection" headers aren't allowed in HTTP/2 (RFC 7540, 8.1.2.2),
2435 // but respect "Connection" == "close" to mean sending a GOAWAY and tearing
2436 // down the TCP connection when idle, like we do for HTTP/1.
2437 // TODO: remove more Connection-specific header fields here, in addition
2438 // to "Connection".
2439 if _, ok := rws.snapHeader["Connection"]; ok {
2440 v := rws.snapHeader.Get("Connection")
2441 delete(rws.snapHeader, "Connection")
2442 if v == "close" {
2443 rws.conn.startGracefulShutdown()
2444 }
2445 }
2446
2447 endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp
2448 err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
2449 streamID: rws.stream.id,
2450 httpResCode: rws.status,
2451 h: rws.snapHeader,
2452 endStream: endStream,
2453 contentType: ctype,
2454 contentLength: clen,
2455 date: date,
2456 })
2457 if err != nil {
2458 rws.dirty = true
2459 return 0, err
2460 }
2461 if endStream {
2462 return 0, nil
2463 }
2464 }
2465 if isHeadResp {
2466 return len(p), nil
2467 }
2468 if len(p) == 0 && !rws.handlerDone {
2469 return 0, nil
2470 }
2471
2472 if rws.handlerDone {
2473 rws.promoteUndeclaredTrailers()
2474 }
2475
2476 // only send trailers if they have actually been defined by the
2477 // server handler.
2478 hasNonemptyTrailers := rws.hasNonemptyTrailers()
2479 endStream := rws.handlerDone && !hasNonemptyTrailers
2480 if len(p) > 0 || endStream {
2481 // only send a 0 byte DATA frame if we're ending the stream.
2482 if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
2483 rws.dirty = true
2484 return 0, err
2485 }
2486 }
2487
2488 if rws.handlerDone && hasNonemptyTrailers {
2489 err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
2490 streamID: rws.stream.id,
2491 h: rws.handlerHeader,
2492 trailers: rws.trailers,
2493 endStream: true,
2494 })
2495 if err != nil {
2496 rws.dirty = true
2497 }
2498 return len(p), err
2499 }
2500 return len(p), nil
2501}
2502
2503// TrailerPrefix is a magic prefix for ResponseWriter.Header map keys
2504// that, if present, signals that the map entry is actually for
2505// the response trailers, and not the response headers. The prefix
2506// is stripped after the ServeHTTP call finishes and the values are
2507// sent in the trailers.
2508//
2509// This mechanism is intended only for trailers that are not known
2510// prior to the headers being written. If the set of trailers is fixed
2511// or known before the header is written, the normal Go trailers mechanism
2512// is preferred:
2513// https://golang.org/pkg/net/http/#ResponseWriter
2514// https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
2515const TrailerPrefix = "Trailer:"
2516
2517// promoteUndeclaredTrailers permits http.Handlers to set trailers
2518// after the header has already been flushed. Because the Go
2519// ResponseWriter interface has no way to set Trailers (only the
2520// Header), and because we didn't want to expand the ResponseWriter
2521// interface, and because nobody used trailers, and because RFC 7230
2522// says you SHOULD (but not must) predeclare any trailers in the
2523// header, the official ResponseWriter rules said trailers in Go must
2524// be predeclared, and then we reuse the same ResponseWriter.Header()
2525// map to mean both Headers and Trailers. When it's time to write the
2526// Trailers, we pick out the fields of Headers that were declared as
2527// trailers. That worked for a while, until we found the first major
2528// user of Trailers in the wild: gRPC (using them only over http2),
2529// and gRPC libraries permit setting trailers mid-stream without
2530// predeclaring them. So: change of plans. We still permit the old
2531// way, but we also permit this hack: if a Header() key begins with
2532// "Trailer:", the suffix of that key is a Trailer. Because ':' is an
2533// invalid token byte anyway, there is no ambiguity. (And it's already
2534// filtered out) It's mildly hacky, but not terrible.
2535//
2536// This method runs after the Handler is done and promotes any Header
2537// fields to be trailers.
2538func (rws *responseWriterState) promoteUndeclaredTrailers() {
2539 for k, vv := range rws.handlerHeader {
2540 if !strings.HasPrefix(k, TrailerPrefix) {
2541 continue
2542 }
2543 trailerKey := strings.TrimPrefix(k, TrailerPrefix)
2544 rws.declareTrailer(trailerKey)
2545 rws.handlerHeader[http.CanonicalHeaderKey(trailerKey)] = vv
2546 }
2547
2548 if len(rws.trailers) > 1 {
2549 sorter := sorterPool.Get().(*sorter)
2550 sorter.SortStrings(rws.trailers)
2551 sorterPool.Put(sorter)
2552 }
2553}
2554
2555func (w *responseWriter) Flush() {
2556 rws := w.rws
2557 if rws == nil {
2558 panic("Header called after Handler finished")
2559 }
2560 if rws.bw.Buffered() > 0 {
2561 if err := rws.bw.Flush(); err != nil {
2562 // Ignore the error. The frame writer already knows.
2563 return
2564 }
2565 } else {
2566 // The bufio.Writer won't call chunkWriter.Write
2567 // (writeChunk with zero bytes, so we have to do it
2568 // ourselves to force the HTTP response header and/or
2569 // final DATA frame (with END_STREAM) to be sent.
2570 rws.writeChunk(nil)
2571 }
2572}
2573
2574func (w *responseWriter) CloseNotify() <-chan bool {
2575 rws := w.rws
2576 if rws == nil {
2577 panic("CloseNotify called after Handler finished")
2578 }
2579 rws.closeNotifierMu.Lock()
2580 ch := rws.closeNotifierCh
2581 if ch == nil {
2582 ch = make(chan bool, 1)
2583 rws.closeNotifierCh = ch
2584 cw := rws.stream.cw
2585 go func() {
2586 cw.Wait() // wait for close
2587 ch <- true
2588 }()
2589 }
2590 rws.closeNotifierMu.Unlock()
2591 return ch
2592}
2593
2594func (w *responseWriter) Header() http.Header {
2595 rws := w.rws
2596 if rws == nil {
2597 panic("Header called after Handler finished")
2598 }
2599 if rws.handlerHeader == nil {
2600 rws.handlerHeader = make(http.Header)
2601 }
2602 return rws.handlerHeader
2603}
2604
2605// checkWriteHeaderCode is a copy of net/http's checkWriteHeaderCode.
2606func checkWriteHeaderCode(code int) {
2607 // Issue 22880: require valid WriteHeader status codes.
2608 // For now we only enforce that it's three digits.
2609 // In the future we might block things over 599 (600 and above aren't defined
2610 // at http://httpwg.org/specs/rfc7231.html#status.codes)
2611 // and we might block under 200 (once we have more mature 1xx support).
2612 // But for now any three digits.
2613 //
2614 // We used to send "HTTP/1.1 000 0" on the wire in responses but there's
2615 // no equivalent bogus thing we can realistically send in HTTP/2,
2616 // so we'll consistently panic instead and help people find their bugs
2617 // early. (We can't return an error from WriteHeader even if we wanted to.)
2618 if code < 100 || code > 999 {
2619 panic(fmt.Sprintf("invalid WriteHeader code %v", code))
2620 }
2621}
2622
2623func (w *responseWriter) WriteHeader(code int) {
2624 rws := w.rws
2625 if rws == nil {
2626 panic("WriteHeader called after Handler finished")
2627 }
2628 rws.writeHeader(code)
2629}
2630
2631func (rws *responseWriterState) writeHeader(code int) {
2632 if !rws.wroteHeader {
2633 checkWriteHeaderCode(code)
2634 rws.wroteHeader = true
2635 rws.status = code
2636 if len(rws.handlerHeader) > 0 {
2637 rws.snapHeader = cloneHeader(rws.handlerHeader)
2638 }
2639 }
2640}
2641
2642func cloneHeader(h http.Header) http.Header {
2643 h2 := make(http.Header, len(h))
2644 for k, vv := range h {
2645 vv2 := make([]string, len(vv))
2646 copy(vv2, vv)
2647 h2[k] = vv2
2648 }
2649 return h2
2650}
2651
2652// The Life Of A Write is like this:
2653//
2654// * Handler calls w.Write or w.WriteString ->
2655// * -> rws.bw (*bufio.Writer) ->
2656// * (Handler might call Flush)
2657// * -> chunkWriter{rws}
2658// * -> responseWriterState.writeChunk(p []byte)
2659// * -> responseWriterState.writeChunk (most of the magic; see comment there)
2660func (w *responseWriter) Write(p []byte) (n int, err error) {
2661 return w.write(len(p), p, "")
2662}
2663
2664func (w *responseWriter) WriteString(s string) (n int, err error) {
2665 return w.write(len(s), nil, s)
2666}
2667
2668// either dataB or dataS is non-zero.
2669func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) {
2670 rws := w.rws
2671 if rws == nil {
2672 panic("Write called after Handler finished")
2673 }
2674 if !rws.wroteHeader {
2675 w.WriteHeader(200)
2676 }
2677 if !bodyAllowedForStatus(rws.status) {
2678 return 0, http.ErrBodyNotAllowed
2679 }
2680 rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) // only one can be set
2681 if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen {
2682 // TODO: send a RST_STREAM
2683 return 0, errors.New("http2: handler wrote more than declared Content-Length")
2684 }
2685
2686 if dataB != nil {
2687 return rws.bw.Write(dataB)
2688 } else {
2689 return rws.bw.WriteString(dataS)
2690 }
2691}
2692
2693func (w *responseWriter) handlerDone() {
2694 rws := w.rws
2695 dirty := rws.dirty
2696 rws.handlerDone = true
2697 w.Flush()
2698 w.rws = nil
2699 if !dirty {
2700 // Only recycle the pool if all prior Write calls to
2701 // the serverConn goroutine completed successfully. If
2702 // they returned earlier due to resets from the peer
2703 // there might still be write goroutines outstanding
2704 // from the serverConn referencing the rws memory. See
2705 // issue 20704.
2706 responseWriterStatePool.Put(rws)
2707 }
2708}
2709
2710// Push errors.
2711var (
2712 ErrRecursivePush = errors.New("http2: recursive push not allowed")
2713 ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
2714)
2715
2716var _ http.Pusher = (*responseWriter)(nil)
2717
2718func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
2719 st := w.rws.stream
2720 sc := st.sc
2721 sc.serveG.checkNotOn()
2722
2723 // No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
2724 // http://tools.ietf.org/html/rfc7540#section-6.6
2725 if st.isPushed() {
2726 return ErrRecursivePush
2727 }
2728
2729 if opts == nil {
2730 opts = new(http.PushOptions)
2731 }
2732
2733 // Default options.
2734 if opts.Method == "" {
2735 opts.Method = "GET"
2736 }
2737 if opts.Header == nil {
2738 opts.Header = http.Header{}
2739 }
2740 wantScheme := "http"
2741 if w.rws.req.TLS != nil {
2742 wantScheme = "https"
2743 }
2744
2745 // Validate the request.
2746 u, err := url.Parse(target)
2747 if err != nil {
2748 return err
2749 }
2750 if u.Scheme == "" {
2751 if !strings.HasPrefix(target, "/") {
2752 return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target)
2753 }
2754 u.Scheme = wantScheme
2755 u.Host = w.rws.req.Host
2756 } else {
2757 if u.Scheme != wantScheme {
2758 return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme)
2759 }
2760 if u.Host == "" {
2761 return errors.New("URL must have a host")
2762 }
2763 }
2764 for k := range opts.Header {
2765 if strings.HasPrefix(k, ":") {
2766 return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
2767 }
2768 // These headers are meaningful only if the request has a body,
2769 // but PUSH_PROMISE requests cannot have a body.
2770 // http://tools.ietf.org/html/rfc7540#section-8.2
2771 // Also disallow Host, since the promised URL must be absolute.
2772 switch strings.ToLower(k) {
2773 case "content-length", "content-encoding", "trailer", "te", "expect", "host":
2774 return fmt.Errorf("promised request headers cannot include %q", k)
2775 }
2776 }
2777 if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil {
2778 return err
2779 }
2780
2781 // The RFC effectively limits promised requests to GET and HEAD:
2782 // "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
2783 // http://tools.ietf.org/html/rfc7540#section-8.2
2784 if opts.Method != "GET" && opts.Method != "HEAD" {
2785 return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
2786 }
2787
2788 msg := &startPushRequest{
2789 parent: st,
2790 method: opts.Method,
2791 url: u,
2792 header: cloneHeader(opts.Header),
2793 done: errChanPool.Get().(chan error),
2794 }
2795
2796 select {
2797 case <-sc.doneServing:
2798 return errClientDisconnected
2799 case <-st.cw:
2800 return errStreamClosed
2801 case sc.serveMsgCh <- msg:
2802 }
2803
2804 select {
2805 case <-sc.doneServing:
2806 return errClientDisconnected
2807 case <-st.cw:
2808 return errStreamClosed
2809 case err := <-msg.done:
2810 errChanPool.Put(msg.done)
2811 return err
2812 }
2813}
2814
2815type startPushRequest struct {
2816 parent *stream
2817 method string
2818 url *url.URL
2819 header http.Header
2820 done chan error
2821}
2822
2823func (sc *serverConn) startPush(msg *startPushRequest) {
2824 sc.serveG.check()
2825
2826 // http://tools.ietf.org/html/rfc7540#section-6.6.
2827 // PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
2828 // is in either the "open" or "half-closed (remote)" state.
2829 if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote {
2830 // responseWriter.Push checks that the stream is peer-initiated.
2831 msg.done <- errStreamClosed
2832 return
2833 }
2834
2835 // http://tools.ietf.org/html/rfc7540#section-6.6.
2836 if !sc.pushEnabled {
2837 msg.done <- http.ErrNotSupported
2838 return
2839 }
2840
2841 // PUSH_PROMISE frames must be sent in increasing order by stream ID, so
2842 // we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
2843 // is written. Once the ID is allocated, we start the request handler.
2844 allocatePromisedID := func() (uint32, error) {
2845 sc.serveG.check()
2846
2847 // Check this again, just in case. Technically, we might have received
2848 // an updated SETTINGS by the time we got around to writing this frame.
2849 if !sc.pushEnabled {
2850 return 0, http.ErrNotSupported
2851 }
2852 // http://tools.ietf.org/html/rfc7540#section-6.5.2.
2853 if sc.curPushedStreams+1 > sc.clientMaxStreams {
2854 return 0, ErrPushLimitReached
2855 }
2856
2857 // http://tools.ietf.org/html/rfc7540#section-5.1.1.
2858 // Streams initiated by the server MUST use even-numbered identifiers.
2859 // A server that is unable to establish a new stream identifier can send a GOAWAY
2860 // frame so that the client is forced to open a new connection for new streams.
2861 if sc.maxPushPromiseID+2 >= 1<<31 {
2862 sc.startGracefulShutdownInternal()
2863 return 0, ErrPushLimitReached
2864 }
2865 sc.maxPushPromiseID += 2
2866 promisedID := sc.maxPushPromiseID
2867
2868 // http://tools.ietf.org/html/rfc7540#section-8.2.
2869 // Strictly speaking, the new stream should start in "reserved (local)", then
2870 // transition to "half closed (remote)" after sending the initial HEADERS, but
2871 // we start in "half closed (remote)" for simplicity.
2872 // See further comments at the definition of stateHalfClosedRemote.
2873 promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote)
2874 rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{
2875 method: msg.method,
2876 scheme: msg.url.Scheme,
2877 authority: msg.url.Host,
2878 path: msg.url.RequestURI(),
2879 header: cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE
2880 })
2881 if err != nil {
2882 // Should not happen, since we've already validated msg.url.
2883 panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
2884 }
2885
2886 go sc.runHandler(rw, req, sc.handler.ServeHTTP)
2887 return promisedID, nil
2888 }
2889
2890 sc.writeFrame(FrameWriteRequest{
2891 write: &writePushPromise{
2892 streamID: msg.parent.id,
2893 method: msg.method,
2894 url: msg.url,
2895 h: msg.header,
2896 allocatePromisedID: allocatePromisedID,
2897 },
2898 stream: msg.parent,
2899 done: msg.done,
2900 })
2901}
2902
2903// foreachHeaderElement splits v according to the "#rule" construction
2904// in RFC 7230 section 7 and calls fn for each non-empty element.
2905func foreachHeaderElement(v string, fn func(string)) {
2906 v = textproto.TrimString(v)
2907 if v == "" {
2908 return
2909 }
2910 if !strings.Contains(v, ",") {
2911 fn(v)
2912 return
2913 }
2914 for _, f := range strings.Split(v, ",") {
2915 if f = textproto.TrimString(f); f != "" {
2916 fn(f)
2917 }
2918 }
2919}
2920
2921// From http://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2.2
2922var connHeaders = []string{
2923 "Connection",
2924 "Keep-Alive",
2925 "Proxy-Connection",
2926 "Transfer-Encoding",
2927 "Upgrade",
2928}
2929
2930// checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request,
2931// per RFC 7540 Section 8.1.2.2.
2932// The returned error is reported to users.
2933func checkValidHTTP2RequestHeaders(h http.Header) error {
2934 for _, k := range connHeaders {
2935 if _, ok := h[k]; ok {
2936 return fmt.Errorf("request header %q is not valid in HTTP/2", k)
2937 }
2938 }
2939 te := h["Te"]
2940 if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) {
2941 return errors.New(`request header "TE" may only be "trailers" in HTTP/2`)
2942 }
2943 return nil
2944}
2945
2946func new400Handler(err error) http.HandlerFunc {
2947 return func(w http.ResponseWriter, r *http.Request) {
2948 http.Error(w, err.Error(), http.StatusBadRequest)
2949 }
2950}
2951
2952// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives
2953// disabled. See comments on h1ServerShutdownChan above for why
2954// the code is written this way.
2955func h1ServerKeepAlivesDisabled(hs *http.Server) bool {
2956 var x interface{} = hs
2957 type I interface {
2958 doKeepAlives() bool
2959 }
2960 if hs, ok := x.(I); ok {
2961 return !hs.doKeepAlives()
2962 }
2963 return false
2964}