blob: 6f8e8b0d98b18e6ccb708410326524570198a081 [file] [log] [blame]
Zack Williamse940c7a2019-08-21 14:25:39 -07001// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// TODO: turn off the serve goroutine when idle, so
6// an idle conn only has the readFrames goroutine active. (which could
7// also be optimized probably to pin less memory in crypto/tls). This
8// would involve tracking when the serve goroutine is active (atomic
9// int32 read/CAS probably?) and starting it up when frames arrive,
10// and shutting it down when all handlers exit. the occasional PING
11// packets could use time.AfterFunc to call sc.wakeStartServeLoop()
12// (which is a no-op if already running) and then queue the PING write
13// as normal. The serve loop would then exit in most cases (if no
14// Handlers running) and not be woken up again until the PING packet
15// returns.
16
// TODO (maybe): add a mechanism for Handlers to go into
18// half-closed-local mode (rw.(io.Closer) test?) but not exit their
19// handler, and continue to be able to read from the
20// Request.Body. This would be a somewhat semantic change from HTTP/1
21// (or at least what we expose in net/http), so I'd probably want to
22// add it there too. For now, this package says that returning from
23// the Handler ServeHTTP function means you're both done reading and
24// done writing, without a way to stop just one or the other.
25
26package http2
27
28import (
29 "bufio"
30 "bytes"
31 "context"
32 "crypto/tls"
33 "errors"
34 "fmt"
35 "io"
36 "log"
37 "math"
38 "net"
39 "net/http"
40 "net/textproto"
41 "net/url"
42 "os"
43 "reflect"
44 "runtime"
45 "strconv"
46 "strings"
47 "sync"
48 "time"
49
50 "golang.org/x/net/http/httpguts"
51 "golang.org/x/net/http2/hpack"
52)
53
const (
	// prefaceTimeout bounds how long readPreface waits for the client preface.
	prefaceTimeout = 10 * time.Second
	// firstSettingsTimeout is the delay before the settings timer armed in
	// serverConn.serve fires if no frame has been processed yet.
	firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
	// handlerChunkWriteSize is the size of the bufio.Writer created for each
	// pooled responseWriterState.
	handlerChunkWriteSize = 4 << 10
	// defaultMaxStreams is what Server.maxConcurrentStreams returns when
	// Server.MaxConcurrentStreams is zero.
	defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
	// maxQueuedControlFrames is the value returned by
	// Server.maxQueuedControlFrames.
	maxQueuedControlFrames = 10000
)
61
// Sentinel errors used by the server side of the package.
var (
	// errClientDisconnected is the error passed to closeStream for every
	// remaining stream when the connection's serve loop ends.
	errClientDisconnected = errors.New("client disconnected")
	errClosedBody         = errors.New("body closed by handler")
	errHandlerComplete    = errors.New("http2: request body closed due to handler exiting")
	errStreamClosed       = errors.New("http2: stream closed")
)
68
69var responseWriterStatePool = sync.Pool{
70 New: func() interface{} {
71 rws := &responseWriterState{}
72 rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
73 return rws
74 },
75}
76
// Test hooks. Each is checked for nil before use.
var (
	// testHookOnConn is called by the TLSNextProto handler installed by
	// ConfigureServer, before the connection is served.
	testHookOnConn func()
	// testHookGetServerConn is called by ServeConn with the new serverConn
	// just before its serve loop starts.
	testHookGetServerConn func(*serverConn)
	testHookOnPanicMu     *sync.Mutex // nil except in tests
	// testHookOnPanic is called by serverConn.notePanic with the recovered
	// value; the panic is re-raised when it returns true.
	testHookOnPanic func(sc *serverConn, panicVal interface{}) (rePanic bool)
)
84
// Server is an HTTP/2 server.
type Server struct {
	// MaxHandlers limits the number of http.Handler ServeHTTP goroutines
	// which may run at a time over all connections.
	// Negative or zero means no limit.
	// TODO: implement
	MaxHandlers int

	// MaxConcurrentStreams optionally specifies the number of
	// concurrent streams that each client may have open at a
	// time. This is unrelated to the number of http.Handler goroutines
	// which may be active globally, which is MaxHandlers.
	// If zero, MaxConcurrentStreams defaults to at least 100, per
	// the HTTP/2 spec's recommendations (the default used is
	// defaultMaxStreams).
	MaxConcurrentStreams uint32

	// MaxReadFrameSize optionally specifies the largest frame
	// this server is willing to read. A valid value is between
	// 16k and 16M, inclusive. If zero or otherwise invalid, a
	// default value is used.
	MaxReadFrameSize uint32

	// PermitProhibitedCipherSuites, if true, permits the use of
	// cipher suites prohibited by the HTTP/2 spec.
	PermitProhibitedCipherSuites bool

	// IdleTimeout specifies how long until idle clients should be
	// closed with a GOAWAY frame. PING frames are not considered
	// activity for the purposes of IdleTimeout.
	// If zero when passed to ConfigureServer, it is filled in from the
	// http.Server's IdleTimeout, or its ReadTimeout if that is zero too.
	IdleTimeout time.Duration

	// MaxUploadBufferPerConnection is the size of the initial flow
	// control window for each connection. The HTTP/2 spec does not
	// allow this to be smaller than 65535 or larger than 2^32-1.
	// If the value is outside this range, a default value will be
	// used instead.
	MaxUploadBufferPerConnection int32

	// MaxUploadBufferPerStream is the size of the initial flow control
	// window for each stream. The HTTP/2 spec does not allow this to
	// be larger than 2^32-1. If the value is zero or larger than the
	// maximum, a default value will be used instead.
	MaxUploadBufferPerStream int32

	// NewWriteScheduler constructs a write scheduler for a connection.
	// If nil, a default scheduler is chosen.
	NewWriteScheduler func() WriteScheduler

	// Internal state. This is a pointer (rather than embedded directly)
	// so that we don't embed a Mutex in this struct, which will make the
	// struct non-copyable, which might break some callers.
	// It is set by ConfigureServer and stays nil otherwise.
	state *serverInternalState
}
138
139func (s *Server) initialConnRecvWindowSize() int32 {
140 if s.MaxUploadBufferPerConnection > initialWindowSize {
141 return s.MaxUploadBufferPerConnection
142 }
143 return 1 << 20
144}
145
146func (s *Server) initialStreamRecvWindowSize() int32 {
147 if s.MaxUploadBufferPerStream > 0 {
148 return s.MaxUploadBufferPerStream
149 }
150 return 1 << 20
151}
152
153func (s *Server) maxReadFrameSize() uint32 {
154 if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize {
155 return v
156 }
157 return defaultMaxReadFrameSize
158}
159
160func (s *Server) maxConcurrentStreams() uint32 {
161 if v := s.MaxConcurrentStreams; v > 0 {
162 return v
163 }
164 return defaultMaxStreams
165}
166
// maxQueuedControlFrames is the maximum number of control frames like
// SETTINGS, PING and RST_STREAM that will be queued for writing before
// the connection is closed to prevent memory exhaustion attacks.
//
// It currently always returns the package constant of the same name; the
// receiver is not consulted.
func (s *Server) maxQueuedControlFrames() int {
	// TODO: if anybody asks, add a Server field, and remember to define the
	// behavior of negative values.
	return maxQueuedControlFrames
}
175
// serverInternalState holds the set of connections registered through
// registerConn. Its methods accept a nil receiver and do nothing in that
// case (a Server used without ConfigureServer).
type serverInternalState struct {
	mu          sync.Mutex               // held by the methods while they touch activeConns
	activeConns map[*serverConn]struct{} // set of registered connections
}
180
181func (s *serverInternalState) registerConn(sc *serverConn) {
182 if s == nil {
183 return // if the Server was used without calling ConfigureServer
184 }
185 s.mu.Lock()
186 s.activeConns[sc] = struct{}{}
187 s.mu.Unlock()
188}
189
190func (s *serverInternalState) unregisterConn(sc *serverConn) {
191 if s == nil {
192 return // if the Server was used without calling ConfigureServer
193 }
194 s.mu.Lock()
195 delete(s.activeConns, sc)
196 s.mu.Unlock()
197}
198
199func (s *serverInternalState) startGracefulShutdown() {
200 if s == nil {
201 return // if the Server was used without calling ConfigureServer
202 }
203 s.mu.Lock()
204 for sc := range s.activeConns {
205 sc.startGracefulShutdown()
206 }
207 s.mu.Unlock()
208}
209
210// ConfigureServer adds HTTP/2 support to a net/http Server.
211//
212// The configuration conf may be nil.
213//
214// ConfigureServer must be called before s begins serving.
215func ConfigureServer(s *http.Server, conf *Server) error {
216 if s == nil {
217 panic("nil *http.Server")
218 }
219 if conf == nil {
220 conf = new(Server)
221 }
222 conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})}
223 if h1, h2 := s, conf; h2.IdleTimeout == 0 {
224 if h1.IdleTimeout != 0 {
225 h2.IdleTimeout = h1.IdleTimeout
226 } else {
227 h2.IdleTimeout = h1.ReadTimeout
228 }
229 }
230 s.RegisterOnShutdown(conf.state.startGracefulShutdown)
231
232 if s.TLSConfig == nil {
233 s.TLSConfig = new(tls.Config)
234 } else if s.TLSConfig.CipherSuites != nil {
235 // If they already provided a CipherSuite list, return
236 // an error if it has a bad order or is missing
237 // ECDHE_RSA_WITH_AES_128_GCM_SHA256 or ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
238 haveRequired := false
239 sawBad := false
240 for i, cs := range s.TLSConfig.CipherSuites {
241 switch cs {
242 case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
243 // Alternative MTI cipher to not discourage ECDSA-only servers.
244 // See http://golang.org/cl/30721 for further information.
245 tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
246 haveRequired = true
247 }
248 if isBadCipher(cs) {
249 sawBad = true
250 } else if sawBad {
251 return fmt.Errorf("http2: TLSConfig.CipherSuites index %d contains an HTTP/2-approved cipher suite (%#04x), but it comes after unapproved cipher suites. With this configuration, clients that don't support previous, approved cipher suites may be given an unapproved one and reject the connection.", i, cs)
252 }
253 }
254 if !haveRequired {
255 return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher.")
256 }
257 }
258
259 // Note: not setting MinVersion to tls.VersionTLS12,
260 // as we don't want to interfere with HTTP/1.1 traffic
261 // on the user's server. We enforce TLS 1.2 later once
262 // we accept a connection. Ideally this should be done
263 // during next-proto selection, but using TLS <1.2 with
264 // HTTP/2 is still the client's bug.
265
266 s.TLSConfig.PreferServerCipherSuites = true
267
268 haveNPN := false
269 for _, p := range s.TLSConfig.NextProtos {
270 if p == NextProtoTLS {
271 haveNPN = true
272 break
273 }
274 }
275 if !haveNPN {
276 s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS)
277 }
278
279 if s.TLSNextProto == nil {
280 s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
281 }
282 protoHandler := func(hs *http.Server, c *tls.Conn, h http.Handler) {
283 if testHookOnConn != nil {
284 testHookOnConn()
285 }
286 conf.ServeConn(c, &ServeConnOpts{
287 Handler: h,
288 BaseConfig: hs,
289 })
290 }
291 s.TLSNextProto[NextProtoTLS] = protoHandler
292 return nil
293}
294
// ServeConnOpts are options for the Server.ServeConn method.
// A nil *ServeConnOpts is valid and selects the defaults described below.
type ServeConnOpts struct {
	// BaseConfig optionally sets the base configuration
	// for values. If nil, defaults are used (a new zero http.Server).
	BaseConfig *http.Server

	// Handler specifies which handler to use for processing
	// requests. If nil, BaseConfig.Handler is used. If BaseConfig
	// or BaseConfig.Handler is nil, http.DefaultServeMux is used.
	Handler http.Handler
}
306
307func (o *ServeConnOpts) baseConfig() *http.Server {
308 if o != nil && o.BaseConfig != nil {
309 return o.BaseConfig
310 }
311 return new(http.Server)
312}
313
314func (o *ServeConnOpts) handler() http.Handler {
315 if o != nil {
316 if o.Handler != nil {
317 return o.Handler
318 }
319 if o.BaseConfig != nil && o.BaseConfig.Handler != nil {
320 return o.BaseConfig.Handler
321 }
322 }
323 return http.DefaultServeMux
324}
325
// ServeConn serves HTTP/2 requests on the provided connection and
// blocks until the connection is no longer readable.
//
// ServeConn starts speaking HTTP/2 assuming that c has not had any
// reads or writes. It writes its initial settings frame and expects
// to be able to read the preface and settings frame from the
// client. If c has a ConnectionState method like a *tls.Conn, the
// ConnectionState is used to verify the TLS ciphersuite and to set
// the Request.TLS field in Handlers.
//
// ServeConn does not support h2c by itself. Any h2c support must be
// implemented in terms of providing a suitably-behaving net.Conn.
//
// The opts parameter is optional. If nil, default values are used.
func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
	// The base context is canceled when ServeConn returns.
	baseCtx, cancel := serverConnBaseContext(c, opts)
	defer cancel()

	sc := &serverConn{
		srv:                         s,
		hs:                          opts.baseConfig(),
		conn:                        c,
		baseCtx:                     baseCtx,
		remoteAddrStr:               c.RemoteAddr().String(),
		bw:                          newBufferedWriter(c),
		handler:                     opts.handler(),
		streams:                     make(map[uint32]*stream),
		readFrameCh:                 make(chan readFrameResult),
		wantWriteFrameCh:            make(chan FrameWriteRequest, 8),
		serveMsgCh:                  make(chan interface{}, 8),
		wroteFrameCh:                make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
		bodyReadCh:                  make(chan bodyReadMsg),         // buffering doesn't matter either way
		doneServing:                 make(chan struct{}),
		clientMaxStreams:            math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
		advMaxStreams:               s.maxConcurrentStreams(),
		initialStreamSendWindowSize: initialWindowSize,
		maxFrameSize:                initialMaxFrameSize,
		headerTableSize:             initialHeaderTableSize,
		serveG:                      newGoroutineLock(),
		pushEnabled:                 true,
	}

	// Both calls are no-ops when s.state is nil (ConfigureServer not used).
	s.state.registerConn(sc)
	defer s.state.unregisterConn(sc)

	// The net/http package sets the write deadline from the
	// http.Server.WriteTimeout during the TLS handshake, but then
	// passes the connection off to us with the deadline already set.
	// Write deadlines are set per stream in serverConn.newStream.
	// Disarm the net.Conn write deadline here.
	if sc.hs.WriteTimeout != 0 {
		sc.conn.SetWriteDeadline(time.Time{})
	}

	if s.NewWriteScheduler != nil {
		sc.writeSched = s.NewWriteScheduler()
	} else {
		sc.writeSched = NewRandomWriteScheduler()
	}

	// These start at the RFC-specified defaults. If there is a higher
	// configured value for inflow, that will be updated when we send a
	// WINDOW_UPDATE shortly after sending SETTINGS.
	sc.flow.add(initialWindowSize)
	sc.inflow.add(initialWindowSize)
	sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)

	// The framer writes through the buffered writer and reads from c directly.
	fr := NewFramer(sc.bw, c)
	fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
	fr.MaxHeaderListSize = sc.maxHeaderListSize()
	fr.SetMaxReadFrameSize(s.maxReadFrameSize())
	sc.framer = fr

	if tc, ok := c.(connectionStater); ok {
		// Keep a private copy of the TLS state for this connection.
		sc.tlsState = new(tls.ConnectionState)
		*sc.tlsState = tc.ConnectionState()
		// 9.2 Use of TLS Features
		// An implementation of HTTP/2 over TLS MUST use TLS
		// 1.2 or higher with the restrictions on feature set
		// and cipher suite described in this section. Due to
		// implementation limitations, it might not be
		// possible to fail TLS negotiation. An endpoint MUST
		// immediately terminate an HTTP/2 connection that
		// does not meet the TLS requirements described in
		// this section with a connection error (Section
		// 5.4.1) of type INADEQUATE_SECURITY.
		if sc.tlsState.Version < tls.VersionTLS12 {
			sc.rejectConn(ErrCodeInadequateSecurity, "TLS version too low")
			return
		}

		if sc.tlsState.ServerName == "" {
			// Client must use SNI, but we don't enforce that anymore,
			// since it was causing problems when connecting to bare IP
			// addresses during development.
			//
			// TODO: optionally enforce? Or enforce at the time we receive
			// a new request, and verify the ServerName matches the :authority?
			// But that precludes proxy situations, perhaps.
			//
			// So for now, do nothing here again.
		}

		if !s.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) {
			// "Endpoints MAY choose to generate a connection error
			// (Section 5.4.1) of type INADEQUATE_SECURITY if one of
			// the prohibited cipher suites are negotiated."
			//
			// We choose that. In my opinion, the spec is weak
			// here. It also says both parties must support at least
			// TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no
			// excuses here. If we really must, we could allow an
			// "AllowInsecureWeakCiphers" option on the server later.
			// Let's see how it plays out first.
			sc.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite))
			return
		}
	}

	if hook := testHookGetServerConn; hook != nil {
		hook(sc)
	}
	// Run the serve loop on this goroutine; ServeConn returns when it does.
	sc.serve()
}
450
451func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx context.Context, cancel func()) {
452 ctx, cancel = context.WithCancel(context.Background())
453 ctx = context.WithValue(ctx, http.LocalAddrContextKey, c.LocalAddr())
454 if hs := opts.baseConfig(); hs != nil {
455 ctx = context.WithValue(ctx, http.ServerContextKey, hs)
456 }
457 return
458}
459
460func (sc *serverConn) rejectConn(err ErrCode, debug string) {
461 sc.vlogf("http2: server rejecting conn: %v, %s", err, debug)
462 // ignoring errors. hanging up anyway.
463 sc.framer.WriteGoAway(0, err, []byte(debug))
464 sc.bw.Flush()
465 sc.conn.Close()
466}
467
// serverConn is the per-connection state of an HTTP/2 server connection.
// It is created by Server.ServeConn and driven by serverConn.serve.
type serverConn struct {
	// Immutable:
	srv              *Server
	hs               *http.Server
	conn             net.Conn
	bw               *bufferedWriter // writing to conn
	handler          http.Handler
	baseCtx          context.Context
	framer           *Framer
	doneServing      chan struct{}          // closed when serverConn.serve ends
	readFrameCh      chan readFrameResult   // written by serverConn.readFrames
	wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
	wroteFrameCh     chan frameWriteResult  // from writeFrameAsync -> serve, tickles more frame writes
	bodyReadCh       chan bodyReadMsg       // from handlers -> serve
	serveMsgCh       chan interface{}       // misc messages & code to send to / run on the serve loop
	flow             flow                   // conn-wide (not stream-specific) outbound flow control
	inflow           flow                   // conn-wide inbound flow control
	tlsState         *tls.ConnectionState   // shared by all handlers, like net/http
	remoteAddrStr    string
	writeSched       WriteScheduler

	// Everything following is owned by the serve loop; use serveG.check():
	serveG                      goroutineLock // used to verify funcs are on serve()
	pushEnabled                 bool
	sawFirstSettings            bool // got the initial SETTINGS frame after the preface
	needToSendSettingsAck       bool
	unackedSettings             int    // how many SETTINGS have we sent without ACKs?
	queuedControlFrames         int    // control frames in the writeSched queue
	clientMaxStreams            uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
	advMaxStreams               uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised to the client
	curClientStreams            uint32 // number of open streams initiated by the client
	curPushedStreams            uint32 // number of open streams initiated by server push
	maxClientStreamID           uint32 // max ever seen from client (odd), or 0 if there have been no client requests
	maxPushPromiseID            uint32 // ID of the last push promise (even), or 0 if there have been no pushes
	streams                     map[uint32]*stream
	initialStreamSendWindowSize int32
	maxFrameSize                int32
	headerTableSize             uint32
	peerMaxHeaderListSize       uint32            // zero means unknown (default)
	canonHeader                 map[string]string // http2-lower-case -> Go-Canonical-Case
	writingFrame                bool              // started writing a frame (on serve goroutine or separate)
	writingFrameAsync           bool              // started a frame on its own goroutine but haven't heard back on wroteFrameCh
	needsFrameFlush             bool              // last frame write wasn't a flush
	inGoAway                    bool              // we've started to or sent GOAWAY
	inFrameScheduleLoop         bool              // whether we're in the scheduleFrameWrite loop
	needToSendGoAway            bool              // we need to schedule a GOAWAY frame write
	goAwayCode                  ErrCode
	shutdownTimer               *time.Timer // nil until used
	idleTimer                   *time.Timer // nil if unused

	// Owned by the writeFrameAsync goroutine:
	headerWriteBuf bytes.Buffer
	hpackEncoder   *hpack.Encoder

	// Used by startGracefulShutdown.
	shutdownOnce sync.Once
}
525
526func (sc *serverConn) maxHeaderListSize() uint32 {
527 n := sc.hs.MaxHeaderBytes
528 if n <= 0 {
529 n = http.DefaultMaxHeaderBytes
530 }
531 // http2's count is in a slightly different unit and includes 32 bytes per pair.
532 // So, take the net/http.Server value and pad it up a bit, assuming 10 headers.
533 const perFieldOverhead = 32 // per http2 spec
534 const typicalHeaders = 10 // conservative
535 return uint32(n + typicalHeaders*perFieldOverhead)
536}
537
538func (sc *serverConn) curOpenStreams() uint32 {
539 sc.serveG.check()
540 return sc.curClientStreams + sc.curPushedStreams
541}
542
// stream represents a stream. This is the minimal metadata needed by
// the serve goroutine. Most of the actual stream state is owned by
// the http.Handler's goroutine in the responseWriter. Because the
// responseWriter's responseWriterState is recycled at the end of a
// handler, this struct intentionally has no pointer to the
// *responseWriter{,State} itself, as the Handler ending nils out the
// responseWriter's state field.
type stream struct {
	// immutable:
	sc        *serverConn
	id        uint32
	body      *pipe       // non-nil if expecting DATA frames
	cw        closeWaiter // closed when the stream transitions to the closed state
	ctx       context.Context
	cancelCtx func()

	// owned by serverConn's serve loop:
	bodyBytes        int64   // body bytes seen so far
	declBodyBytes    int64   // or -1 if undeclared
	flow             flow    // limits writing from Handler to client
	inflow           flow    // what the client is allowed to POST/etc to us
	parent           *stream // or nil
	numTrailerValues int64
	weight           uint8
	state            streamState
	resetQueued      bool        // RST_STREAM queued for write; set by sc.resetStream
	gotTrailerHeader bool        // HEADER frame for trailers was seen
	wroteHeaders     bool        // whether we wrote headers (not status 100)
	writeDeadline    *time.Timer // nil if unused

	trailer    http.Header // accumulated trailers
	reqTrailer http.Header // handler's Request.Trailer
}
576
577func (sc *serverConn) Framer() *Framer { return sc.framer }
578func (sc *serverConn) CloseConn() error { return sc.conn.Close() }
579func (sc *serverConn) Flush() error { return sc.bw.Flush() }
580func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) {
581 return sc.hpackEncoder, &sc.headerWriteBuf
582}
583
584func (sc *serverConn) state(streamID uint32) (streamState, *stream) {
585 sc.serveG.check()
586 // http://tools.ietf.org/html/rfc7540#section-5.1
587 if st, ok := sc.streams[streamID]; ok {
588 return st.state, st
589 }
590 // "The first use of a new stream identifier implicitly closes all
591 // streams in the "idle" state that might have been initiated by
592 // that peer with a lower-valued stream identifier. For example, if
593 // a client sends a HEADERS frame on stream 7 without ever sending a
594 // frame on stream 5, then stream 5 transitions to the "closed"
595 // state when the first frame for stream 7 is sent or received."
596 if streamID%2 == 1 {
597 if streamID <= sc.maxClientStreamID {
598 return stateClosed, nil
599 }
600 } else {
601 if streamID <= sc.maxPushPromiseID {
602 return stateClosed, nil
603 }
604 }
605 return stateIdle, nil
606}
607
608// setConnState calls the net/http ConnState hook for this connection, if configured.
609// Note that the net/http package does StateNew and StateClosed for us.
610// There is currently no plan for StateHijacked or hijacking HTTP/2 connections.
611func (sc *serverConn) setConnState(state http.ConnState) {
612 if sc.hs.ConnState != nil {
613 sc.hs.ConnState(sc.conn, state)
614 }
615}
616
617func (sc *serverConn) vlogf(format string, args ...interface{}) {
618 if VerboseLogs {
619 sc.logf(format, args...)
620 }
621}
622
623func (sc *serverConn) logf(format string, args ...interface{}) {
624 if lg := sc.hs.ErrorLog; lg != nil {
625 lg.Printf(format, args...)
626 } else {
627 log.Printf(format, args...)
628 }
629}
630
// errno returns the numeric value of v when v's dynamic type has kind
// uintptr, and 0 for every other error (including nil).
//
// TODO: remove this helper function once http2 can use build
// tags. See comment in isClosedConnError.
func errno(v error) uintptr {
	val := reflect.ValueOf(v)
	if val.Kind() != reflect.Uintptr {
		return 0
	}
	return uintptr(val.Uint())
}
641
642// isClosedConnError reports whether err is an error from use of a closed
643// network connection.
644func isClosedConnError(err error) bool {
645 if err == nil {
646 return false
647 }
648
649 // TODO: remove this string search and be more like the Windows
650 // case below. That might involve modifying the standard library
651 // to return better error types.
652 str := err.Error()
653 if strings.Contains(str, "use of closed network connection") {
654 return true
655 }
656
657 // TODO(bradfitz): x/tools/cmd/bundle doesn't really support
658 // build tags, so I can't make an http2_windows.go file with
659 // Windows-specific stuff. Fix that and move this, once we
660 // have a way to bundle this into std's net/http somehow.
661 if runtime.GOOS == "windows" {
662 if oe, ok := err.(*net.OpError); ok && oe.Op == "read" {
663 if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" {
664 const WSAECONNABORTED = 10053
665 const WSAECONNRESET = 10054
666 if n := errno(se.Err); n == WSAECONNRESET || n == WSAECONNABORTED {
667 return true
668 }
669 }
670 }
671 }
672 return false
673}
674
675func (sc *serverConn) condlogf(err error, format string, args ...interface{}) {
676 if err == nil {
677 return
678 }
679 if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) || err == errPrefaceTimeout {
680 // Boring, expected errors.
681 sc.vlogf(format, args...)
682 } else {
683 sc.logf(format, args...)
684 }
685}
686
687func (sc *serverConn) canonicalHeader(v string) string {
688 sc.serveG.check()
689 buildCommonHeaderMapsOnce()
690 cv, ok := commonCanonHeader[v]
691 if ok {
692 return cv
693 }
694 cv, ok = sc.canonHeader[v]
695 if ok {
696 return cv
697 }
698 if sc.canonHeader == nil {
699 sc.canonHeader = make(map[string]string)
700 }
701 cv = http.CanonicalHeaderKey(v)
702 sc.canonHeader[v] = cv
703 return cv
704}
705
// readFrameResult is the value readFrames sends on readFrameCh for each
// call to Framer.ReadFrame: the frame, the read error, and a callback that
// lets the reader proceed to the next frame.
type readFrameResult struct {
	f   Frame // valid until readMore is called
	err error

	// readMore should be called once the consumer no longer needs or
	// retains f. After readMore, f is invalid and more frames can be
	// read.
	readMore func()
}
715
// readFrames is the loop that reads incoming frames.
// It takes care to only read one frame at a time, blocking until the
// consumer is done with the frame.
// It's run on its own goroutine.
//
// The loop ends when doneServing is closed or after a frame whose error
// terminalReadFrameError reports as terminal has been handed over.
func (sc *serverConn) readFrames() {
	gate := make(gate)
	gateDone := gate.Done // handed to the consumer as readFrameResult.readMore
	for {
		f, err := sc.framer.ReadFrame()
		// Deliver the frame (or error) to the serve loop, unless it has exited.
		select {
		case sc.readFrameCh <- readFrameResult{f, err, gateDone}:
		case <-sc.doneServing:
			return
		}
		// Wait until the consumer signals the gate before reading the next
		// frame, since f is only valid until then.
		select {
		case <-gate:
		case <-sc.doneServing:
			return
		}
		if terminalReadFrameError(err) {
			return
		}
	}
}
740
// frameWriteResult is the message passed from writeFrameAsync to the serve goroutine.
// It is delivered on serverConn.wroteFrameCh.
type frameWriteResult struct {
	wr  FrameWriteRequest // what was written (or attempted)
	err error             // result of the writeFrame call
}
746
747// writeFrameAsync runs in its own goroutine and writes a single frame
748// and then reports when it's done.
749// At most one goroutine can be running writeFrameAsync at a time per
750// serverConn.
751func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) {
752 err := wr.write.writeFrame(sc)
753 sc.wroteFrameCh <- frameWriteResult{wr, err}
754}
755
756func (sc *serverConn) closeAllStreamsOnConnClose() {
757 sc.serveG.check()
758 for _, st := range sc.streams {
759 sc.closeStream(st, errClientDisconnected)
760 }
761}
762
763func (sc *serverConn) stopShutdownTimer() {
764 sc.serveG.check()
765 if t := sc.shutdownTimer; t != nil {
766 t.Stop()
767 }
768}
769
770func (sc *serverConn) notePanic() {
771 // Note: this is for serverConn.serve panicking, not http.Handler code.
772 if testHookOnPanicMu != nil {
773 testHookOnPanicMu.Lock()
774 defer testHookOnPanicMu.Unlock()
775 }
776 if testHookOnPanic != nil {
777 if e := recover(); e != nil {
778 if testHookOnPanic(sc, e) {
779 panic(e)
780 }
781 }
782 }
783}
784
// serve runs the connection's main loop on the calling goroutine. It queues
// the initial SETTINGS frame (plus a connection-level WINDOW_UPDATE when a
// larger receive window is configured), reads the client preface, starts
// the readFrames goroutine, and then multiplexes write requests, write
// completions, incoming frames, body-read notifications and timer or
// control messages until the connection should be torn down.
//
// The deferred calls run in reverse order on return: doneServing is closed
// first, then the shutdown timer is stopped, remaining streams are closed,
// the net.Conn is closed, and finally notePanic runs.
func (sc *serverConn) serve() {
	sc.serveG.check()
	defer sc.notePanic()
	defer sc.conn.Close()
	defer sc.closeAllStreamsOnConnClose()
	defer sc.stopShutdownTimer()
	defer close(sc.doneServing) // unblocks handlers trying to send

	if VerboseLogs {
		sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
	}

	sc.writeFrame(FrameWriteRequest{
		write: writeSettings{
			{SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
			{SettingMaxConcurrentStreams, sc.advMaxStreams},
			{SettingMaxHeaderListSize, sc.maxHeaderListSize()},
			{SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
		},
	})
	sc.unackedSettings++

	// Each connection starts with initialWindowSize inflow tokens.
	// If a higher value is configured, we add more tokens.
	if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
		sc.sendWindowUpdate(nil, int(diff))
	}

	if err := sc.readPreface(); err != nil {
		sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
		return
	}
	// Now that we've got the preface, get us out of the
	// "StateNew" state. We can't go directly to idle, though.
	// Active means we read some data and anticipate a request. We'll
	// do another Active when we get a HEADERS frame.
	sc.setConnState(http.StateActive)
	sc.setConnState(http.StateIdle)

	if sc.srv.IdleTimeout != 0 {
		sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
		defer sc.idleTimer.Stop()
	}

	go sc.readFrames() // closed by defer sc.conn.Close above

	// The deferred Stop binds the timer value now, so setting the local
	// variable to nil further down does not affect it.
	settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
	defer settingsTimer.Stop()

	loopNum := 0
	for {
		loopNum++
		select {
		case wr := <-sc.wantWriteFrameCh:
			if se, ok := wr.write.(StreamError); ok {
				sc.resetStream(se)
				break // leaves the select only; the checks after it still run
			}
			sc.writeFrame(wr)
		case res := <-sc.wroteFrameCh:
			sc.wroteFrame(res)
		case res := <-sc.readFrameCh:
			if !sc.processFrameFromReader(res) {
				return
			}
			// Let readFrames go on to the next frame.
			res.readMore()
			// The first processed frame disarms the settings timer.
			if settingsTimer != nil {
				settingsTimer.Stop()
				settingsTimer = nil
			}
		case m := <-sc.bodyReadCh:
			sc.noteBodyRead(m.st, m.n)
		case msg := <-sc.serveMsgCh:
			switch v := msg.(type) {
			case func(int):
				v(loopNum) // for testing
			case *serverMessage:
				// Messages are compared by pointer identity against the
				// package-level message values.
				switch v {
				case settingsTimerMsg:
					sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
					return
				case idleTimerMsg:
					sc.vlogf("connection is idle")
					sc.goAway(ErrCodeNo)
				case shutdownTimerMsg:
					sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
					return
				case gracefulShutdownMsg:
					sc.startGracefulShutdownInternal()
				default:
					panic("unknown timer")
				}
			case *startPushRequest:
				sc.startPush(v)
			default:
				panic(fmt.Sprintf("unexpected type %T", v))
			}
		}

		// If the peer is causing us to generate a lot of control frames,
		// but not reading them from us, assume they are trying to make us
		// run out of memory.
		if sc.queuedControlFrames > sc.srv.maxQueuedControlFrames() {
			sc.vlogf("http2: too many control frames in send queue, closing connection")
			return
		}

		// Start the shutdown timer after sending a GOAWAY. When sending GOAWAY
		// with no error code (graceful shutdown), don't start the timer until
		// all open streams have been completed.
		sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame
		gracefulShutdownComplete := sc.goAwayCode == ErrCodeNo && sc.curOpenStreams() == 0
		if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != ErrCodeNo || gracefulShutdownComplete) {
			sc.shutDownIn(goAwayTimeout)
		}
	}
}
902
903func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) {
904 select {
905 case <-sc.doneServing:
906 case <-sharedCh:
907 close(privateCh)
908 }
909}
910
// serverMessage is the element type behind the sentinel values that are
// sent to serveMsgCh. The serve loop tells the sentinels apart by
// pointer identity, so each one is a separately allocated value.
type serverMessage int

// settingsTimerMsg is sent to serveMsgCh by onSettingsTimer.
var settingsTimerMsg = new(serverMessage)

// idleTimerMsg is sent to serveMsgCh by onIdleTimer.
var idleTimerMsg = new(serverMessage)

// shutdownTimerMsg is sent to serveMsgCh by onShutdownTimer.
var shutdownTimerMsg = new(serverMessage)

// gracefulShutdownMsg is sent to serveMsgCh by startGracefulShutdown.
var gracefulShutdownMsg = new(serverMessage)
920
921func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
922func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) }
923func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) }
924
925func (sc *serverConn) sendServeMsg(msg interface{}) {
926 sc.serveG.checkNotOn() // NOT
927 select {
928 case sc.serveMsgCh <- msg:
929 case <-sc.doneServing:
930 }
931}
932
// errPrefaceTimeout is returned by readPreface when the client preface
// does not arrive before the preface timer fires.
var errPrefaceTimeout = errors.New("timeout waiting for client preface")
934
935// readPreface reads the ClientPreface greeting from the peer or
936// returns errPrefaceTimeout on timeout, or an error if the greeting
937// is invalid.
938func (sc *serverConn) readPreface() error {
939 errc := make(chan error, 1)
940 go func() {
941 // Read the client preface
942 buf := make([]byte, len(ClientPreface))
943 if _, err := io.ReadFull(sc.conn, buf); err != nil {
944 errc <- err
945 } else if !bytes.Equal(buf, clientPreface) {
946 errc <- fmt.Errorf("bogus greeting %q", buf)
947 } else {
948 errc <- nil
949 }
950 }()
951 timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
952 defer timer.Stop()
953 select {
954 case <-timer.C:
955 return errPrefaceTimeout
956 case err := <-errc:
957 if err == nil {
958 if VerboseLogs {
959 sc.vlogf("http2: server: client %v said hello", sc.conn.RemoteAddr())
960 }
961 }
962 return err
963 }
964}
965
// errChanPool recycles the single-slot error channels that
// writeDataFromHandler uses to receive the result of a frame write.
var errChanPool = sync.Pool{
	New: func() interface{} {
		ch := make(chan error, 1)
		return ch
	},
}
969
970var writeDataPool = sync.Pool{
971 New: func() interface{} { return new(writeData) },
972}
973
974// writeDataFromHandler writes DATA response frames from a handler on
975// the given stream.
976func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
977 ch := errChanPool.Get().(chan error)
978 writeArg := writeDataPool.Get().(*writeData)
979 *writeArg = writeData{stream.id, data, endStream}
980 err := sc.writeFrameFromHandler(FrameWriteRequest{
981 write: writeArg,
982 stream: stream,
983 done: ch,
984 })
985 if err != nil {
986 return err
987 }
988 var frameWriteDone bool // the frame write is done (successfully or not)
989 select {
990 case err = <-ch:
991 frameWriteDone = true
992 case <-sc.doneServing:
993 return errClientDisconnected
994 case <-stream.cw:
995 // If both ch and stream.cw were ready (as might
996 // happen on the final Write after an http.Handler
997 // ends), prefer the write result. Otherwise this
998 // might just be us successfully closing the stream.
999 // The writeFrameAsync and serve goroutines guarantee
1000 // that the ch send will happen before the stream.cw
1001 // close.
1002 select {
1003 case err = <-ch:
1004 frameWriteDone = true
1005 default:
1006 return errStreamClosed
1007 }
1008 }
1009 errChanPool.Put(ch)
1010 if frameWriteDone {
1011 writeDataPool.Put(writeArg)
1012 }
1013 return err
1014}
1015
1016// writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts
1017// if the connection has gone away.
1018//
1019// This must not be run from the serve goroutine itself, else it might
1020// deadlock writing to sc.wantWriteFrameCh (which is only mildly
1021// buffered and is read by serve itself). If you're on the serve
1022// goroutine, call writeFrame instead.
1023func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error {
1024 sc.serveG.checkNotOn() // NOT
1025 select {
1026 case sc.wantWriteFrameCh <- wr:
1027 return nil
1028 case <-sc.doneServing:
1029 // Serve loop is gone.
1030 // Client has closed their connection to the server.
1031 return errClientDisconnected
1032 }
1033}
1034
1035// writeFrame schedules a frame to write and sends it if there's nothing
1036// already being written.
1037//
1038// There is no pushback here (the serve goroutine never blocks). It's
1039// the http.Handlers that block, waiting for their previous frames to
1040// make it onto the wire
1041//
1042// If you're not on the serve goroutine, use writeFrameFromHandler instead.
1043func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
1044 sc.serveG.check()
1045
1046 // If true, wr will not be written and wr.done will not be signaled.
1047 var ignoreWrite bool
1048
1049 // We are not allowed to write frames on closed streams. RFC 7540 Section
1050 // 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
1051 // a closed stream." Our server never sends PRIORITY, so that exception
1052 // does not apply.
1053 //
1054 // The serverConn might close an open stream while the stream's handler
1055 // is still running. For example, the server might close a stream when it
1056 // receives bad data from the client. If this happens, the handler might
1057 // attempt to write a frame after the stream has been closed (since the
1058 // handler hasn't yet been notified of the close). In this case, we simply
1059 // ignore the frame. The handler will notice that the stream is closed when
1060 // it waits for the frame to be written.
1061 //
1062 // As an exception to this rule, we allow sending RST_STREAM after close.
1063 // This allows us to immediately reject new streams without tracking any
1064 // state for those streams (except for the queued RST_STREAM frame). This
1065 // may result in duplicate RST_STREAMs in some cases, but the client should
1066 // ignore those.
1067 if wr.StreamID() != 0 {
1068 _, isReset := wr.write.(StreamError)
1069 if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset {
1070 ignoreWrite = true
1071 }
1072 }
1073
1074 // Don't send a 100-continue response if we've already sent headers.
1075 // See golang.org/issue/14030.
1076 switch wr.write.(type) {
1077 case *writeResHeaders:
1078 wr.stream.wroteHeaders = true
1079 case write100ContinueHeadersFrame:
1080 if wr.stream.wroteHeaders {
1081 // We do not need to notify wr.done because this frame is
1082 // never written with wr.done != nil.
1083 if wr.done != nil {
1084 panic("wr.done != nil for write100ContinueHeadersFrame")
1085 }
1086 ignoreWrite = true
1087 }
1088 }
1089
1090 if !ignoreWrite {
Scott Baker4a35a702019-11-26 08:17:33 -08001091 if wr.isControl() {
1092 sc.queuedControlFrames++
1093 // For extra safety, detect wraparounds, which should not happen,
1094 // and pull the plug.
1095 if sc.queuedControlFrames < 0 {
1096 sc.conn.Close()
1097 }
1098 }
Zack Williamse940c7a2019-08-21 14:25:39 -07001099 sc.writeSched.Push(wr)
1100 }
1101 sc.scheduleFrameWrite()
1102}
1103
1104// startFrameWrite starts a goroutine to write wr (in a separate
1105// goroutine since that might block on the network), and updates the
1106// serve goroutine's state about the world, updated from info in wr.
1107func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
1108 sc.serveG.check()
1109 if sc.writingFrame {
1110 panic("internal error: can only be writing one frame at a time")
1111 }
1112
1113 st := wr.stream
1114 if st != nil {
1115 switch st.state {
1116 case stateHalfClosedLocal:
1117 switch wr.write.(type) {
1118 case StreamError, handlerPanicRST, writeWindowUpdate:
1119 // RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE
1120 // in this state. (We never send PRIORITY from the server, so that is not checked.)
1121 default:
1122 panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr))
1123 }
1124 case stateClosed:
1125 panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr))
1126 }
1127 }
1128 if wpp, ok := wr.write.(*writePushPromise); ok {
1129 var err error
1130 wpp.promisedID, err = wpp.allocatePromisedID()
1131 if err != nil {
1132 sc.writingFrameAsync = false
1133 wr.replyToWriter(err)
1134 return
1135 }
1136 }
1137
1138 sc.writingFrame = true
1139 sc.needsFrameFlush = true
1140 if wr.write.staysWithinBuffer(sc.bw.Available()) {
1141 sc.writingFrameAsync = false
1142 err := wr.write.writeFrame(sc)
1143 sc.wroteFrame(frameWriteResult{wr, err})
1144 } else {
1145 sc.writingFrameAsync = true
1146 go sc.writeFrameAsync(wr)
1147 }
1148}
1149
// errHandlerPanicked is handed to callers blocked reading from
// Request.Body when the main handler goroutine panics. Most handlers
// read the body on the main ServeHTTP goroutine, so this error is
// rarely observed.
var errHandlerPanicked = errors.New("http2: handler panicked")
1154
1155// wroteFrame is called on the serve goroutine with the result of
1156// whatever happened on writeFrameAsync.
1157func (sc *serverConn) wroteFrame(res frameWriteResult) {
1158 sc.serveG.check()
1159 if !sc.writingFrame {
1160 panic("internal error: expected to be already writing a frame")
1161 }
1162 sc.writingFrame = false
1163 sc.writingFrameAsync = false
1164
1165 wr := res.wr
1166
1167 if writeEndsStream(wr.write) {
1168 st := wr.stream
1169 if st == nil {
1170 panic("internal error: expecting non-nil stream")
1171 }
1172 switch st.state {
1173 case stateOpen:
1174 // Here we would go to stateHalfClosedLocal in
1175 // theory, but since our handler is done and
1176 // the net/http package provides no mechanism
1177 // for closing a ResponseWriter while still
1178 // reading data (see possible TODO at top of
1179 // this file), we go into closed state here
1180 // anyway, after telling the peer we're
1181 // hanging up on them. We'll transition to
1182 // stateClosed after the RST_STREAM frame is
1183 // written.
1184 st.state = stateHalfClosedLocal
1185 // Section 8.1: a server MAY request that the client abort
1186 // transmission of a request without error by sending a
1187 // RST_STREAM with an error code of NO_ERROR after sending
1188 // a complete response.
1189 sc.resetStream(streamError(st.id, ErrCodeNo))
1190 case stateHalfClosedRemote:
1191 sc.closeStream(st, errHandlerComplete)
1192 }
1193 } else {
1194 switch v := wr.write.(type) {
1195 case StreamError:
1196 // st may be unknown if the RST_STREAM was generated to reject bad input.
1197 if st, ok := sc.streams[v.StreamID]; ok {
1198 sc.closeStream(st, v)
1199 }
1200 case handlerPanicRST:
1201 sc.closeStream(wr.stream, errHandlerPanicked)
1202 }
1203 }
1204
1205 // Reply (if requested) to unblock the ServeHTTP goroutine.
1206 wr.replyToWriter(res.err)
1207
1208 sc.scheduleFrameWrite()
1209}
1210
1211// scheduleFrameWrite tickles the frame writing scheduler.
1212//
1213// If a frame is already being written, nothing happens. This will be called again
1214// when the frame is done being written.
1215//
Scott Baker4a35a702019-11-26 08:17:33 -08001216// If a frame isn't being written and we need to send one, the best frame
1217// to send is selected by writeSched.
Zack Williamse940c7a2019-08-21 14:25:39 -07001218//
1219// If a frame isn't being written and there's nothing else to send, we
1220// flush the write buffer.
1221func (sc *serverConn) scheduleFrameWrite() {
1222 sc.serveG.check()
1223 if sc.writingFrame || sc.inFrameScheduleLoop {
1224 return
1225 }
1226 sc.inFrameScheduleLoop = true
1227 for !sc.writingFrameAsync {
1228 if sc.needToSendGoAway {
1229 sc.needToSendGoAway = false
1230 sc.startFrameWrite(FrameWriteRequest{
1231 write: &writeGoAway{
1232 maxStreamID: sc.maxClientStreamID,
1233 code: sc.goAwayCode,
1234 },
1235 })
1236 continue
1237 }
1238 if sc.needToSendSettingsAck {
1239 sc.needToSendSettingsAck = false
1240 sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
1241 continue
1242 }
1243 if !sc.inGoAway || sc.goAwayCode == ErrCodeNo {
1244 if wr, ok := sc.writeSched.Pop(); ok {
Scott Baker4a35a702019-11-26 08:17:33 -08001245 if wr.isControl() {
1246 sc.queuedControlFrames--
1247 }
Zack Williamse940c7a2019-08-21 14:25:39 -07001248 sc.startFrameWrite(wr)
1249 continue
1250 }
1251 }
1252 if sc.needsFrameFlush {
1253 sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
1254 sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
1255 continue
1256 }
1257 break
1258 }
1259 sc.inFrameScheduleLoop = false
1260}
1261
1262// startGracefulShutdown gracefully shuts down a connection. This
1263// sends GOAWAY with ErrCodeNo to tell the client we're gracefully
1264// shutting down. The connection isn't closed until all current
1265// streams are done.
1266//
1267// startGracefulShutdown returns immediately; it does not wait until
1268// the connection has shut down.
1269func (sc *serverConn) startGracefulShutdown() {
1270 sc.serveG.checkNotOn() // NOT
1271 sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) })
1272}
1273
// goAwayTimeout is how long the connection is kept open after a GOAWAY
// has been sent.
//
// Closing right after sending GOAWAY is hazardous: if data is still
// sitting in our kernel receive buffer, close() makes the kernel send
// a TCP RST instead of a FIN, and that RST aborts the connection
// immediately, whether or not the client has received the GOAWAY.
//
// Ideally the delay would be at least 1 RTT + epsilon, giving the
// client a chance to read the GOAWAY and stop sending. Measuring RTT
// is hard, so 1 second is used as an approximation. See
// golang.org/issue/18701.
//
// It is a var so that tests, whose requests all travel over the
// loopback interface with a very small RTT, can shorten it.
//
// TODO: configurable?
var goAwayTimeout = time.Second
1289
1290func (sc *serverConn) startGracefulShutdownInternal() {
1291 sc.goAway(ErrCodeNo)
1292}
1293
1294func (sc *serverConn) goAway(code ErrCode) {
1295 sc.serveG.check()
1296 if sc.inGoAway {
1297 return
1298 }
1299 sc.inGoAway = true
1300 sc.needToSendGoAway = true
1301 sc.goAwayCode = code
1302 sc.scheduleFrameWrite()
1303}
1304
1305func (sc *serverConn) shutDownIn(d time.Duration) {
1306 sc.serveG.check()
1307 sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
1308}
1309
1310func (sc *serverConn) resetStream(se StreamError) {
1311 sc.serveG.check()
1312 sc.writeFrame(FrameWriteRequest{write: se})
1313 if st, ok := sc.streams[se.StreamID]; ok {
1314 st.resetQueued = true
1315 }
1316}
1317
1318// processFrameFromReader processes the serve loop's read from readFrameCh from the
1319// frame-reading goroutine.
1320// processFrameFromReader returns whether the connection should be kept open.
1321func (sc *serverConn) processFrameFromReader(res readFrameResult) bool {
1322 sc.serveG.check()
1323 err := res.err
1324 if err != nil {
1325 if err == ErrFrameTooLarge {
1326 sc.goAway(ErrCodeFrameSize)
1327 return true // goAway will close the loop
1328 }
1329 clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err)
1330 if clientGone {
1331 // TODO: could we also get into this state if
1332 // the peer does a half close
1333 // (e.g. CloseWrite) because they're done
1334 // sending frames but they're still wanting
1335 // our open replies? Investigate.
1336 // TODO: add CloseWrite to crypto/tls.Conn first
1337 // so we have a way to test this? I suppose
1338 // just for testing we could have a non-TLS mode.
1339 return false
1340 }
1341 } else {
1342 f := res.f
1343 if VerboseLogs {
1344 sc.vlogf("http2: server read frame %v", summarizeFrame(f))
1345 }
1346 err = sc.processFrame(f)
1347 if err == nil {
1348 return true
1349 }
1350 }
1351
1352 switch ev := err.(type) {
1353 case StreamError:
1354 sc.resetStream(ev)
1355 return true
1356 case goAwayFlowError:
1357 sc.goAway(ErrCodeFlowControl)
1358 return true
1359 case ConnectionError:
1360 sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev)
1361 sc.goAway(ErrCode(ev))
1362 return true // goAway will handle shutdown
1363 default:
1364 if res.err != nil {
1365 sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
1366 } else {
1367 sc.logf("http2: server closing client connection: %v", err)
1368 }
1369 return false
1370 }
1371}
1372
1373func (sc *serverConn) processFrame(f Frame) error {
1374 sc.serveG.check()
1375
1376 // First frame received must be SETTINGS.
1377 if !sc.sawFirstSettings {
1378 if _, ok := f.(*SettingsFrame); !ok {
1379 return ConnectionError(ErrCodeProtocol)
1380 }
1381 sc.sawFirstSettings = true
1382 }
1383
1384 switch f := f.(type) {
1385 case *SettingsFrame:
1386 return sc.processSettings(f)
1387 case *MetaHeadersFrame:
1388 return sc.processHeaders(f)
1389 case *WindowUpdateFrame:
1390 return sc.processWindowUpdate(f)
1391 case *PingFrame:
1392 return sc.processPing(f)
1393 case *DataFrame:
1394 return sc.processData(f)
1395 case *RSTStreamFrame:
1396 return sc.processResetStream(f)
1397 case *PriorityFrame:
1398 return sc.processPriority(f)
1399 case *GoAwayFrame:
1400 return sc.processGoAway(f)
1401 case *PushPromiseFrame:
1402 // A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
1403 // frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
1404 return ConnectionError(ErrCodeProtocol)
1405 default:
1406 sc.vlogf("http2: server ignoring frame: %v", f.Header())
1407 return nil
1408 }
1409}
1410
1411func (sc *serverConn) processPing(f *PingFrame) error {
1412 sc.serveG.check()
1413 if f.IsAck() {
1414 // 6.7 PING: " An endpoint MUST NOT respond to PING frames
1415 // containing this flag."
1416 return nil
1417 }
1418 if f.StreamID != 0 {
1419 // "PING frames are not associated with any individual
1420 // stream. If a PING frame is received with a stream
1421 // identifier field value other than 0x0, the recipient MUST
1422 // respond with a connection error (Section 5.4.1) of type
1423 // PROTOCOL_ERROR."
1424 return ConnectionError(ErrCodeProtocol)
1425 }
1426 if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
1427 return nil
1428 }
1429 sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
1430 return nil
1431}
1432
1433func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
1434 sc.serveG.check()
1435 switch {
1436 case f.StreamID != 0: // stream-level flow control
1437 state, st := sc.state(f.StreamID)
1438 if state == stateIdle {
1439 // Section 5.1: "Receiving any frame other than HEADERS
1440 // or PRIORITY on a stream in this state MUST be
1441 // treated as a connection error (Section 5.4.1) of
1442 // type PROTOCOL_ERROR."
1443 return ConnectionError(ErrCodeProtocol)
1444 }
1445 if st == nil {
1446 // "WINDOW_UPDATE can be sent by a peer that has sent a
1447 // frame bearing the END_STREAM flag. This means that a
1448 // receiver could receive a WINDOW_UPDATE frame on a "half
1449 // closed (remote)" or "closed" stream. A receiver MUST
1450 // NOT treat this as an error, see Section 5.1."
1451 return nil
1452 }
1453 if !st.flow.add(int32(f.Increment)) {
1454 return streamError(f.StreamID, ErrCodeFlowControl)
1455 }
1456 default: // connection-level flow control
1457 if !sc.flow.add(int32(f.Increment)) {
1458 return goAwayFlowError{}
1459 }
1460 }
1461 sc.scheduleFrameWrite()
1462 return nil
1463}
1464
1465func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
1466 sc.serveG.check()
1467
1468 state, st := sc.state(f.StreamID)
1469 if state == stateIdle {
1470 // 6.4 "RST_STREAM frames MUST NOT be sent for a
1471 // stream in the "idle" state. If a RST_STREAM frame
1472 // identifying an idle stream is received, the
1473 // recipient MUST treat this as a connection error
1474 // (Section 5.4.1) of type PROTOCOL_ERROR.
1475 return ConnectionError(ErrCodeProtocol)
1476 }
1477 if st != nil {
1478 st.cancelCtx()
1479 sc.closeStream(st, streamError(f.StreamID, f.ErrCode))
1480 }
1481 return nil
1482}
1483
1484func (sc *serverConn) closeStream(st *stream, err error) {
1485 sc.serveG.check()
1486 if st.state == stateIdle || st.state == stateClosed {
1487 panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
1488 }
1489 st.state = stateClosed
1490 if st.writeDeadline != nil {
1491 st.writeDeadline.Stop()
1492 }
1493 if st.isPushed() {
1494 sc.curPushedStreams--
1495 } else {
1496 sc.curClientStreams--
1497 }
1498 delete(sc.streams, st.id)
1499 if len(sc.streams) == 0 {
1500 sc.setConnState(http.StateIdle)
1501 if sc.srv.IdleTimeout != 0 {
1502 sc.idleTimer.Reset(sc.srv.IdleTimeout)
1503 }
1504 if h1ServerKeepAlivesDisabled(sc.hs) {
1505 sc.startGracefulShutdownInternal()
1506 }
1507 }
1508 if p := st.body; p != nil {
1509 // Return any buffered unread bytes worth of conn-level flow control.
1510 // See golang.org/issue/16481
1511 sc.sendWindowUpdate(nil, p.Len())
1512
1513 p.CloseWithError(err)
1514 }
1515 st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
1516 sc.writeSched.CloseStream(st.id)
1517}
1518
1519func (sc *serverConn) processSettings(f *SettingsFrame) error {
1520 sc.serveG.check()
1521 if f.IsAck() {
1522 sc.unackedSettings--
1523 if sc.unackedSettings < 0 {
1524 // Why is the peer ACKing settings we never sent?
1525 // The spec doesn't mention this case, but
1526 // hang up on them anyway.
1527 return ConnectionError(ErrCodeProtocol)
1528 }
1529 return nil
1530 }
1531 if f.NumSettings() > 100 || f.HasDuplicates() {
1532 // This isn't actually in the spec, but hang up on
1533 // suspiciously large settings frames or those with
1534 // duplicate entries.
1535 return ConnectionError(ErrCodeProtocol)
1536 }
1537 if err := f.ForeachSetting(sc.processSetting); err != nil {
1538 return err
1539 }
Scott Baker4a35a702019-11-26 08:17:33 -08001540 // TODO: judging by RFC 7540, Section 6.5.3 each SETTINGS frame should be
1541 // acknowledged individually, even if multiple are received before the ACK.
Zack Williamse940c7a2019-08-21 14:25:39 -07001542 sc.needToSendSettingsAck = true
1543 sc.scheduleFrameWrite()
1544 return nil
1545}
1546
1547func (sc *serverConn) processSetting(s Setting) error {
1548 sc.serveG.check()
1549 if err := s.Valid(); err != nil {
1550 return err
1551 }
1552 if VerboseLogs {
1553 sc.vlogf("http2: server processing setting %v", s)
1554 }
1555 switch s.ID {
1556 case SettingHeaderTableSize:
1557 sc.headerTableSize = s.Val
1558 sc.hpackEncoder.SetMaxDynamicTableSize(s.Val)
1559 case SettingEnablePush:
1560 sc.pushEnabled = s.Val != 0
1561 case SettingMaxConcurrentStreams:
1562 sc.clientMaxStreams = s.Val
1563 case SettingInitialWindowSize:
1564 return sc.processSettingInitialWindowSize(s.Val)
1565 case SettingMaxFrameSize:
1566 sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31
1567 case SettingMaxHeaderListSize:
1568 sc.peerMaxHeaderListSize = s.Val
1569 default:
1570 // Unknown setting: "An endpoint that receives a SETTINGS
1571 // frame with any unknown or unsupported identifier MUST
1572 // ignore that setting."
1573 if VerboseLogs {
1574 sc.vlogf("http2: server ignoring unknown setting %v", s)
1575 }
1576 }
1577 return nil
1578}
1579
1580func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
1581 sc.serveG.check()
1582 // Note: val already validated to be within range by
1583 // processSetting's Valid call.
1584
1585 // "A SETTINGS frame can alter the initial flow control window
1586 // size for all current streams. When the value of
1587 // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST
1588 // adjust the size of all stream flow control windows that it
1589 // maintains by the difference between the new value and the
1590 // old value."
1591 old := sc.initialStreamSendWindowSize
1592 sc.initialStreamSendWindowSize = int32(val)
1593 growth := int32(val) - old // may be negative
1594 for _, st := range sc.streams {
1595 if !st.flow.add(growth) {
1596 // 6.9.2 Initial Flow Control Window Size
1597 // "An endpoint MUST treat a change to
1598 // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow
1599 // control window to exceed the maximum size as a
1600 // connection error (Section 5.4.1) of type
1601 // FLOW_CONTROL_ERROR."
1602 return ConnectionError(ErrCodeFlowControl)
1603 }
1604 }
1605 return nil
1606}
1607
1608func (sc *serverConn) processData(f *DataFrame) error {
1609 sc.serveG.check()
1610 if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
1611 return nil
1612 }
1613 data := f.Data()
1614
1615 // "If a DATA frame is received whose stream is not in "open"
1616 // or "half closed (local)" state, the recipient MUST respond
1617 // with a stream error (Section 5.4.2) of type STREAM_CLOSED."
1618 id := f.Header().StreamID
1619 state, st := sc.state(id)
1620 if id == 0 || state == stateIdle {
1621 // Section 5.1: "Receiving any frame other than HEADERS
1622 // or PRIORITY on a stream in this state MUST be
1623 // treated as a connection error (Section 5.4.1) of
1624 // type PROTOCOL_ERROR."
1625 return ConnectionError(ErrCodeProtocol)
1626 }
1627 if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued {
1628 // This includes sending a RST_STREAM if the stream is
1629 // in stateHalfClosedLocal (which currently means that
1630 // the http.Handler returned, so it's done reading &
1631 // done writing). Try to stop the client from sending
1632 // more DATA.
1633
1634 // But still enforce their connection-level flow control,
1635 // and return any flow control bytes since we're not going
1636 // to consume them.
1637 if sc.inflow.available() < int32(f.Length) {
1638 return streamError(id, ErrCodeFlowControl)
1639 }
1640 // Deduct the flow control from inflow, since we're
1641 // going to immediately add it back in
1642 // sendWindowUpdate, which also schedules sending the
1643 // frames.
1644 sc.inflow.take(int32(f.Length))
1645 sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
1646
1647 if st != nil && st.resetQueued {
1648 // Already have a stream error in flight. Don't send another.
1649 return nil
1650 }
1651 return streamError(id, ErrCodeStreamClosed)
1652 }
1653 if st.body == nil {
1654 panic("internal error: should have a body in this state")
1655 }
1656
1657 // Sender sending more than they'd declared?
1658 if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
1659 st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
1660 // RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
1661 // value of a content-length header field does not equal the sum of the
1662 // DATA frame payload lengths that form the body.
1663 return streamError(id, ErrCodeProtocol)
1664 }
1665 if f.Length > 0 {
1666 // Check whether the client has flow control quota.
1667 if st.inflow.available() < int32(f.Length) {
1668 return streamError(id, ErrCodeFlowControl)
1669 }
1670 st.inflow.take(int32(f.Length))
1671
1672 if len(data) > 0 {
1673 wrote, err := st.body.Write(data)
1674 if err != nil {
1675 return streamError(id, ErrCodeStreamClosed)
1676 }
1677 if wrote != len(data) {
1678 panic("internal error: bad Writer")
1679 }
1680 st.bodyBytes += int64(len(data))
1681 }
1682
1683 // Return any padded flow control now, since we won't
1684 // refund it later on body reads.
1685 if pad := int32(f.Length) - int32(len(data)); pad > 0 {
1686 sc.sendWindowUpdate32(nil, pad)
1687 sc.sendWindowUpdate32(st, pad)
1688 }
1689 }
1690 if f.StreamEnded() {
1691 st.endStream()
1692 }
1693 return nil
1694}
1695
1696func (sc *serverConn) processGoAway(f *GoAwayFrame) error {
1697 sc.serveG.check()
1698 if f.ErrCode != ErrCodeNo {
1699 sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1700 } else {
1701 sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1702 }
1703 sc.startGracefulShutdownInternal()
1704 // http://tools.ietf.org/html/rfc7540#section-6.8
1705 // We should not create any new streams, which means we should disable push.
1706 sc.pushEnabled = false
1707 return nil
1708}
1709
1710// isPushed reports whether the stream is server-initiated.
1711func (st *stream) isPushed() bool {
1712 return st.id%2 == 0
1713}
1714
1715// endStream closes a Request.Body's pipe. It is called when a DATA
1716// frame says a request body is over (or after trailers).
1717func (st *stream) endStream() {
1718 sc := st.sc
1719 sc.serveG.check()
1720
1721 if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes {
1722 st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes",
1723 st.declBodyBytes, st.bodyBytes))
1724 } else {
1725 st.body.closeWithErrorAndCode(io.EOF, st.copyTrailersToHandlerRequest)
1726 st.body.CloseWithError(io.EOF)
1727 }
1728 st.state = stateHalfClosedRemote
1729}
1730
1731// copyTrailersToHandlerRequest is run in the Handler's goroutine in
1732// its Request.Body.Read just before it gets io.EOF.
1733func (st *stream) copyTrailersToHandlerRequest() {
1734 for k, vv := range st.trailer {
1735 if _, ok := st.reqTrailer[k]; ok {
1736 // Only copy it over it was pre-declared.
1737 st.reqTrailer[k] = vv
1738 }
1739 }
1740}
1741
1742// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
1743// when the stream's WriteTimeout has fired.
1744func (st *stream) onWriteTimeout() {
1745 st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)})
1746}
1747
1748func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
1749 sc.serveG.check()
1750 id := f.StreamID
1751 if sc.inGoAway {
1752 // Ignore.
1753 return nil
1754 }
1755 // http://tools.ietf.org/html/rfc7540#section-5.1.1
1756 // Streams initiated by a client MUST use odd-numbered stream
1757 // identifiers. [...] An endpoint that receives an unexpected
1758 // stream identifier MUST respond with a connection error
1759 // (Section 5.4.1) of type PROTOCOL_ERROR.
1760 if id%2 != 1 {
1761 return ConnectionError(ErrCodeProtocol)
1762 }
1763 // A HEADERS frame can be used to create a new stream or
1764 // send a trailer for an open one. If we already have a stream
1765 // open, let it process its own HEADERS frame (trailers at this
1766 // point, if it's valid).
1767 if st := sc.streams[f.StreamID]; st != nil {
1768 if st.resetQueued {
1769 // We're sending RST_STREAM to close the stream, so don't bother
1770 // processing this frame.
1771 return nil
1772 }
1773 // RFC 7540, sec 5.1: If an endpoint receives additional frames, other than
1774 // WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in
1775 // this state, it MUST respond with a stream error (Section 5.4.2) of
1776 // type STREAM_CLOSED.
1777 if st.state == stateHalfClosedRemote {
1778 return streamError(id, ErrCodeStreamClosed)
1779 }
1780 return st.processTrailerHeaders(f)
1781 }
1782
1783 // [...] The identifier of a newly established stream MUST be
1784 // numerically greater than all streams that the initiating
1785 // endpoint has opened or reserved. [...] An endpoint that
1786 // receives an unexpected stream identifier MUST respond with
1787 // a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
1788 if id <= sc.maxClientStreamID {
1789 return ConnectionError(ErrCodeProtocol)
1790 }
1791 sc.maxClientStreamID = id
1792
1793 if sc.idleTimer != nil {
1794 sc.idleTimer.Stop()
1795 }
1796
1797 // http://tools.ietf.org/html/rfc7540#section-5.1.2
1798 // [...] Endpoints MUST NOT exceed the limit set by their peer. An
1799 // endpoint that receives a HEADERS frame that causes their
1800 // advertised concurrent stream limit to be exceeded MUST treat
1801 // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
1802 // or REFUSED_STREAM.
1803 if sc.curClientStreams+1 > sc.advMaxStreams {
1804 if sc.unackedSettings == 0 {
1805 // They should know better.
1806 return streamError(id, ErrCodeProtocol)
1807 }
1808 // Assume it's a network race, where they just haven't
1809 // received our last SETTINGS update. But actually
1810 // this can't happen yet, because we don't yet provide
1811 // a way for users to adjust server parameters at
1812 // runtime.
1813 return streamError(id, ErrCodeRefusedStream)
1814 }
1815
1816 initialState := stateOpen
1817 if f.StreamEnded() {
1818 initialState = stateHalfClosedRemote
1819 }
1820 st := sc.newStream(id, 0, initialState)
1821
1822 if f.HasPriority() {
1823 if err := checkPriority(f.StreamID, f.Priority); err != nil {
1824 return err
1825 }
1826 sc.writeSched.AdjustStream(st.id, f.Priority)
1827 }
1828
1829 rw, req, err := sc.newWriterAndRequest(st, f)
1830 if err != nil {
1831 return err
1832 }
1833 st.reqTrailer = req.Trailer
1834 if st.reqTrailer != nil {
1835 st.trailer = make(http.Header)
1836 }
1837 st.body = req.Body.(*requestBody).pipe // may be nil
1838 st.declBodyBytes = req.ContentLength
1839
1840 handler := sc.handler.ServeHTTP
1841 if f.Truncated {
1842 // Their header list was too long. Send a 431 error.
1843 handler = handleHeaderListTooLong
1844 } else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil {
1845 handler = new400Handler(err)
1846 }
1847
1848 // The net/http package sets the read deadline from the
1849 // http.Server.ReadTimeout during the TLS handshake, but then
1850 // passes the connection off to us with the deadline already
1851 // set. Disarm it here after the request headers are read,
1852 // similar to how the http1 server works. Here it's
1853 // technically more like the http1 Server's ReadHeaderTimeout
1854 // (in Go 1.8), though. That's a more sane option anyway.
1855 if sc.hs.ReadTimeout != 0 {
1856 sc.conn.SetReadDeadline(time.Time{})
1857 }
1858
1859 go sc.runHandler(rw, req, handler)
1860 return nil
1861}
1862
1863func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
1864 sc := st.sc
1865 sc.serveG.check()
1866 if st.gotTrailerHeader {
1867 return ConnectionError(ErrCodeProtocol)
1868 }
1869 st.gotTrailerHeader = true
1870 if !f.StreamEnded() {
1871 return streamError(st.id, ErrCodeProtocol)
1872 }
1873
1874 if len(f.PseudoFields()) > 0 {
1875 return streamError(st.id, ErrCodeProtocol)
1876 }
1877 if st.trailer != nil {
1878 for _, hf := range f.RegularFields() {
1879 key := sc.canonicalHeader(hf.Name)
1880 if !httpguts.ValidTrailerHeader(key) {
1881 // TODO: send more details to the peer somehow. But http2 has
1882 // no way to send debug data at a stream level. Discuss with
1883 // HTTP folk.
1884 return streamError(st.id, ErrCodeProtocol)
1885 }
1886 st.trailer[key] = append(st.trailer[key], hf.Value)
1887 }
1888 }
1889 st.endStream()
1890 return nil
1891}
1892
1893func checkPriority(streamID uint32, p PriorityParam) error {
1894 if streamID == p.StreamDep {
1895 // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
1896 // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
1897 // Section 5.3.3 says that a stream can depend on one of its dependencies,
1898 // so it's only self-dependencies that are forbidden.
1899 return streamError(streamID, ErrCodeProtocol)
1900 }
1901 return nil
1902}
1903
1904func (sc *serverConn) processPriority(f *PriorityFrame) error {
1905 if sc.inGoAway {
1906 return nil
1907 }
1908 if err := checkPriority(f.StreamID, f.PriorityParam); err != nil {
1909 return err
1910 }
1911 sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam)
1912 return nil
1913}
1914
1915func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream {
1916 sc.serveG.check()
1917 if id == 0 {
1918 panic("internal error: cannot create stream with id 0")
1919 }
1920
1921 ctx, cancelCtx := context.WithCancel(sc.baseCtx)
1922 st := &stream{
1923 sc: sc,
1924 id: id,
1925 state: state,
1926 ctx: ctx,
1927 cancelCtx: cancelCtx,
1928 }
1929 st.cw.Init()
1930 st.flow.conn = &sc.flow // link to conn-level counter
1931 st.flow.add(sc.initialStreamSendWindowSize)
1932 st.inflow.conn = &sc.inflow // link to conn-level counter
1933 st.inflow.add(sc.srv.initialStreamRecvWindowSize())
1934 if sc.hs.WriteTimeout != 0 {
1935 st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
1936 }
1937
1938 sc.streams[id] = st
1939 sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
1940 if st.isPushed() {
1941 sc.curPushedStreams++
1942 } else {
1943 sc.curClientStreams++
1944 }
1945 if sc.curOpenStreams() == 1 {
1946 sc.setConnState(http.StateActive)
1947 }
1948
1949 return st
1950}
1951
1952func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) {
1953 sc.serveG.check()
1954
1955 rp := requestParam{
1956 method: f.PseudoValue("method"),
1957 scheme: f.PseudoValue("scheme"),
1958 authority: f.PseudoValue("authority"),
1959 path: f.PseudoValue("path"),
1960 }
1961
1962 isConnect := rp.method == "CONNECT"
1963 if isConnect {
1964 if rp.path != "" || rp.scheme != "" || rp.authority == "" {
1965 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
1966 }
1967 } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
1968 // See 8.1.2.6 Malformed Requests and Responses:
1969 //
1970 // Malformed requests or responses that are detected
1971 // MUST be treated as a stream error (Section 5.4.2)
1972 // of type PROTOCOL_ERROR."
1973 //
1974 // 8.1.2.3 Request Pseudo-Header Fields
1975 // "All HTTP/2 requests MUST include exactly one valid
1976 // value for the :method, :scheme, and :path
1977 // pseudo-header fields"
1978 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
1979 }
1980
1981 bodyOpen := !f.StreamEnded()
1982 if rp.method == "HEAD" && bodyOpen {
1983 // HEAD requests can't have bodies
1984 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
1985 }
1986
1987 rp.header = make(http.Header)
1988 for _, hf := range f.RegularFields() {
1989 rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value)
1990 }
1991 if rp.authority == "" {
1992 rp.authority = rp.header.Get("Host")
1993 }
1994
1995 rw, req, err := sc.newWriterAndRequestNoBody(st, rp)
1996 if err != nil {
1997 return nil, nil, err
1998 }
1999 if bodyOpen {
2000 if vv, ok := rp.header["Content-Length"]; ok {
2001 req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
2002 } else {
2003 req.ContentLength = -1
2004 }
2005 req.Body.(*requestBody).pipe = &pipe{
2006 b: &dataBuffer{expected: req.ContentLength},
2007 }
2008 }
2009 return rw, req, nil
2010}
2011
// requestParam carries the pieces of a request that
// newWriterAndRequestNoBody needs to build an *http.Request.
type requestParam struct {
	method    string      // request method, e.g. "GET" or "CONNECT"
	scheme    string      // "https" selects the connection's TLS state
	authority string      // becomes Request.Host
	path      string      // parsed as the request URI unless method is CONNECT
	header    http.Header // regular (non-pseudo) header fields
}
2017
2018func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) {
2019 sc.serveG.check()
2020
2021 var tlsState *tls.ConnectionState // nil if not scheme https
2022 if rp.scheme == "https" {
2023 tlsState = sc.tlsState
2024 }
2025
2026 needsContinue := rp.header.Get("Expect") == "100-continue"
2027 if needsContinue {
2028 rp.header.Del("Expect")
2029 }
2030 // Merge Cookie headers into one "; "-delimited value.
2031 if cookies := rp.header["Cookie"]; len(cookies) > 1 {
2032 rp.header.Set("Cookie", strings.Join(cookies, "; "))
2033 }
2034
2035 // Setup Trailers
2036 var trailer http.Header
2037 for _, v := range rp.header["Trailer"] {
2038 for _, key := range strings.Split(v, ",") {
2039 key = http.CanonicalHeaderKey(strings.TrimSpace(key))
2040 switch key {
2041 case "Transfer-Encoding", "Trailer", "Content-Length":
2042 // Bogus. (copy of http1 rules)
2043 // Ignore.
2044 default:
2045 if trailer == nil {
2046 trailer = make(http.Header)
2047 }
2048 trailer[key] = nil
2049 }
2050 }
2051 }
2052 delete(rp.header, "Trailer")
2053
2054 var url_ *url.URL
2055 var requestURI string
2056 if rp.method == "CONNECT" {
2057 url_ = &url.URL{Host: rp.authority}
2058 requestURI = rp.authority // mimic HTTP/1 server behavior
2059 } else {
2060 var err error
2061 url_, err = url.ParseRequestURI(rp.path)
2062 if err != nil {
2063 return nil, nil, streamError(st.id, ErrCodeProtocol)
2064 }
2065 requestURI = rp.path
2066 }
2067
2068 body := &requestBody{
2069 conn: sc,
2070 stream: st,
2071 needsContinue: needsContinue,
2072 }
2073 req := &http.Request{
2074 Method: rp.method,
2075 URL: url_,
2076 RemoteAddr: sc.remoteAddrStr,
2077 Header: rp.header,
2078 RequestURI: requestURI,
2079 Proto: "HTTP/2.0",
2080 ProtoMajor: 2,
2081 ProtoMinor: 0,
2082 TLS: tlsState,
2083 Host: rp.authority,
2084 Body: body,
2085 Trailer: trailer,
2086 }
2087 req = req.WithContext(st.ctx)
2088
2089 rws := responseWriterStatePool.Get().(*responseWriterState)
2090 bwSave := rws.bw
2091 *rws = responseWriterState{} // zero all the fields
2092 rws.conn = sc
2093 rws.bw = bwSave
2094 rws.bw.Reset(chunkWriter{rws})
2095 rws.stream = st
2096 rws.req = req
2097 rws.body = body
2098
2099 rw := &responseWriter{rws: rws}
2100 return rw, req, nil
2101}
2102
2103// Run on its own goroutine.
2104func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
2105 didPanic := true
2106 defer func() {
2107 rw.rws.stream.cancelCtx()
2108 if didPanic {
2109 e := recover()
2110 sc.writeFrameFromHandler(FrameWriteRequest{
2111 write: handlerPanicRST{rw.rws.stream.id},
2112 stream: rw.rws.stream,
2113 })
2114 // Same as net/http:
2115 if e != nil && e != http.ErrAbortHandler {
2116 const size = 64 << 10
2117 buf := make([]byte, size)
2118 buf = buf[:runtime.Stack(buf, false)]
2119 sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf)
2120 }
2121 return
2122 }
2123 rw.handlerDone()
2124 }()
2125 handler(rw, req)
2126 didPanic = false
2127}
2128
// handleHeaderListTooLong is the handler used for requests whose header list
// was truncated. It replies with status 431 and a short HTML body.
//
// 10.5.1 Limits on Header Block Size:
// .. "A server that receives a larger header block than it is
// willing to handle can send an HTTP 431 (Request Header Fields Too
// Large) status code"
func handleHeaderListTooLong(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusRequestHeaderFieldsTooLarge)
	io.WriteString(w, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>")
}
2138
2139// called from handler goroutines.
2140// h may be nil.
2141func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) error {
2142 sc.serveG.checkNotOn() // NOT on
2143 var errc chan error
2144 if headerData.h != nil {
2145 // If there's a header map (which we don't own), so we have to block on
2146 // waiting for this frame to be written, so an http.Flush mid-handler
2147 // writes out the correct value of keys, before a handler later potentially
2148 // mutates it.
2149 errc = errChanPool.Get().(chan error)
2150 }
2151 if err := sc.writeFrameFromHandler(FrameWriteRequest{
2152 write: headerData,
2153 stream: st,
2154 done: errc,
2155 }); err != nil {
2156 return err
2157 }
2158 if errc != nil {
2159 select {
2160 case err := <-errc:
2161 errChanPool.Put(errc)
2162 return err
2163 case <-sc.doneServing:
2164 return errClientDisconnected
2165 case <-st.cw:
2166 return errStreamClosed
2167 }
2168 }
2169 return nil
2170}
2171
2172// called from handler goroutines.
2173func (sc *serverConn) write100ContinueHeaders(st *stream) {
2174 sc.writeFrameFromHandler(FrameWriteRequest{
2175 write: write100ContinueHeadersFrame{st.id},
2176 stream: st,
2177 })
2178}
2179
2180// A bodyReadMsg tells the server loop that the http.Handler read n
2181// bytes of the DATA from the client on the given stream.
2182type bodyReadMsg struct {
2183 st *stream
2184 n int
2185}
2186
2187// called from handler goroutines.
2188// Notes that the handler for the given stream ID read n bytes of its body
2189// and schedules flow control tokens to be sent.
2190func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
2191 sc.serveG.checkNotOn() // NOT on
2192 if n > 0 {
2193 select {
2194 case sc.bodyReadCh <- bodyReadMsg{st, n}:
2195 case <-sc.doneServing:
2196 }
2197 }
2198}
2199
2200func (sc *serverConn) noteBodyRead(st *stream, n int) {
2201 sc.serveG.check()
2202 sc.sendWindowUpdate(nil, n) // conn-level
2203 if st.state != stateHalfClosedRemote && st.state != stateClosed {
2204 // Don't send this WINDOW_UPDATE if the stream is closed
2205 // remotely.
2206 sc.sendWindowUpdate(st, n)
2207 }
2208}
2209
2210// st may be nil for conn-level
2211func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
2212 sc.serveG.check()
2213 // "The legal range for the increment to the flow control
2214 // window is 1 to 2^31-1 (2,147,483,647) octets."
2215 // A Go Read call on 64-bit machines could in theory read
2216 // a larger Read than this. Very unlikely, but we handle it here
2217 // rather than elsewhere for now.
2218 const maxUint31 = 1<<31 - 1
2219 for n >= maxUint31 {
2220 sc.sendWindowUpdate32(st, maxUint31)
2221 n -= maxUint31
2222 }
2223 sc.sendWindowUpdate32(st, int32(n))
2224}
2225
2226// st may be nil for conn-level
2227func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
2228 sc.serveG.check()
2229 if n == 0 {
2230 return
2231 }
2232 if n < 0 {
2233 panic("negative update")
2234 }
2235 var streamID uint32
2236 if st != nil {
2237 streamID = st.id
2238 }
2239 sc.writeFrame(FrameWriteRequest{
2240 write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
2241 stream: st,
2242 })
2243 var ok bool
2244 if st == nil {
2245 ok = sc.inflow.add(n)
2246 } else {
2247 ok = st.inflow.add(n)
2248 }
2249 if !ok {
2250 panic("internal error; sent too many window updates without decrements?")
2251 }
2252}
2253
2254// requestBody is the Handler's Request.Body type.
2255// Read and Close may be called concurrently.
2256type requestBody struct {
2257 stream *stream
2258 conn *serverConn
2259 closed bool // for use by Close only
2260 sawEOF bool // for use by Read only
2261 pipe *pipe // non-nil if we have a HTTP entity message body
2262 needsContinue bool // need to send a 100-continue
2263}
2264
2265func (b *requestBody) Close() error {
2266 if b.pipe != nil && !b.closed {
2267 b.pipe.BreakWithError(errClosedBody)
2268 }
2269 b.closed = true
2270 return nil
2271}
2272
2273func (b *requestBody) Read(p []byte) (n int, err error) {
2274 if b.needsContinue {
2275 b.needsContinue = false
2276 b.conn.write100ContinueHeaders(b.stream)
2277 }
2278 if b.pipe == nil || b.sawEOF {
2279 return 0, io.EOF
2280 }
2281 n, err = b.pipe.Read(p)
2282 if err == io.EOF {
2283 b.sawEOF = true
2284 }
2285 if b.conn == nil && inTests {
2286 return
2287 }
2288 b.conn.noteBodyReadFromHandler(b.stream, n, err)
2289 return
2290}
2291
2292// responseWriter is the http.ResponseWriter implementation. It's
2293// intentionally small (1 pointer wide) to minimize garbage. The
2294// responseWriterState pointer inside is zeroed at the end of a
2295// request (in handlerDone) and calls on the responseWriter thereafter
2296// simply crash (caller's mistake), but the much larger responseWriterState
2297// and buffers are reused between multiple requests.
2298type responseWriter struct {
2299 rws *responseWriterState
2300}
2301
2302// Optional http.ResponseWriter interfaces implemented.
2303var (
2304 _ http.CloseNotifier = (*responseWriter)(nil)
2305 _ http.Flusher = (*responseWriter)(nil)
2306 _ stringWriter = (*responseWriter)(nil)
2307)
2308
2309type responseWriterState struct {
2310 // immutable within a request:
2311 stream *stream
2312 req *http.Request
2313 body *requestBody // to close at end of request, if DATA frames didn't
2314 conn *serverConn
2315
2316 // TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc
2317 bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState}
2318
2319 // mutated by http.Handler goroutine:
2320 handlerHeader http.Header // nil until called
2321 snapHeader http.Header // snapshot of handlerHeader at WriteHeader time
2322 trailers []string // set in writeChunk
2323 status int // status code passed to WriteHeader
2324 wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
2325 sentHeader bool // have we sent the header frame?
2326 handlerDone bool // handler has finished
2327 dirty bool // a Write failed; don't reuse this responseWriterState
2328
2329 sentContentLen int64 // non-zero if handler set a Content-Length header
2330 wroteBytes int64
2331
2332 closeNotifierMu sync.Mutex // guards closeNotifierCh
2333 closeNotifierCh chan bool // nil until first used
2334}
2335
2336type chunkWriter struct{ rws *responseWriterState }
2337
2338func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) }
2339
Scott Baker4a35a702019-11-26 08:17:33 -08002340func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) != 0 }
Zack Williamse940c7a2019-08-21 14:25:39 -07002341
2342// declareTrailer is called for each Trailer header when the
2343// response header is written. It notes that a header will need to be
2344// written in the trailers at the end of the response.
2345func (rws *responseWriterState) declareTrailer(k string) {
2346 k = http.CanonicalHeaderKey(k)
2347 if !httpguts.ValidTrailerHeader(k) {
2348 // Forbidden by RFC 7230, section 4.1.2.
2349 rws.conn.logf("ignoring invalid trailer %q", k)
2350 return
2351 }
2352 if !strSliceContains(rws.trailers, k) {
2353 rws.trailers = append(rws.trailers, k)
2354 }
2355}
2356
2357// writeChunk writes chunks from the bufio.Writer. But because
2358// bufio.Writer may bypass its chunking, sometimes p may be
2359// arbitrarily large.
2360//
2361// writeChunk is also responsible (on the first chunk) for sending the
2362// HEADER response.
2363func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
2364 if !rws.wroteHeader {
2365 rws.writeHeader(200)
2366 }
2367
2368 isHeadResp := rws.req.Method == "HEAD"
2369 if !rws.sentHeader {
2370 rws.sentHeader = true
2371 var ctype, clen string
2372 if clen = rws.snapHeader.Get("Content-Length"); clen != "" {
2373 rws.snapHeader.Del("Content-Length")
2374 clen64, err := strconv.ParseInt(clen, 10, 64)
2375 if err == nil && clen64 >= 0 {
2376 rws.sentContentLen = clen64
2377 } else {
2378 clen = ""
2379 }
2380 }
2381 if clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
2382 clen = strconv.Itoa(len(p))
2383 }
2384 _, hasContentType := rws.snapHeader["Content-Type"]
2385 if !hasContentType && bodyAllowedForStatus(rws.status) && len(p) > 0 {
2386 ctype = http.DetectContentType(p)
2387 }
2388 var date string
2389 if _, ok := rws.snapHeader["Date"]; !ok {
2390 // TODO(bradfitz): be faster here, like net/http? measure.
2391 date = time.Now().UTC().Format(http.TimeFormat)
2392 }
2393
2394 for _, v := range rws.snapHeader["Trailer"] {
2395 foreachHeaderElement(v, rws.declareTrailer)
2396 }
2397
2398 // "Connection" headers aren't allowed in HTTP/2 (RFC 7540, 8.1.2.2),
2399 // but respect "Connection" == "close" to mean sending a GOAWAY and tearing
2400 // down the TCP connection when idle, like we do for HTTP/1.
2401 // TODO: remove more Connection-specific header fields here, in addition
2402 // to "Connection".
2403 if _, ok := rws.snapHeader["Connection"]; ok {
2404 v := rws.snapHeader.Get("Connection")
2405 delete(rws.snapHeader, "Connection")
2406 if v == "close" {
2407 rws.conn.startGracefulShutdown()
2408 }
2409 }
2410
2411 endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp
2412 err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
2413 streamID: rws.stream.id,
2414 httpResCode: rws.status,
2415 h: rws.snapHeader,
2416 endStream: endStream,
2417 contentType: ctype,
2418 contentLength: clen,
2419 date: date,
2420 })
2421 if err != nil {
2422 rws.dirty = true
2423 return 0, err
2424 }
2425 if endStream {
2426 return 0, nil
2427 }
2428 }
2429 if isHeadResp {
2430 return len(p), nil
2431 }
2432 if len(p) == 0 && !rws.handlerDone {
2433 return 0, nil
2434 }
2435
2436 if rws.handlerDone {
2437 rws.promoteUndeclaredTrailers()
2438 }
2439
Scott Baker4a35a702019-11-26 08:17:33 -08002440 endStream := rws.handlerDone && !rws.hasTrailers()
Zack Williamse940c7a2019-08-21 14:25:39 -07002441 if len(p) > 0 || endStream {
2442 // only send a 0 byte DATA frame if we're ending the stream.
2443 if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
2444 rws.dirty = true
2445 return 0, err
2446 }
2447 }
2448
Scott Baker4a35a702019-11-26 08:17:33 -08002449 if rws.handlerDone && rws.hasTrailers() {
Zack Williamse940c7a2019-08-21 14:25:39 -07002450 err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
2451 streamID: rws.stream.id,
2452 h: rws.handlerHeader,
2453 trailers: rws.trailers,
2454 endStream: true,
2455 })
2456 if err != nil {
2457 rws.dirty = true
2458 }
2459 return len(p), err
2460 }
2461 return len(p), nil
2462}
2463
// TrailerPrefix is a magic prefix for ResponseWriter.Header map keys.
// A key that starts with it marks the map entry as a response trailer
// rather than a response header. After the ServeHTTP call finishes, the
// prefix is stripped and the values are sent in the trailers.
//
// Use this mechanism only for trailers that are not known before the
// headers are written. If the set of trailers is fixed or known before the
// header is written, the normal Go trailers mechanism is preferred:
//    https://golang.org/pkg/net/http/#ResponseWriter
//    https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
const TrailerPrefix = "Trailer:"
2477
2478// promoteUndeclaredTrailers permits http.Handlers to set trailers
2479// after the header has already been flushed. Because the Go
2480// ResponseWriter interface has no way to set Trailers (only the
2481// Header), and because we didn't want to expand the ResponseWriter
2482// interface, and because nobody used trailers, and because RFC 7230
2483// says you SHOULD (but not must) predeclare any trailers in the
2484// header, the official ResponseWriter rules said trailers in Go must
2485// be predeclared, and then we reuse the same ResponseWriter.Header()
2486// map to mean both Headers and Trailers. When it's time to write the
2487// Trailers, we pick out the fields of Headers that were declared as
2488// trailers. That worked for a while, until we found the first major
2489// user of Trailers in the wild: gRPC (using them only over http2),
2490// and gRPC libraries permit setting trailers mid-stream without
2491// predeclarnig them. So: change of plans. We still permit the old
2492// way, but we also permit this hack: if a Header() key begins with
2493// "Trailer:", the suffix of that key is a Trailer. Because ':' is an
2494// invalid token byte anyway, there is no ambiguity. (And it's already
2495// filtered out) It's mildly hacky, but not terrible.
2496//
2497// This method runs after the Handler is done and promotes any Header
2498// fields to be trailers.
2499func (rws *responseWriterState) promoteUndeclaredTrailers() {
2500 for k, vv := range rws.handlerHeader {
2501 if !strings.HasPrefix(k, TrailerPrefix) {
2502 continue
2503 }
2504 trailerKey := strings.TrimPrefix(k, TrailerPrefix)
2505 rws.declareTrailer(trailerKey)
2506 rws.handlerHeader[http.CanonicalHeaderKey(trailerKey)] = vv
2507 }
2508
2509 if len(rws.trailers) > 1 {
2510 sorter := sorterPool.Get().(*sorter)
2511 sorter.SortStrings(rws.trailers)
2512 sorterPool.Put(sorter)
2513 }
2514}
2515
2516func (w *responseWriter) Flush() {
2517 rws := w.rws
2518 if rws == nil {
2519 panic("Header called after Handler finished")
2520 }
2521 if rws.bw.Buffered() > 0 {
2522 if err := rws.bw.Flush(); err != nil {
2523 // Ignore the error. The frame writer already knows.
2524 return
2525 }
2526 } else {
2527 // The bufio.Writer won't call chunkWriter.Write
2528 // (writeChunk with zero bytes, so we have to do it
2529 // ourselves to force the HTTP response header and/or
2530 // final DATA frame (with END_STREAM) to be sent.
2531 rws.writeChunk(nil)
2532 }
2533}
2534
2535func (w *responseWriter) CloseNotify() <-chan bool {
2536 rws := w.rws
2537 if rws == nil {
2538 panic("CloseNotify called after Handler finished")
2539 }
2540 rws.closeNotifierMu.Lock()
2541 ch := rws.closeNotifierCh
2542 if ch == nil {
2543 ch = make(chan bool, 1)
2544 rws.closeNotifierCh = ch
2545 cw := rws.stream.cw
2546 go func() {
2547 cw.Wait() // wait for close
2548 ch <- true
2549 }()
2550 }
2551 rws.closeNotifierMu.Unlock()
2552 return ch
2553}
2554
2555func (w *responseWriter) Header() http.Header {
2556 rws := w.rws
2557 if rws == nil {
2558 panic("Header called after Handler finished")
2559 }
2560 if rws.handlerHeader == nil {
2561 rws.handlerHeader = make(http.Header)
2562 }
2563 return rws.handlerHeader
2564}
2565
// checkWriteHeaderCode is a copy of net/http's checkWriteHeaderCode.
// It panics unless code is in the range 100 through 999.
//
// Issue 22880: require valid WriteHeader status codes.
// For now we only enforce that it's three digits.
// In the future we might block things over 599 (600 and above aren't defined
// at http://httpwg.org/specs/rfc7231.html#status.codes)
// and we might block under 200 (once we have more mature 1xx support).
// But for now any three digits.
//
// We used to send "HTTP/1.1 000 0" on the wire in responses but there's
// no equivalent bogus thing we can realistically send in HTTP/2,
// so we'll consistently panic instead and help people find their bugs
// early. (We can't return an error from WriteHeader even if we wanted to.)
func checkWriteHeaderCode(code int) {
	if code >= 100 && code <= 999 {
		return
	}
	panic(fmt.Sprintf("invalid WriteHeader code %v", code))
}
2583
2584func (w *responseWriter) WriteHeader(code int) {
2585 rws := w.rws
2586 if rws == nil {
2587 panic("WriteHeader called after Handler finished")
2588 }
2589 rws.writeHeader(code)
2590}
2591
2592func (rws *responseWriterState) writeHeader(code int) {
2593 if !rws.wroteHeader {
2594 checkWriteHeaderCode(code)
2595 rws.wroteHeader = true
2596 rws.status = code
2597 if len(rws.handlerHeader) > 0 {
2598 rws.snapHeader = cloneHeader(rws.handlerHeader)
2599 }
2600 }
2601}
2602
// cloneHeader returns a deep copy of h: a new map whose value slices are
// freshly allocated. The result is never nil, even for a nil h, and every
// copied value slice is non-nil.
func cloneHeader(h http.Header) http.Header {
	dup := make(http.Header, len(h))
	for name, values := range h {
		dup[name] = append(make([]string, 0, len(values)), values...)
	}
	return dup
}
2612
2613// The Life Of A Write is like this:
2614//
2615// * Handler calls w.Write or w.WriteString ->
2616// * -> rws.bw (*bufio.Writer) ->
2617// * (Handler might call Flush)
2618// * -> chunkWriter{rws}
2619// * -> responseWriterState.writeChunk(p []byte)
2620// * -> responseWriterState.writeChunk (most of the magic; see comment there)
2621func (w *responseWriter) Write(p []byte) (n int, err error) {
2622 return w.write(len(p), p, "")
2623}
2624
2625func (w *responseWriter) WriteString(s string) (n int, err error) {
2626 return w.write(len(s), nil, s)
2627}
2628
2629// either dataB or dataS is non-zero.
2630func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) {
2631 rws := w.rws
2632 if rws == nil {
2633 panic("Write called after Handler finished")
2634 }
2635 if !rws.wroteHeader {
2636 w.WriteHeader(200)
2637 }
2638 if !bodyAllowedForStatus(rws.status) {
2639 return 0, http.ErrBodyNotAllowed
2640 }
2641 rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) // only one can be set
2642 if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen {
2643 // TODO: send a RST_STREAM
2644 return 0, errors.New("http2: handler wrote more than declared Content-Length")
2645 }
2646
2647 if dataB != nil {
2648 return rws.bw.Write(dataB)
2649 } else {
2650 return rws.bw.WriteString(dataS)
2651 }
2652}
2653
2654func (w *responseWriter) handlerDone() {
2655 rws := w.rws
2656 dirty := rws.dirty
2657 rws.handlerDone = true
2658 w.Flush()
2659 w.rws = nil
2660 if !dirty {
2661 // Only recycle the pool if all prior Write calls to
2662 // the serverConn goroutine completed successfully. If
2663 // they returned earlier due to resets from the peer
2664 // there might still be write goroutines outstanding
2665 // from the serverConn referencing the rws memory. See
2666 // issue 20704.
2667 responseWriterStatePool.Put(rws)
2668 }
2669}
2670
// Errors returned when a server push cannot be started.

// ErrRecursivePush is returned by Push when it is called on a stream that
// was itself pushed.
var ErrRecursivePush = errors.New("http2: recursive push not allowed")

// ErrPushLimitReached is returned when a push would exceed the peer's
// SETTINGS_MAX_CONCURRENT_STREAMS.
var ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
2676
2677var _ http.Pusher = (*responseWriter)(nil)
2678
2679func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
2680 st := w.rws.stream
2681 sc := st.sc
2682 sc.serveG.checkNotOn()
2683
2684 // No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
2685 // http://tools.ietf.org/html/rfc7540#section-6.6
2686 if st.isPushed() {
2687 return ErrRecursivePush
2688 }
2689
2690 if opts == nil {
2691 opts = new(http.PushOptions)
2692 }
2693
2694 // Default options.
2695 if opts.Method == "" {
2696 opts.Method = "GET"
2697 }
2698 if opts.Header == nil {
2699 opts.Header = http.Header{}
2700 }
2701 wantScheme := "http"
2702 if w.rws.req.TLS != nil {
2703 wantScheme = "https"
2704 }
2705
2706 // Validate the request.
2707 u, err := url.Parse(target)
2708 if err != nil {
2709 return err
2710 }
2711 if u.Scheme == "" {
2712 if !strings.HasPrefix(target, "/") {
2713 return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target)
2714 }
2715 u.Scheme = wantScheme
2716 u.Host = w.rws.req.Host
2717 } else {
2718 if u.Scheme != wantScheme {
2719 return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme)
2720 }
2721 if u.Host == "" {
2722 return errors.New("URL must have a host")
2723 }
2724 }
2725 for k := range opts.Header {
2726 if strings.HasPrefix(k, ":") {
2727 return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
2728 }
2729 // These headers are meaningful only if the request has a body,
2730 // but PUSH_PROMISE requests cannot have a body.
2731 // http://tools.ietf.org/html/rfc7540#section-8.2
2732 // Also disallow Host, since the promised URL must be absolute.
2733 switch strings.ToLower(k) {
2734 case "content-length", "content-encoding", "trailer", "te", "expect", "host":
2735 return fmt.Errorf("promised request headers cannot include %q", k)
2736 }
2737 }
2738 if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil {
2739 return err
2740 }
2741
2742 // The RFC effectively limits promised requests to GET and HEAD:
2743 // "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
2744 // http://tools.ietf.org/html/rfc7540#section-8.2
2745 if opts.Method != "GET" && opts.Method != "HEAD" {
2746 return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
2747 }
2748
2749 msg := &startPushRequest{
2750 parent: st,
2751 method: opts.Method,
2752 url: u,
2753 header: cloneHeader(opts.Header),
2754 done: errChanPool.Get().(chan error),
2755 }
2756
2757 select {
2758 case <-sc.doneServing:
2759 return errClientDisconnected
2760 case <-st.cw:
2761 return errStreamClosed
2762 case sc.serveMsgCh <- msg:
2763 }
2764
2765 select {
2766 case <-sc.doneServing:
2767 return errClientDisconnected
2768 case <-st.cw:
2769 return errStreamClosed
2770 case err := <-msg.done:
2771 errChanPool.Put(msg.done)
2772 return err
2773 }
2774}
2775
2776type startPushRequest struct {
2777 parent *stream
2778 method string
2779 url *url.URL
2780 header http.Header
2781 done chan error
2782}
2783
2784func (sc *serverConn) startPush(msg *startPushRequest) {
2785 sc.serveG.check()
2786
2787 // http://tools.ietf.org/html/rfc7540#section-6.6.
2788 // PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
2789 // is in either the "open" or "half-closed (remote)" state.
2790 if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote {
2791 // responseWriter.Push checks that the stream is peer-initiated.
2792 msg.done <- errStreamClosed
2793 return
2794 }
2795
2796 // http://tools.ietf.org/html/rfc7540#section-6.6.
2797 if !sc.pushEnabled {
2798 msg.done <- http.ErrNotSupported
2799 return
2800 }
2801
2802 // PUSH_PROMISE frames must be sent in increasing order by stream ID, so
2803 // we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
2804 // is written. Once the ID is allocated, we start the request handler.
2805 allocatePromisedID := func() (uint32, error) {
2806 sc.serveG.check()
2807
2808 // Check this again, just in case. Technically, we might have received
2809 // an updated SETTINGS by the time we got around to writing this frame.
2810 if !sc.pushEnabled {
2811 return 0, http.ErrNotSupported
2812 }
2813 // http://tools.ietf.org/html/rfc7540#section-6.5.2.
2814 if sc.curPushedStreams+1 > sc.clientMaxStreams {
2815 return 0, ErrPushLimitReached
2816 }
2817
2818 // http://tools.ietf.org/html/rfc7540#section-5.1.1.
2819 // Streams initiated by the server MUST use even-numbered identifiers.
2820 // A server that is unable to establish a new stream identifier can send a GOAWAY
2821 // frame so that the client is forced to open a new connection for new streams.
2822 if sc.maxPushPromiseID+2 >= 1<<31 {
2823 sc.startGracefulShutdownInternal()
2824 return 0, ErrPushLimitReached
2825 }
2826 sc.maxPushPromiseID += 2
2827 promisedID := sc.maxPushPromiseID
2828
2829 // http://tools.ietf.org/html/rfc7540#section-8.2.
2830 // Strictly speaking, the new stream should start in "reserved (local)", then
2831 // transition to "half closed (remote)" after sending the initial HEADERS, but
2832 // we start in "half closed (remote)" for simplicity.
2833 // See further comments at the definition of stateHalfClosedRemote.
2834 promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote)
2835 rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{
2836 method: msg.method,
2837 scheme: msg.url.Scheme,
2838 authority: msg.url.Host,
2839 path: msg.url.RequestURI(),
2840 header: cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE
2841 })
2842 if err != nil {
2843 // Should not happen, since we've already validated msg.url.
2844 panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
2845 }
2846
2847 go sc.runHandler(rw, req, sc.handler.ServeHTTP)
2848 return promisedID, nil
2849 }
2850
2851 sc.writeFrame(FrameWriteRequest{
2852 write: &writePushPromise{
2853 streamID: msg.parent.id,
2854 method: msg.method,
2855 url: msg.url,
2856 h: msg.header,
2857 allocatePromisedID: allocatePromisedID,
2858 },
2859 stream: msg.parent,
2860 done: msg.done,
2861 })
2862}
2863
// foreachHeaderElement treats v as a comma-separated list (the "#rule"
// construction in RFC 7230 section 7) and invokes fn, in order, once for
// every element that is non-empty after surrounding whitespace has been
// trimmed. fn is never called for an empty or all-whitespace v.
func foreachHeaderElement(v string, fn func(string)) {
	rest := textproto.TrimString(v)
	for rest != "" {
		// Peel off everything up to the next comma; with no comma left,
		// the remainder is the final element.
		elem := rest
		if i := strings.IndexByte(rest, ','); i >= 0 {
			elem, rest = rest[:i], rest[i+1:]
		} else {
			rest = ""
		}
		if elem = textproto.TrimString(elem); elem != "" {
			fn(elem)
		}
	}
}
2881
// connHeaders lists the connection-specific header fields, in the order in
// which checkValidHTTP2RequestHeaders looks for them.
// From http://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2.2
var connHeaders = []string{
	"Connection",
	"Keep-Alive",
	"Proxy-Connection",
	"Transfer-Encoding",
	"Upgrade",
}

// checkValidHTTP2RequestHeaders reports whether h is acceptable as the
// header set of an HTTP/2 request, per RFC 7540 Section 8.1.2.2.
//
// It returns an error if any key from connHeaders is present in h (even with
// no values), or if the "Te" key holds anything other than a single value
// that is either "trailers" or empty. Keys are looked up exactly as spelled
// here, so only canonical-form keys are detected.
//
// The returned error is reported to users.
func checkValidHTTP2RequestHeaders(h http.Header) error {
	for _, name := range connHeaders {
		if _, present := h[name]; present {
			return fmt.Errorf("request header %q is not valid in HTTP/2", name)
		}
	}

	switch te := h["Te"]; {
	case len(te) == 0:
		// No TE header at all.
		return nil
	case len(te) == 1 && (te[0] == "trailers" || te[0] == ""):
		// The only permitted forms.
		return nil
	}
	return errors.New(`request header "TE" may only be "trailers" in HTTP/2`)
}
2906
// new400Handler returns a handler that answers every request with
// 400 Bad Request, using err's message as the response body. The incoming
// request is not inspected.
func new400Handler(err error) http.HandlerFunc {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		http.Error(w, err.Error(), http.StatusBadRequest)
	})
}
2912
// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives
// disabled.
//
// It probes hs for an unexported doKeepAlives method via a type assertion
// and returns the negation of that method's result. Because unexported
// method names are package-scoped, the assertion can only succeed when this
// code is compiled in the same package as http.Server; built as a separate
// package it always reports false.
func h1ServerKeepAlivesDisabled(hs *http.Server) bool {
	type keepAliver interface {
		doKeepAlives() bool
	}
	if ka, ok := interface{}(hs).(keepAliver); ok {
		return !ka.doKeepAlives()
	}
	return false
}