blob: bc9e41a1b770cc946a0f939f7ef1fff3c2933db5 [file] [log] [blame]
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -07001// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// TODO: turn off the serve goroutine when idle, so
6// an idle conn only has the readFrames goroutine active. (which could
7// also be optimized probably to pin less memory in crypto/tls). This
8// would involve tracking when the serve goroutine is active (atomic
9// int32 read/CAS probably?) and starting it up when frames arrive,
10// and shutting it down when all handlers exit. the occasional PING
11// packets could use time.AfterFunc to call sc.wakeStartServeLoop()
12// (which is a no-op if already running) and then queue the PING write
13// as normal. The serve loop would then exit in most cases (if no
14// Handlers running) and not be woken up again until the PING packet
15// returns.
16
17// TODO (maybe): add a mechanism for Handlers to go into
18// half-closed-local mode (rw.(io.Closer) test?) but not exit their
19// handler, and continue to be able to read from the
20// Request.Body. This would be a somewhat semantic change from HTTP/1
21// (or at least what we expose in net/http), so I'd probably want to
22// add it there too. For now, this package says that returning from
23// the Handler ServeHTTP function means you're both done reading and
24// done writing, without a way to stop just one or the other.
25
26package http2
27
28import (
29 "bufio"
30 "bytes"
31 "context"
32 "crypto/tls"
33 "errors"
34 "fmt"
35 "io"
36 "log"
37 "math"
38 "net"
39 "net/http"
40 "net/textproto"
41 "net/url"
42 "os"
43 "reflect"
44 "runtime"
45 "strconv"
46 "strings"
47 "sync"
48 "time"
49
50 "golang.org/x/net/http/httpguts"
51 "golang.org/x/net/http2/hpack"
52)
53
// Timeouts, buffer sizes and limits used by the server implementation.
const (
	prefaceTimeout         = 10 * time.Second // presumably bounds the wait for the client preface (see errPrefaceTimeout) — confirm at the use site
	firstSettingsTimeout   = 2 * time.Second  // should be in-flight with preface anyway
	handlerChunkWriteSize  = 4 << 10          // buffer size of each responseWriterState's bufio.Writer
	defaultMaxStreams      = 250              // TODO: make this 100 as the GFE seems to?
	maxQueuedControlFrames = 10000            // value returned by Server.maxQueuedControlFrames
)
61
// Sentinel errors used by the server. errClientDisconnected is the error
// passed to closeStream for every remaining stream when the connection's
// serve loop exits.
var (
	errClientDisconnected = errors.New("client disconnected")
	errClosedBody         = errors.New("body closed by handler")
	errHandlerComplete    = errors.New("http2: request body closed due to handler exiting")
	errStreamClosed       = errors.New("http2: stream closed")
)
68
69var responseWriterStatePool = sync.Pool{
70 New: func() interface{} {
71 rws := &responseWriterState{}
72 rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
73 return rws
74 },
75}
76
// Test hooks. Each hook is consulted only when non-nil.
var (
	testHookOnConn        func()                                                     // called at the start of the TLSNextProto handler installed by ConfigureServer
	testHookGetServerConn func(*serverConn)                                          // called by ServeConn with the new serverConn just before serve starts
	testHookOnPanicMu     *sync.Mutex                                                // nil except in tests
	testHookOnPanic       func(sc *serverConn, panicVal interface{}) (rePanic bool) // called by notePanic with the recovered value; returning true re-panics
)
84
// Server is an HTTP/2 server.
type Server struct {
	// MaxHandlers limits the number of http.Handler ServeHTTP goroutines
	// which may run at a time over all connections.
	// Negative or zero means no limit.
	// TODO: implement
	MaxHandlers int

	// MaxConcurrentStreams optionally specifies the number of
	// concurrent streams that each client may have open at a
	// time. This is unrelated to the number of http.Handler goroutines
	// which may be active globally, which is MaxHandlers.
	// If zero, MaxConcurrentStreams defaults to at least 100, per
	// the HTTP/2 spec's recommendations.
	MaxConcurrentStreams uint32

	// MaxReadFrameSize optionally specifies the largest frame
	// this server is willing to read. A valid value is between
	// 16k and 16M, inclusive. If zero or otherwise invalid, a
	// default value is used.
	MaxReadFrameSize uint32

	// PermitProhibitedCipherSuites, if true, permits the use of
	// cipher suites prohibited by the HTTP/2 spec.
	PermitProhibitedCipherSuites bool

	// IdleTimeout specifies how long until idle clients should be
	// closed with a GOAWAY frame. PING frames are not considered
	// activity for the purposes of IdleTimeout.
	IdleTimeout time.Duration

	// MaxUploadBufferPerConnection is the size of the initial flow
	// control window for each connection. The HTTP/2 spec does not
	// allow this to be smaller than 65535 or larger than 2^32-1.
	// If the value is outside this range, a default value will be
	// used instead.
	MaxUploadBufferPerConnection int32

	// MaxUploadBufferPerStream is the size of the initial flow control
	// window for each stream. The HTTP/2 spec does not allow this to
	// be larger than 2^32-1. If the value is zero or larger than the
	// maximum, a default value will be used instead.
	MaxUploadBufferPerStream int32

	// NewWriteScheduler constructs a write scheduler for a connection.
	// If nil, a default scheduler is chosen.
	NewWriteScheduler func() WriteScheduler

	// Internal state. This is a pointer (rather than embedded directly)
	// so that we don't embed a Mutex in this struct, which will make the
	// struct non-copyable, which might break some callers.
	// It is set by ConfigureServer and stays nil if that was never called.
	state *serverInternalState
}
138
139func (s *Server) initialConnRecvWindowSize() int32 {
140 if s.MaxUploadBufferPerConnection > initialWindowSize {
141 return s.MaxUploadBufferPerConnection
142 }
143 return 1 << 20
144}
145
146func (s *Server) initialStreamRecvWindowSize() int32 {
147 if s.MaxUploadBufferPerStream > 0 {
148 return s.MaxUploadBufferPerStream
149 }
150 return 1 << 20
151}
152
153func (s *Server) maxReadFrameSize() uint32 {
154 if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize {
155 return v
156 }
157 return defaultMaxReadFrameSize
158}
159
160func (s *Server) maxConcurrentStreams() uint32 {
161 if v := s.MaxConcurrentStreams; v > 0 {
162 return v
163 }
164 return defaultMaxStreams
165}
166
Arjun E K57a7fcb2020-01-30 06:44:45 +0000167// maxQueuedControlFrames is the maximum number of control frames like
168// SETTINGS, PING and RST_STREAM that will be queued for writing before
169// the connection is closed to prevent memory exhaustion attacks.
170func (s *Server) maxQueuedControlFrames() int {
171 // TODO: if anybody asks, add a Server field, and remember to define the
172 // behavior of negative values.
173 return maxQueuedControlFrames
174}
175
// serverInternalState tracks the connections currently being served so
// that a graceful shutdown can be started on all of them. Its methods
// accept a nil receiver, which is the case when the Server was used
// without calling ConfigureServer.
type serverInternalState struct {
	mu          sync.Mutex               // guards activeConns
	activeConns map[*serverConn]struct{} // set of registered connections
}
180
181func (s *serverInternalState) registerConn(sc *serverConn) {
182 if s == nil {
183 return // if the Server was used without calling ConfigureServer
184 }
185 s.mu.Lock()
186 s.activeConns[sc] = struct{}{}
187 s.mu.Unlock()
188}
189
190func (s *serverInternalState) unregisterConn(sc *serverConn) {
191 if s == nil {
192 return // if the Server was used without calling ConfigureServer
193 }
194 s.mu.Lock()
195 delete(s.activeConns, sc)
196 s.mu.Unlock()
197}
198
199func (s *serverInternalState) startGracefulShutdown() {
200 if s == nil {
201 return // if the Server was used without calling ConfigureServer
202 }
203 s.mu.Lock()
204 for sc := range s.activeConns {
205 sc.startGracefulShutdown()
206 }
207 s.mu.Unlock()
208}
209
210// ConfigureServer adds HTTP/2 support to a net/http Server.
211//
212// The configuration conf may be nil.
213//
214// ConfigureServer must be called before s begins serving.
215func ConfigureServer(s *http.Server, conf *Server) error {
216 if s == nil {
217 panic("nil *http.Server")
218 }
219 if conf == nil {
220 conf = new(Server)
221 }
222 conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})}
223 if h1, h2 := s, conf; h2.IdleTimeout == 0 {
224 if h1.IdleTimeout != 0 {
225 h2.IdleTimeout = h1.IdleTimeout
226 } else {
227 h2.IdleTimeout = h1.ReadTimeout
228 }
229 }
230 s.RegisterOnShutdown(conf.state.startGracefulShutdown)
231
232 if s.TLSConfig == nil {
233 s.TLSConfig = new(tls.Config)
234 } else if s.TLSConfig.CipherSuites != nil {
235 // If they already provided a CipherSuite list, return
236 // an error if it has a bad order or is missing
237 // ECDHE_RSA_WITH_AES_128_GCM_SHA256 or ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
238 haveRequired := false
239 sawBad := false
240 for i, cs := range s.TLSConfig.CipherSuites {
241 switch cs {
242 case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
243 // Alternative MTI cipher to not discourage ECDSA-only servers.
244 // See http://golang.org/cl/30721 for further information.
245 tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
246 haveRequired = true
247 }
248 if isBadCipher(cs) {
249 sawBad = true
250 } else if sawBad {
251 return fmt.Errorf("http2: TLSConfig.CipherSuites index %d contains an HTTP/2-approved cipher suite (%#04x), but it comes after unapproved cipher suites. With this configuration, clients that don't support previous, approved cipher suites may be given an unapproved one and reject the connection.", i, cs)
252 }
253 }
254 if !haveRequired {
Pragya Arya324337e2020-02-20 14:35:08 +0530255 return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher (need at least one of TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 or TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256).")
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700256 }
257 }
258
259 // Note: not setting MinVersion to tls.VersionTLS12,
260 // as we don't want to interfere with HTTP/1.1 traffic
261 // on the user's server. We enforce TLS 1.2 later once
262 // we accept a connection. Ideally this should be done
263 // during next-proto selection, but using TLS <1.2 with
264 // HTTP/2 is still the client's bug.
265
266 s.TLSConfig.PreferServerCipherSuites = true
267
268 haveNPN := false
269 for _, p := range s.TLSConfig.NextProtos {
270 if p == NextProtoTLS {
271 haveNPN = true
272 break
273 }
274 }
275 if !haveNPN {
276 s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS)
277 }
278
279 if s.TLSNextProto == nil {
280 s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
281 }
282 protoHandler := func(hs *http.Server, c *tls.Conn, h http.Handler) {
283 if testHookOnConn != nil {
284 testHookOnConn()
285 }
286 // The TLSNextProto interface predates contexts, so
287 // the net/http package passes down its per-connection
288 // base context via an exported but unadvertised
289 // method on the Handler. This is for internal
290 // net/http<=>http2 use only.
291 var ctx context.Context
292 type baseContexter interface {
293 BaseContext() context.Context
294 }
295 if bc, ok := h.(baseContexter); ok {
296 ctx = bc.BaseContext()
297 }
298 conf.ServeConn(c, &ServeConnOpts{
299 Context: ctx,
300 Handler: h,
301 BaseConfig: hs,
302 })
303 }
304 s.TLSNextProto[NextProtoTLS] = protoHandler
305 return nil
306}
307
// ServeConnOpts are options for the Server.ServeConn method.
// A nil *ServeConnOpts is valid: its accessor methods (context,
// baseConfig, handler) check for a nil receiver and return defaults.
type ServeConnOpts struct {
	// Context is the base context to use.
	// If nil, context.Background is used.
	Context context.Context

	// BaseConfig optionally sets the base configuration
	// for values. If nil, defaults are used.
	BaseConfig *http.Server

	// Handler specifies which handler to use for processing
	// requests. If nil, BaseConfig.Handler is used. If BaseConfig
	// or BaseConfig.Handler is nil, http.DefaultServeMux is used.
	Handler http.Handler
}
323
324func (o *ServeConnOpts) context() context.Context {
Arjun E K57a7fcb2020-01-30 06:44:45 +0000325 if o != nil && o.Context != nil {
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700326 return o.Context
327 }
328 return context.Background()
329}
330
331func (o *ServeConnOpts) baseConfig() *http.Server {
332 if o != nil && o.BaseConfig != nil {
333 return o.BaseConfig
334 }
335 return new(http.Server)
336}
337
338func (o *ServeConnOpts) handler() http.Handler {
339 if o != nil {
340 if o.Handler != nil {
341 return o.Handler
342 }
343 if o.BaseConfig != nil && o.BaseConfig.Handler != nil {
344 return o.BaseConfig.Handler
345 }
346 }
347 return http.DefaultServeMux
348}
349
// ServeConn serves HTTP/2 requests on the provided connection and
// blocks until the connection is no longer readable.
//
// ServeConn starts speaking HTTP/2 assuming that c has not had any
// reads or writes. It writes its initial settings frame and expects
// to be able to read the preface and settings frame from the
// client. If c has a ConnectionState method like a *tls.Conn, the
// ConnectionState is used to verify the TLS ciphersuite and to set
// the Request.TLS field in Handlers.
//
// ServeConn does not support h2c by itself. Any h2c support must be
// implemented in terms of providing a suitably-behaving net.Conn.
//
// The opts parameter is optional. If nil, default values are used.
//
// The connection is registered with the server's internal state for the
// duration of the call, and the base context created for it is cancelled
// when ServeConn returns.
func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
	baseCtx, cancel := serverConnBaseContext(c, opts)
	defer cancel()

	sc := &serverConn{
		srv:                         s,
		hs:                          opts.baseConfig(),
		conn:                        c,
		baseCtx:                     baseCtx,
		remoteAddrStr:               c.RemoteAddr().String(),
		bw:                          newBufferedWriter(c),
		handler:                     opts.handler(),
		streams:                     make(map[uint32]*stream),
		readFrameCh:                 make(chan readFrameResult),
		wantWriteFrameCh:            make(chan FrameWriteRequest, 8),
		serveMsgCh:                  make(chan interface{}, 8),
		wroteFrameCh:                make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
		bodyReadCh:                  make(chan bodyReadMsg),         // buffering doesn't matter either way
		doneServing:                 make(chan struct{}),
		clientMaxStreams:            math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
		advMaxStreams:               s.maxConcurrentStreams(),
		initialStreamSendWindowSize: initialWindowSize,
		maxFrameSize:                initialMaxFrameSize,
		headerTableSize:             initialHeaderTableSize,
		serveG:                      newGoroutineLock(),
		pushEnabled:                 true,
	}

	// s.state is nil when ConfigureServer was never called; both calls
	// tolerate a nil receiver.
	s.state.registerConn(sc)
	defer s.state.unregisterConn(sc)

	// The net/http package sets the write deadline from the
	// http.Server.WriteTimeout during the TLS handshake, but then
	// passes the connection off to us with the deadline already set.
	// Write deadlines are set per stream in serverConn.newStream.
	// Disarm the net.Conn write deadline here.
	if sc.hs.WriteTimeout != 0 {
		sc.conn.SetWriteDeadline(time.Time{})
	}

	if s.NewWriteScheduler != nil {
		sc.writeSched = s.NewWriteScheduler()
	} else {
		sc.writeSched = NewRandomWriteScheduler()
	}

	// These start at the RFC-specified defaults. If there is a higher
	// configured value for inflow, that will be updated when we send a
	// WINDOW_UPDATE shortly after sending SETTINGS.
	sc.flow.add(initialWindowSize)
	sc.inflow.add(initialWindowSize)
	sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)

	fr := NewFramer(sc.bw, c)
	fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
	fr.MaxHeaderListSize = sc.maxHeaderListSize()
	fr.SetMaxReadFrameSize(s.maxReadFrameSize())
	sc.framer = fr

	if tc, ok := c.(connectionStater); ok {
		// Keep a private copy of the TLS state for this connection.
		sc.tlsState = new(tls.ConnectionState)
		*sc.tlsState = tc.ConnectionState()
		// 9.2 Use of TLS Features
		// An implementation of HTTP/2 over TLS MUST use TLS
		// 1.2 or higher with the restrictions on feature set
		// and cipher suite described in this section. Due to
		// implementation limitations, it might not be
		// possible to fail TLS negotiation. An endpoint MUST
		// immediately terminate an HTTP/2 connection that
		// does not meet the TLS requirements described in
		// this section with a connection error (Section
		// 5.4.1) of type INADEQUATE_SECURITY.
		if sc.tlsState.Version < tls.VersionTLS12 {
			sc.rejectConn(ErrCodeInadequateSecurity, "TLS version too low")
			return
		}

		if sc.tlsState.ServerName == "" {
			// Client must use SNI, but we don't enforce that anymore,
			// since it was causing problems when connecting to bare IP
			// addresses during development.
			//
			// TODO: optionally enforce? Or enforce at the time we receive
			// a new request, and verify the ServerName matches the :authority?
			// But that precludes proxy situations, perhaps.
			//
			// So for now, do nothing here again.
		}

		if !s.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) {
			// "Endpoints MAY choose to generate a connection error
			// (Section 5.4.1) of type INADEQUATE_SECURITY if one of
			// the prohibited cipher suites are negotiated."
			//
			// We choose that. In my opinion, the spec is weak
			// here. It also says both parties must support at least
			// TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no
			// excuses here. If we really must, we could allow an
			// "AllowInsecureWeakCiphers" option on the server later.
			// Let's see how it plays out first.
			sc.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite))
			return
		}
	}

	if hook := testHookGetServerConn; hook != nil {
		hook(sc)
	}
	sc.serve()
}
474
475func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx context.Context, cancel func()) {
476 ctx, cancel = context.WithCancel(opts.context())
477 ctx = context.WithValue(ctx, http.LocalAddrContextKey, c.LocalAddr())
478 if hs := opts.baseConfig(); hs != nil {
479 ctx = context.WithValue(ctx, http.ServerContextKey, hs)
480 }
481 return
482}
483
484func (sc *serverConn) rejectConn(err ErrCode, debug string) {
485 sc.vlogf("http2: server rejecting conn: %v, %s", err, debug)
486 // ignoring errors. hanging up anyway.
487 sc.framer.WriteGoAway(0, err, []byte(debug))
488 sc.bw.Flush()
489 sc.conn.Close()
490}
491
// serverConn is the state of a single HTTP/2 server connection. One is
// created per call to Server.ServeConn. Fields are grouped by which
// goroutine owns them.
type serverConn struct {
	// Immutable:
	srv              *Server
	hs               *http.Server
	conn             net.Conn
	bw               *bufferedWriter // writing to conn
	handler          http.Handler
	baseCtx          context.Context
	framer           *Framer
	doneServing      chan struct{}          // closed when serverConn.serve ends
	readFrameCh      chan readFrameResult   // written by serverConn.readFrames
	wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
	wroteFrameCh     chan frameWriteResult  // from writeFrameAsync -> serve, tickles more frame writes
	bodyReadCh       chan bodyReadMsg       // from handlers -> serve
	serveMsgCh       chan interface{}       // misc messages & code to send to / run on the serve loop
	flow             flow                   // conn-wide (not stream-specific) outbound flow control
	inflow           flow                   // conn-wide inbound flow control
	tlsState         *tls.ConnectionState   // shared by all handlers, like net/http
	remoteAddrStr    string
	writeSched       WriteScheduler

	// Everything following is owned by the serve loop; use serveG.check():
	serveG                      goroutineLock // used to verify funcs are on serve()
	pushEnabled                 bool
	sawFirstSettings            bool // got the initial SETTINGS frame after the preface
	needToSendSettingsAck       bool
	unackedSettings             int    // how many SETTINGS have we sent without ACKs?
	queuedControlFrames         int    // control frames in the writeSched queue
	clientMaxStreams            uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
	advMaxStreams               uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
	curClientStreams            uint32 // number of open streams initiated by the client
	curPushedStreams            uint32 // number of open streams initiated by server push
	maxClientStreamID           uint32 // max ever seen from client (odd), or 0 if there have been no client requests
	maxPushPromiseID            uint32 // ID of the last push promise (even), or 0 if there have been no pushes
	streams                     map[uint32]*stream
	initialStreamSendWindowSize int32
	maxFrameSize                int32
	headerTableSize             uint32
	peerMaxHeaderListSize       uint32            // zero means unknown (default)
	canonHeader                 map[string]string // http2-lower-case -> Go-Canonical-Case
	writingFrame                bool              // started writing a frame (on serve goroutine or separate)
	writingFrameAsync           bool              // started a frame on its own goroutine but haven't heard back on wroteFrameCh
	needsFrameFlush             bool              // last frame write wasn't a flush
	inGoAway                    bool              // we've started to or sent GOAWAY
	inFrameScheduleLoop         bool              // whether we're in the scheduleFrameWrite loop
	needToSendGoAway            bool              // we need to schedule a GOAWAY frame write
	goAwayCode                  ErrCode
	shutdownTimer               *time.Timer // nil until used
	idleTimer                   *time.Timer // nil if unused

	// Owned by the writeFrameAsync goroutine:
	headerWriteBuf bytes.Buffer
	hpackEncoder   *hpack.Encoder

	// Used by startGracefulShutdown.
	shutdownOnce sync.Once
}
549
550func (sc *serverConn) maxHeaderListSize() uint32 {
551 n := sc.hs.MaxHeaderBytes
552 if n <= 0 {
553 n = http.DefaultMaxHeaderBytes
554 }
555 // http2's count is in a slightly different unit and includes 32 bytes per pair.
556 // So, take the net/http.Server value and pad it up a bit, assuming 10 headers.
557 const perFieldOverhead = 32 // per http2 spec
558 const typicalHeaders = 10 // conservative
559 return uint32(n + typicalHeaders*perFieldOverhead)
560}
561
562func (sc *serverConn) curOpenStreams() uint32 {
563 sc.serveG.check()
564 return sc.curClientStreams + sc.curPushedStreams
565}
566
// stream represents a stream. This is the minimal metadata needed by
// the serve goroutine. Most of the actual stream state is owned by
// the http.Handler's goroutine in the responseWriter. Because the
// responseWriter's responseWriterState is recycled at the end of a
// handler, this struct intentionally has no pointer to the
// *responseWriter{,State} itself, as the Handler ending nils out the
// responseWriter's state field.
type stream struct {
	// immutable:
	sc        *serverConn
	id        uint32
	body      *pipe       // non-nil if expecting DATA frames
	cw        closeWaiter // closed when the stream transitions to the closed state
	ctx       context.Context
	cancelCtx func()

	// owned by serverConn's serve loop:
	bodyBytes        int64 // body bytes seen so far
	declBodyBytes    int64 // or -1 if undeclared
	flow             flow  // limits writing from Handler to client
	inflow           flow  // what the client is allowed to POST/etc to us
	state            streamState
	resetQueued      bool        // RST_STREAM queued for write; set by sc.resetStream
	gotTrailerHeader bool        // HEADER frame for trailers was seen
	wroteHeaders     bool        // whether we wrote headers (not status 100)
	writeDeadline    *time.Timer // nil if unused

	trailer    http.Header // accumulated trailers
	reqTrailer http.Header // handler's Request.Trailer
}
597
598func (sc *serverConn) Framer() *Framer { return sc.framer }
599func (sc *serverConn) CloseConn() error { return sc.conn.Close() }
600func (sc *serverConn) Flush() error { return sc.bw.Flush() }
601func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) {
602 return sc.hpackEncoder, &sc.headerWriteBuf
603}
604
605func (sc *serverConn) state(streamID uint32) (streamState, *stream) {
606 sc.serveG.check()
607 // http://tools.ietf.org/html/rfc7540#section-5.1
608 if st, ok := sc.streams[streamID]; ok {
609 return st.state, st
610 }
611 // "The first use of a new stream identifier implicitly closes all
612 // streams in the "idle" state that might have been initiated by
613 // that peer with a lower-valued stream identifier. For example, if
614 // a client sends a HEADERS frame on stream 7 without ever sending a
615 // frame on stream 5, then stream 5 transitions to the "closed"
616 // state when the first frame for stream 7 is sent or received."
617 if streamID%2 == 1 {
618 if streamID <= sc.maxClientStreamID {
619 return stateClosed, nil
620 }
621 } else {
622 if streamID <= sc.maxPushPromiseID {
623 return stateClosed, nil
624 }
625 }
626 return stateIdle, nil
627}
628
629// setConnState calls the net/http ConnState hook for this connection, if configured.
630// Note that the net/http package does StateNew and StateClosed for us.
631// There is currently no plan for StateHijacked or hijacking HTTP/2 connections.
632func (sc *serverConn) setConnState(state http.ConnState) {
633 if sc.hs.ConnState != nil {
634 sc.hs.ConnState(sc.conn, state)
635 }
636}
637
638func (sc *serverConn) vlogf(format string, args ...interface{}) {
639 if VerboseLogs {
640 sc.logf(format, args...)
641 }
642}
643
644func (sc *serverConn) logf(format string, args ...interface{}) {
645 if lg := sc.hs.ErrorLog; lg != nil {
646 lg.Printf(format, args...)
647 } else {
648 log.Printf(format, args...)
649 }
650}
651
// errno returns the numeric value of v when v's dynamic type has kind
// uintptr, and 0 for anything else (including a nil error).
//
// TODO: remove this helper function once http2 can use build
// tags. See comment in isClosedConnError.
func errno(v error) uintptr {
	rv := reflect.ValueOf(v)
	if rv.Kind() != reflect.Uintptr {
		return 0
	}
	return uintptr(rv.Uint())
}
662
663// isClosedConnError reports whether err is an error from use of a closed
664// network connection.
665func isClosedConnError(err error) bool {
666 if err == nil {
667 return false
668 }
669
670 // TODO: remove this string search and be more like the Windows
671 // case below. That might involve modifying the standard library
672 // to return better error types.
673 str := err.Error()
674 if strings.Contains(str, "use of closed network connection") {
675 return true
676 }
677
678 // TODO(bradfitz): x/tools/cmd/bundle doesn't really support
679 // build tags, so I can't make an http2_windows.go file with
680 // Windows-specific stuff. Fix that and move this, once we
681 // have a way to bundle this into std's net/http somehow.
682 if runtime.GOOS == "windows" {
683 if oe, ok := err.(*net.OpError); ok && oe.Op == "read" {
684 if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" {
685 const WSAECONNABORTED = 10053
686 const WSAECONNRESET = 10054
687 if n := errno(se.Err); n == WSAECONNRESET || n == WSAECONNABORTED {
688 return true
689 }
690 }
691 }
692 }
693 return false
694}
695
696func (sc *serverConn) condlogf(err error, format string, args ...interface{}) {
697 if err == nil {
698 return
699 }
700 if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) || err == errPrefaceTimeout {
701 // Boring, expected errors.
702 sc.vlogf(format, args...)
703 } else {
704 sc.logf(format, args...)
705 }
706}
707
708func (sc *serverConn) canonicalHeader(v string) string {
709 sc.serveG.check()
710 buildCommonHeaderMapsOnce()
711 cv, ok := commonCanonHeader[v]
712 if ok {
713 return cv
714 }
715 cv, ok = sc.canonHeader[v]
716 if ok {
717 return cv
718 }
719 if sc.canonHeader == nil {
720 sc.canonHeader = make(map[string]string)
721 }
722 cv = http.CanonicalHeaderKey(v)
723 sc.canonHeader[v] = cv
724 return cv
725}
726
// readFrameResult is the message sent by readFrames on readFrameCh for
// every Framer.ReadFrame call: the frame (if any), the read error (if
// any), and a callback that lets the reader proceed to the next frame.
type readFrameResult struct {
	f   Frame // valid until readMore is called
	err error

	// readMore should be called once the consumer no longer needs or
	// retains f. After readMore, f is invalid and more frames can be
	// read.
	readMore func()
}
736
737// readFrames is the loop that reads incoming frames.
738// It takes care to only read one frame at a time, blocking until the
739// consumer is done with the frame.
740// It's run on its own goroutine.
741func (sc *serverConn) readFrames() {
742 gate := make(gate)
743 gateDone := gate.Done
744 for {
745 f, err := sc.framer.ReadFrame()
746 select {
747 case sc.readFrameCh <- readFrameResult{f, err, gateDone}:
748 case <-sc.doneServing:
749 return
750 }
751 select {
752 case <-gate:
753 case <-sc.doneServing:
754 return
755 }
756 if terminalReadFrameError(err) {
757 return
758 }
759 }
760}
761
// frameWriteResult is the message passed from writeFrameAsync to the serve goroutine.
// It is delivered on serverConn.wroteFrameCh.
type frameWriteResult struct {
	wr  FrameWriteRequest // what was written (or attempted)
	err error             // result of the writeFrame call
}
767
768// writeFrameAsync runs in its own goroutine and writes a single frame
769// and then reports when it's done.
770// At most one goroutine can be running writeFrameAsync at a time per
771// serverConn.
772func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) {
773 err := wr.write.writeFrame(sc)
774 sc.wroteFrameCh <- frameWriteResult{wr, err}
775}
776
777func (sc *serverConn) closeAllStreamsOnConnClose() {
778 sc.serveG.check()
779 for _, st := range sc.streams {
780 sc.closeStream(st, errClientDisconnected)
781 }
782}
783
784func (sc *serverConn) stopShutdownTimer() {
785 sc.serveG.check()
786 if t := sc.shutdownTimer; t != nil {
787 t.Stop()
788 }
789}
790
791func (sc *serverConn) notePanic() {
792 // Note: this is for serverConn.serve panicking, not http.Handler code.
793 if testHookOnPanicMu != nil {
794 testHookOnPanicMu.Lock()
795 defer testHookOnPanicMu.Unlock()
796 }
797 if testHookOnPanic != nil {
798 if e := recover(); e != nil {
799 if testHookOnPanic(sc, e) {
800 panic(e)
801 }
802 }
803 }
804}
805
// serve runs the connection's main loop on the calling goroutine and
// returns when the connection should be torn down.
//
// It queues the initial SETTINGS frame (plus a connection-level
// WINDOW_UPDATE when a larger receive window is configured), reads the
// client preface, starts the readFrames goroutine and then multiplexes
// over write requests, write completions, incoming frames, body-read
// notifications and miscellaneous serve messages.
//
// The deferred calls run in reverse order on return: doneServing is
// closed, the shutdown timer is stopped, all streams are closed, the
// connection is closed, and finally notePanic runs.
func (sc *serverConn) serve() {
	sc.serveG.check()
	defer sc.notePanic()
	defer sc.conn.Close()
	defer sc.closeAllStreamsOnConnClose()
	defer sc.stopShutdownTimer()
	defer close(sc.doneServing) // unblocks handlers trying to send

	if VerboseLogs {
		sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
	}

	sc.writeFrame(FrameWriteRequest{
		write: writeSettings{
			{SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
			{SettingMaxConcurrentStreams, sc.advMaxStreams},
			{SettingMaxHeaderListSize, sc.maxHeaderListSize()},
			{SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
		},
	})
	sc.unackedSettings++

	// Each connection starts with initialWindowSize inflow tokens.
	// If a higher value is configured, we add more tokens.
	if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
		sc.sendWindowUpdate(nil, int(diff))
	}

	if err := sc.readPreface(); err != nil {
		sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
		return
	}
	// Now that we've got the preface, get us out of the
	// "StateNew" state. We can't go directly to idle, though.
	// Active means we read some data and anticipate a request. We'll
	// do another Active when we get a HEADERS frame.
	sc.setConnState(http.StateActive)
	sc.setConnState(http.StateIdle)

	if sc.srv.IdleTimeout != 0 {
		sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
		defer sc.idleTimer.Stop()
	}

	go sc.readFrames() // closed by defer sc.conn.Close above

	// The settings timer is disarmed as soon as the first frame from the
	// reader has been processed (see the readFrameCh case below).
	settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer)
	defer settingsTimer.Stop()

	loopNum := 0
	for {
		loopNum++
		select {
		case wr := <-sc.wantWriteFrameCh:
			// A StreamError sent as a write request means "reset this stream".
			if se, ok := wr.write.(StreamError); ok {
				sc.resetStream(se)
				break
			}
			sc.writeFrame(wr)
		case res := <-sc.wroteFrameCh:
			sc.wroteFrame(res)
		case res := <-sc.readFrameCh:
			if !sc.processFrameFromReader(res) {
				return
			}
			// Let readFrames proceed to the next frame.
			res.readMore()
			if settingsTimer != nil {
				settingsTimer.Stop()
				settingsTimer = nil
			}
		case m := <-sc.bodyReadCh:
			sc.noteBodyRead(m.st, m.n)
		case msg := <-sc.serveMsgCh:
			switch v := msg.(type) {
			case func(int):
				v(loopNum) // for testing
			case *serverMessage:
				switch v {
				case settingsTimerMsg:
					sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
					return
				case idleTimerMsg:
					sc.vlogf("connection is idle")
					sc.goAway(ErrCodeNo)
				case shutdownTimerMsg:
					sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
					return
				case gracefulShutdownMsg:
					sc.startGracefulShutdownInternal()
				default:
					panic("unknown timer")
				}
			case *startPushRequest:
				sc.startPush(v)
			default:
				panic(fmt.Sprintf("unexpected type %T", v))
			}
		}

		// If the peer is causing us to generate a lot of control frames,
		// but not reading them from us, assume they are trying to make us
		// run out of memory.
		if sc.queuedControlFrames > sc.srv.maxQueuedControlFrames() {
			sc.vlogf("http2: too many control frames in send queue, closing connection")
			return
		}

		// Start the shutdown timer after sending a GOAWAY. When sending GOAWAY
		// with no error code (graceful shutdown), don't start the timer until
		// all open streams have been completed.
		sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame
		gracefulShutdownComplete := sc.goAwayCode == ErrCodeNo && sc.curOpenStreams() == 0
		if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != ErrCodeNo || gracefulShutdownComplete) {
			sc.shutDownIn(goAwayTimeout)
		}
	}
}
923
924func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) {
925 select {
926 case <-sc.doneServing:
927 case <-sharedCh:
928 close(privateCh)
929 }
930}
931
// serverMessage is the type behind the sentinel messages sent on
// serveMsgCh. The values carry no data: each message is a distinct
// pointer and the serve loop tells them apart by pointer identity.
type serverMessage int

// Message values sent to serveMsgCh.
var settingsTimerMsg = new(serverMessage)    // sent by onSettingsTimer
var idleTimerMsg = new(serverMessage)        // sent by onIdleTimer
var shutdownTimerMsg = new(serverMessage)    // sent by onShutdownTimer
var gracefulShutdownMsg = new(serverMessage) // sent by startGracefulShutdown
941
942func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
943func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) }
944func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) }
945
946func (sc *serverConn) sendServeMsg(msg interface{}) {
947 sc.serveG.checkNotOn() // NOT
948 select {
949 case sc.serveMsgCh <- msg:
950 case <-sc.doneServing:
951 }
952}
953
// errPrefaceTimeout is returned by readPreface when the client preface
// has not been read within prefaceTimeout.
var errPrefaceTimeout = errors.New("timeout waiting for client preface")
955
956// readPreface reads the ClientPreface greeting from the peer or
957// returns errPrefaceTimeout on timeout, or an error if the greeting
958// is invalid.
959func (sc *serverConn) readPreface() error {
960 errc := make(chan error, 1)
961 go func() {
962 // Read the client preface
963 buf := make([]byte, len(ClientPreface))
964 if _, err := io.ReadFull(sc.conn, buf); err != nil {
965 errc <- err
966 } else if !bytes.Equal(buf, clientPreface) {
967 errc <- fmt.Errorf("bogus greeting %q", buf)
968 } else {
969 errc <- nil
970 }
971 }()
972 timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
973 defer timer.Stop()
974 select {
975 case <-timer.C:
976 return errPrefaceTimeout
977 case err := <-errc:
978 if err == nil {
979 if VerboseLogs {
980 sc.vlogf("http2: server: client %v said hello", sc.conn.RemoteAddr())
981 }
982 }
983 return err
984 }
985}
986
// errChanPool recycles the one-slot error channels on which
// writeDataFromHandler waits for the result of a frame write.
var errChanPool = sync.Pool{
	New: func() interface{} {
		return make(chan error, 1)
	},
}
990
991var writeDataPool = sync.Pool{
992 New: func() interface{} { return new(writeData) },
993}
994
995// writeDataFromHandler writes DATA response frames from a handler on
996// the given stream.
997func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
998 ch := errChanPool.Get().(chan error)
999 writeArg := writeDataPool.Get().(*writeData)
1000 *writeArg = writeData{stream.id, data, endStream}
1001 err := sc.writeFrameFromHandler(FrameWriteRequest{
1002 write: writeArg,
1003 stream: stream,
1004 done: ch,
1005 })
1006 if err != nil {
1007 return err
1008 }
1009 var frameWriteDone bool // the frame write is done (successfully or not)
1010 select {
1011 case err = <-ch:
1012 frameWriteDone = true
1013 case <-sc.doneServing:
1014 return errClientDisconnected
1015 case <-stream.cw:
1016 // If both ch and stream.cw were ready (as might
1017 // happen on the final Write after an http.Handler
1018 // ends), prefer the write result. Otherwise this
1019 // might just be us successfully closing the stream.
1020 // The writeFrameAsync and serve goroutines guarantee
1021 // that the ch send will happen before the stream.cw
1022 // close.
1023 select {
1024 case err = <-ch:
1025 frameWriteDone = true
1026 default:
1027 return errStreamClosed
1028 }
1029 }
1030 errChanPool.Put(ch)
1031 if frameWriteDone {
1032 writeDataPool.Put(writeArg)
1033 }
1034 return err
1035}
1036
1037// writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts
1038// if the connection has gone away.
1039//
1040// This must not be run from the serve goroutine itself, else it might
1041// deadlock writing to sc.wantWriteFrameCh (which is only mildly
1042// buffered and is read by serve itself). If you're on the serve
1043// goroutine, call writeFrame instead.
1044func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error {
1045 sc.serveG.checkNotOn() // NOT
1046 select {
1047 case sc.wantWriteFrameCh <- wr:
1048 return nil
1049 case <-sc.doneServing:
1050 // Serve loop is gone.
1051 // Client has closed their connection to the server.
1052 return errClientDisconnected
1053 }
1054}
1055
1056// writeFrame schedules a frame to write and sends it if there's nothing
1057// already being written.
1058//
1059// There is no pushback here (the serve goroutine never blocks). It's
1060// the http.Handlers that block, waiting for their previous frames to
1061// make it onto the wire
1062//
1063// If you're not on the serve goroutine, use writeFrameFromHandler instead.
1064func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
1065 sc.serveG.check()
1066
1067 // If true, wr will not be written and wr.done will not be signaled.
1068 var ignoreWrite bool
1069
1070 // We are not allowed to write frames on closed streams. RFC 7540 Section
1071 // 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
1072 // a closed stream." Our server never sends PRIORITY, so that exception
1073 // does not apply.
1074 //
1075 // The serverConn might close an open stream while the stream's handler
1076 // is still running. For example, the server might close a stream when it
1077 // receives bad data from the client. If this happens, the handler might
1078 // attempt to write a frame after the stream has been closed (since the
1079 // handler hasn't yet been notified of the close). In this case, we simply
1080 // ignore the frame. The handler will notice that the stream is closed when
1081 // it waits for the frame to be written.
1082 //
1083 // As an exception to this rule, we allow sending RST_STREAM after close.
1084 // This allows us to immediately reject new streams without tracking any
1085 // state for those streams (except for the queued RST_STREAM frame). This
1086 // may result in duplicate RST_STREAMs in some cases, but the client should
1087 // ignore those.
1088 if wr.StreamID() != 0 {
1089 _, isReset := wr.write.(StreamError)
1090 if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset {
1091 ignoreWrite = true
1092 }
1093 }
1094
1095 // Don't send a 100-continue response if we've already sent headers.
1096 // See golang.org/issue/14030.
1097 switch wr.write.(type) {
1098 case *writeResHeaders:
1099 wr.stream.wroteHeaders = true
1100 case write100ContinueHeadersFrame:
1101 if wr.stream.wroteHeaders {
1102 // We do not need to notify wr.done because this frame is
1103 // never written with wr.done != nil.
1104 if wr.done != nil {
1105 panic("wr.done != nil for write100ContinueHeadersFrame")
1106 }
1107 ignoreWrite = true
1108 }
1109 }
1110
1111 if !ignoreWrite {
Arjun E K57a7fcb2020-01-30 06:44:45 +00001112 if wr.isControl() {
1113 sc.queuedControlFrames++
1114 // For extra safety, detect wraparounds, which should not happen,
1115 // and pull the plug.
1116 if sc.queuedControlFrames < 0 {
1117 sc.conn.Close()
1118 }
1119 }
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -07001120 sc.writeSched.Push(wr)
1121 }
1122 sc.scheduleFrameWrite()
1123}
1124
1125// startFrameWrite starts a goroutine to write wr (in a separate
1126// goroutine since that might block on the network), and updates the
1127// serve goroutine's state about the world, updated from info in wr.
1128func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
1129 sc.serveG.check()
1130 if sc.writingFrame {
1131 panic("internal error: can only be writing one frame at a time")
1132 }
1133
1134 st := wr.stream
1135 if st != nil {
1136 switch st.state {
1137 case stateHalfClosedLocal:
1138 switch wr.write.(type) {
1139 case StreamError, handlerPanicRST, writeWindowUpdate:
1140 // RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE
1141 // in this state. (We never send PRIORITY from the server, so that is not checked.)
1142 default:
1143 panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr))
1144 }
1145 case stateClosed:
1146 panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr))
1147 }
1148 }
1149 if wpp, ok := wr.write.(*writePushPromise); ok {
1150 var err error
1151 wpp.promisedID, err = wpp.allocatePromisedID()
1152 if err != nil {
1153 sc.writingFrameAsync = false
1154 wr.replyToWriter(err)
1155 return
1156 }
1157 }
1158
1159 sc.writingFrame = true
1160 sc.needsFrameFlush = true
1161 if wr.write.staysWithinBuffer(sc.bw.Available()) {
1162 sc.writingFrameAsync = false
1163 err := wr.write.writeFrame(sc)
1164 sc.wroteFrame(frameWriteResult{wr, err})
1165 } else {
1166 sc.writingFrameAsync = true
1167 go sc.writeFrameAsync(wr)
1168 }
1169}
1170
// errHandlerPanicked is the error given to any callers blocked in a read from
// Request.Body when the main goroutine panics. Since most handlers read in the
// main ServeHTTP goroutine, this will show up rarely.
//
// wroteFrame passes it to closeStream once a handlerPanicRST frame has
// been written.
var errHandlerPanicked = errors.New("http2: handler panicked")
1175
1176// wroteFrame is called on the serve goroutine with the result of
1177// whatever happened on writeFrameAsync.
1178func (sc *serverConn) wroteFrame(res frameWriteResult) {
1179 sc.serveG.check()
1180 if !sc.writingFrame {
1181 panic("internal error: expected to be already writing a frame")
1182 }
1183 sc.writingFrame = false
1184 sc.writingFrameAsync = false
1185
1186 wr := res.wr
1187
1188 if writeEndsStream(wr.write) {
1189 st := wr.stream
1190 if st == nil {
1191 panic("internal error: expecting non-nil stream")
1192 }
1193 switch st.state {
1194 case stateOpen:
1195 // Here we would go to stateHalfClosedLocal in
1196 // theory, but since our handler is done and
1197 // the net/http package provides no mechanism
1198 // for closing a ResponseWriter while still
1199 // reading data (see possible TODO at top of
1200 // this file), we go into closed state here
1201 // anyway, after telling the peer we're
1202 // hanging up on them. We'll transition to
1203 // stateClosed after the RST_STREAM frame is
1204 // written.
1205 st.state = stateHalfClosedLocal
1206 // Section 8.1: a server MAY request that the client abort
1207 // transmission of a request without error by sending a
1208 // RST_STREAM with an error code of NO_ERROR after sending
1209 // a complete response.
1210 sc.resetStream(streamError(st.id, ErrCodeNo))
1211 case stateHalfClosedRemote:
1212 sc.closeStream(st, errHandlerComplete)
1213 }
1214 } else {
1215 switch v := wr.write.(type) {
1216 case StreamError:
1217 // st may be unknown if the RST_STREAM was generated to reject bad input.
1218 if st, ok := sc.streams[v.StreamID]; ok {
1219 sc.closeStream(st, v)
1220 }
1221 case handlerPanicRST:
1222 sc.closeStream(wr.stream, errHandlerPanicked)
1223 }
1224 }
1225
1226 // Reply (if requested) to unblock the ServeHTTP goroutine.
1227 wr.replyToWriter(res.err)
1228
1229 sc.scheduleFrameWrite()
1230}
1231
1232// scheduleFrameWrite tickles the frame writing scheduler.
1233//
1234// If a frame is already being written, nothing happens. This will be called again
1235// when the frame is done being written.
1236//
Arjun E K57a7fcb2020-01-30 06:44:45 +00001237// If a frame isn't being written and we need to send one, the best frame
1238// to send is selected by writeSched.
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -07001239//
1240// If a frame isn't being written and there's nothing else to send, we
1241// flush the write buffer.
1242func (sc *serverConn) scheduleFrameWrite() {
1243 sc.serveG.check()
1244 if sc.writingFrame || sc.inFrameScheduleLoop {
1245 return
1246 }
1247 sc.inFrameScheduleLoop = true
1248 for !sc.writingFrameAsync {
1249 if sc.needToSendGoAway {
1250 sc.needToSendGoAway = false
1251 sc.startFrameWrite(FrameWriteRequest{
1252 write: &writeGoAway{
1253 maxStreamID: sc.maxClientStreamID,
1254 code: sc.goAwayCode,
1255 },
1256 })
1257 continue
1258 }
1259 if sc.needToSendSettingsAck {
1260 sc.needToSendSettingsAck = false
1261 sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
1262 continue
1263 }
1264 if !sc.inGoAway || sc.goAwayCode == ErrCodeNo {
1265 if wr, ok := sc.writeSched.Pop(); ok {
Arjun E K57a7fcb2020-01-30 06:44:45 +00001266 if wr.isControl() {
1267 sc.queuedControlFrames--
1268 }
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -07001269 sc.startFrameWrite(wr)
1270 continue
1271 }
1272 }
1273 if sc.needsFrameFlush {
1274 sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
1275 sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
1276 continue
1277 }
1278 break
1279 }
1280 sc.inFrameScheduleLoop = false
1281}
1282
1283// startGracefulShutdown gracefully shuts down a connection. This
1284// sends GOAWAY with ErrCodeNo to tell the client we're gracefully
1285// shutting down. The connection isn't closed until all current
1286// streams are done.
1287//
1288// startGracefulShutdown returns immediately; it does not wait until
1289// the connection has shut down.
1290func (sc *serverConn) startGracefulShutdown() {
1291 sc.serveG.checkNotOn() // NOT
1292 sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) })
1293}
1294
// goAwayTimeout is how long the connection stays open after a GOAWAY
// has been sent.
//
// If we closed the connection immediately after sending GOAWAY, there
// could be unread data in our kernel receive buffer, which causes the
// kernel to send a TCP RST on close() instead of a FIN. This RST
// aborts the connection immediately, whether or not the client has
// received the GOAWAY.
//
// Ideally we should delay for at least 1 RTT + epsilon so the client
// has a chance to read the GOAWAY and stop sending messages. Measuring
// RTT is hard, so we approximate with 1 second. See
// golang.org/issue/18701.
//
// This is a var so it can be shorter in tests, where all requests use
// the loopback interface, making the expected RTT very small.
//
// TODO: configurable?
var goAwayTimeout = time.Second
1310
// startGracefulShutdownInternal begins a graceful shutdown from code
// that is already running on the serve goroutine, by calling goAway
// with ErrCodeNo.
func (sc *serverConn) startGracefulShutdownInternal() {
	sc.goAway(ErrCodeNo)
}
1314
1315func (sc *serverConn) goAway(code ErrCode) {
1316 sc.serveG.check()
1317 if sc.inGoAway {
1318 return
1319 }
1320 sc.inGoAway = true
1321 sc.needToSendGoAway = true
1322 sc.goAwayCode = code
1323 sc.scheduleFrameWrite()
1324}
1325
1326func (sc *serverConn) shutDownIn(d time.Duration) {
1327 sc.serveG.check()
1328 sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
1329}
1330
1331func (sc *serverConn) resetStream(se StreamError) {
1332 sc.serveG.check()
1333 sc.writeFrame(FrameWriteRequest{write: se})
1334 if st, ok := sc.streams[se.StreamID]; ok {
1335 st.resetQueued = true
1336 }
1337}
1338
1339// processFrameFromReader processes the serve loop's read from readFrameCh from the
1340// frame-reading goroutine.
1341// processFrameFromReader returns whether the connection should be kept open.
1342func (sc *serverConn) processFrameFromReader(res readFrameResult) bool {
1343 sc.serveG.check()
1344 err := res.err
1345 if err != nil {
1346 if err == ErrFrameTooLarge {
1347 sc.goAway(ErrCodeFrameSize)
1348 return true // goAway will close the loop
1349 }
1350 clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err)
1351 if clientGone {
1352 // TODO: could we also get into this state if
1353 // the peer does a half close
1354 // (e.g. CloseWrite) because they're done
1355 // sending frames but they're still wanting
1356 // our open replies? Investigate.
1357 // TODO: add CloseWrite to crypto/tls.Conn first
1358 // so we have a way to test this? I suppose
1359 // just for testing we could have a non-TLS mode.
1360 return false
1361 }
1362 } else {
1363 f := res.f
1364 if VerboseLogs {
1365 sc.vlogf("http2: server read frame %v", summarizeFrame(f))
1366 }
1367 err = sc.processFrame(f)
1368 if err == nil {
1369 return true
1370 }
1371 }
1372
1373 switch ev := err.(type) {
1374 case StreamError:
1375 sc.resetStream(ev)
1376 return true
1377 case goAwayFlowError:
1378 sc.goAway(ErrCodeFlowControl)
1379 return true
1380 case ConnectionError:
1381 sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev)
1382 sc.goAway(ErrCode(ev))
1383 return true // goAway will handle shutdown
1384 default:
1385 if res.err != nil {
1386 sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
1387 } else {
1388 sc.logf("http2: server closing client connection: %v", err)
1389 }
1390 return false
1391 }
1392}
1393
1394func (sc *serverConn) processFrame(f Frame) error {
1395 sc.serveG.check()
1396
1397 // First frame received must be SETTINGS.
1398 if !sc.sawFirstSettings {
1399 if _, ok := f.(*SettingsFrame); !ok {
1400 return ConnectionError(ErrCodeProtocol)
1401 }
1402 sc.sawFirstSettings = true
1403 }
1404
1405 switch f := f.(type) {
1406 case *SettingsFrame:
1407 return sc.processSettings(f)
1408 case *MetaHeadersFrame:
1409 return sc.processHeaders(f)
1410 case *WindowUpdateFrame:
1411 return sc.processWindowUpdate(f)
1412 case *PingFrame:
1413 return sc.processPing(f)
1414 case *DataFrame:
1415 return sc.processData(f)
1416 case *RSTStreamFrame:
1417 return sc.processResetStream(f)
1418 case *PriorityFrame:
1419 return sc.processPriority(f)
1420 case *GoAwayFrame:
1421 return sc.processGoAway(f)
1422 case *PushPromiseFrame:
1423 // A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
1424 // frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
1425 return ConnectionError(ErrCodeProtocol)
1426 default:
1427 sc.vlogf("http2: server ignoring frame: %v", f.Header())
1428 return nil
1429 }
1430}
1431
1432func (sc *serverConn) processPing(f *PingFrame) error {
1433 sc.serveG.check()
1434 if f.IsAck() {
1435 // 6.7 PING: " An endpoint MUST NOT respond to PING frames
1436 // containing this flag."
1437 return nil
1438 }
1439 if f.StreamID != 0 {
1440 // "PING frames are not associated with any individual
1441 // stream. If a PING frame is received with a stream
1442 // identifier field value other than 0x0, the recipient MUST
1443 // respond with a connection error (Section 5.4.1) of type
1444 // PROTOCOL_ERROR."
1445 return ConnectionError(ErrCodeProtocol)
1446 }
1447 if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
1448 return nil
1449 }
1450 sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
1451 return nil
1452}
1453
1454func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
1455 sc.serveG.check()
1456 switch {
1457 case f.StreamID != 0: // stream-level flow control
1458 state, st := sc.state(f.StreamID)
1459 if state == stateIdle {
1460 // Section 5.1: "Receiving any frame other than HEADERS
1461 // or PRIORITY on a stream in this state MUST be
1462 // treated as a connection error (Section 5.4.1) of
1463 // type PROTOCOL_ERROR."
1464 return ConnectionError(ErrCodeProtocol)
1465 }
1466 if st == nil {
1467 // "WINDOW_UPDATE can be sent by a peer that has sent a
1468 // frame bearing the END_STREAM flag. This means that a
1469 // receiver could receive a WINDOW_UPDATE frame on a "half
1470 // closed (remote)" or "closed" stream. A receiver MUST
1471 // NOT treat this as an error, see Section 5.1."
1472 return nil
1473 }
1474 if !st.flow.add(int32(f.Increment)) {
1475 return streamError(f.StreamID, ErrCodeFlowControl)
1476 }
1477 default: // connection-level flow control
1478 if !sc.flow.add(int32(f.Increment)) {
1479 return goAwayFlowError{}
1480 }
1481 }
1482 sc.scheduleFrameWrite()
1483 return nil
1484}
1485
1486func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
1487 sc.serveG.check()
1488
1489 state, st := sc.state(f.StreamID)
1490 if state == stateIdle {
1491 // 6.4 "RST_STREAM frames MUST NOT be sent for a
1492 // stream in the "idle" state. If a RST_STREAM frame
1493 // identifying an idle stream is received, the
1494 // recipient MUST treat this as a connection error
1495 // (Section 5.4.1) of type PROTOCOL_ERROR.
1496 return ConnectionError(ErrCodeProtocol)
1497 }
1498 if st != nil {
1499 st.cancelCtx()
1500 sc.closeStream(st, streamError(f.StreamID, f.ErrCode))
1501 }
1502 return nil
1503}
1504
1505func (sc *serverConn) closeStream(st *stream, err error) {
1506 sc.serveG.check()
1507 if st.state == stateIdle || st.state == stateClosed {
1508 panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
1509 }
1510 st.state = stateClosed
1511 if st.writeDeadline != nil {
1512 st.writeDeadline.Stop()
1513 }
1514 if st.isPushed() {
1515 sc.curPushedStreams--
1516 } else {
1517 sc.curClientStreams--
1518 }
1519 delete(sc.streams, st.id)
1520 if len(sc.streams) == 0 {
1521 sc.setConnState(http.StateIdle)
1522 if sc.srv.IdleTimeout != 0 {
1523 sc.idleTimer.Reset(sc.srv.IdleTimeout)
1524 }
1525 if h1ServerKeepAlivesDisabled(sc.hs) {
1526 sc.startGracefulShutdownInternal()
1527 }
1528 }
1529 if p := st.body; p != nil {
1530 // Return any buffered unread bytes worth of conn-level flow control.
1531 // See golang.org/issue/16481
1532 sc.sendWindowUpdate(nil, p.Len())
1533
1534 p.CloseWithError(err)
1535 }
1536 st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
1537 sc.writeSched.CloseStream(st.id)
1538}
1539
1540func (sc *serverConn) processSettings(f *SettingsFrame) error {
1541 sc.serveG.check()
1542 if f.IsAck() {
1543 sc.unackedSettings--
1544 if sc.unackedSettings < 0 {
1545 // Why is the peer ACKing settings we never sent?
1546 // The spec doesn't mention this case, but
1547 // hang up on them anyway.
1548 return ConnectionError(ErrCodeProtocol)
1549 }
1550 return nil
1551 }
1552 if f.NumSettings() > 100 || f.HasDuplicates() {
1553 // This isn't actually in the spec, but hang up on
1554 // suspiciously large settings frames or those with
1555 // duplicate entries.
1556 return ConnectionError(ErrCodeProtocol)
1557 }
1558 if err := f.ForeachSetting(sc.processSetting); err != nil {
1559 return err
1560 }
Arjun E K57a7fcb2020-01-30 06:44:45 +00001561 // TODO: judging by RFC 7540, Section 6.5.3 each SETTINGS frame should be
1562 // acknowledged individually, even if multiple are received before the ACK.
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -07001563 sc.needToSendSettingsAck = true
1564 sc.scheduleFrameWrite()
1565 return nil
1566}
1567
1568func (sc *serverConn) processSetting(s Setting) error {
1569 sc.serveG.check()
1570 if err := s.Valid(); err != nil {
1571 return err
1572 }
1573 if VerboseLogs {
1574 sc.vlogf("http2: server processing setting %v", s)
1575 }
1576 switch s.ID {
1577 case SettingHeaderTableSize:
1578 sc.headerTableSize = s.Val
1579 sc.hpackEncoder.SetMaxDynamicTableSize(s.Val)
1580 case SettingEnablePush:
1581 sc.pushEnabled = s.Val != 0
1582 case SettingMaxConcurrentStreams:
1583 sc.clientMaxStreams = s.Val
1584 case SettingInitialWindowSize:
1585 return sc.processSettingInitialWindowSize(s.Val)
1586 case SettingMaxFrameSize:
1587 sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31
1588 case SettingMaxHeaderListSize:
1589 sc.peerMaxHeaderListSize = s.Val
1590 default:
1591 // Unknown setting: "An endpoint that receives a SETTINGS
1592 // frame with any unknown or unsupported identifier MUST
1593 // ignore that setting."
1594 if VerboseLogs {
1595 sc.vlogf("http2: server ignoring unknown setting %v", s)
1596 }
1597 }
1598 return nil
1599}
1600
1601func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
1602 sc.serveG.check()
1603 // Note: val already validated to be within range by
1604 // processSetting's Valid call.
1605
1606 // "A SETTINGS frame can alter the initial flow control window
1607 // size for all current streams. When the value of
1608 // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST
1609 // adjust the size of all stream flow control windows that it
1610 // maintains by the difference between the new value and the
1611 // old value."
1612 old := sc.initialStreamSendWindowSize
1613 sc.initialStreamSendWindowSize = int32(val)
1614 growth := int32(val) - old // may be negative
1615 for _, st := range sc.streams {
1616 if !st.flow.add(growth) {
1617 // 6.9.2 Initial Flow Control Window Size
1618 // "An endpoint MUST treat a change to
1619 // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow
1620 // control window to exceed the maximum size as a
1621 // connection error (Section 5.4.1) of type
1622 // FLOW_CONTROL_ERROR."
1623 return ConnectionError(ErrCodeFlowControl)
1624 }
1625 }
1626 return nil
1627}
1628
1629func (sc *serverConn) processData(f *DataFrame) error {
1630 sc.serveG.check()
1631 if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
1632 return nil
1633 }
1634 data := f.Data()
1635
1636 // "If a DATA frame is received whose stream is not in "open"
1637 // or "half closed (local)" state, the recipient MUST respond
1638 // with a stream error (Section 5.4.2) of type STREAM_CLOSED."
1639 id := f.Header().StreamID
1640 state, st := sc.state(id)
1641 if id == 0 || state == stateIdle {
1642 // Section 5.1: "Receiving any frame other than HEADERS
1643 // or PRIORITY on a stream in this state MUST be
1644 // treated as a connection error (Section 5.4.1) of
1645 // type PROTOCOL_ERROR."
1646 return ConnectionError(ErrCodeProtocol)
1647 }
1648 if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued {
1649 // This includes sending a RST_STREAM if the stream is
1650 // in stateHalfClosedLocal (which currently means that
1651 // the http.Handler returned, so it's done reading &
1652 // done writing). Try to stop the client from sending
1653 // more DATA.
1654
1655 // But still enforce their connection-level flow control,
1656 // and return any flow control bytes since we're not going
1657 // to consume them.
1658 if sc.inflow.available() < int32(f.Length) {
1659 return streamError(id, ErrCodeFlowControl)
1660 }
1661 // Deduct the flow control from inflow, since we're
1662 // going to immediately add it back in
1663 // sendWindowUpdate, which also schedules sending the
1664 // frames.
1665 sc.inflow.take(int32(f.Length))
1666 sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
1667
1668 if st != nil && st.resetQueued {
1669 // Already have a stream error in flight. Don't send another.
1670 return nil
1671 }
1672 return streamError(id, ErrCodeStreamClosed)
1673 }
1674 if st.body == nil {
1675 panic("internal error: should have a body in this state")
1676 }
1677
1678 // Sender sending more than they'd declared?
1679 if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
1680 st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
1681 // RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
1682 // value of a content-length header field does not equal the sum of the
1683 // DATA frame payload lengths that form the body.
1684 return streamError(id, ErrCodeProtocol)
1685 }
1686 if f.Length > 0 {
1687 // Check whether the client has flow control quota.
1688 if st.inflow.available() < int32(f.Length) {
1689 return streamError(id, ErrCodeFlowControl)
1690 }
1691 st.inflow.take(int32(f.Length))
1692
1693 if len(data) > 0 {
1694 wrote, err := st.body.Write(data)
1695 if err != nil {
1696 return streamError(id, ErrCodeStreamClosed)
1697 }
1698 if wrote != len(data) {
1699 panic("internal error: bad Writer")
1700 }
1701 st.bodyBytes += int64(len(data))
1702 }
1703
1704 // Return any padded flow control now, since we won't
1705 // refund it later on body reads.
1706 if pad := int32(f.Length) - int32(len(data)); pad > 0 {
1707 sc.sendWindowUpdate32(nil, pad)
1708 sc.sendWindowUpdate32(st, pad)
1709 }
1710 }
1711 if f.StreamEnded() {
1712 st.endStream()
1713 }
1714 return nil
1715}
1716
1717func (sc *serverConn) processGoAway(f *GoAwayFrame) error {
1718 sc.serveG.check()
1719 if f.ErrCode != ErrCodeNo {
1720 sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1721 } else {
1722 sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
1723 }
1724 sc.startGracefulShutdownInternal()
1725 // http://tools.ietf.org/html/rfc7540#section-6.8
1726 // We should not create any new streams, which means we should disable push.
1727 sc.pushEnabled = false
1728 return nil
1729}
1730
1731// isPushed reports whether the stream is server-initiated.
1732func (st *stream) isPushed() bool {
1733 return st.id%2 == 0
1734}
1735
1736// endStream closes a Request.Body's pipe. It is called when a DATA
1737// frame says a request body is over (or after trailers).
1738func (st *stream) endStream() {
1739 sc := st.sc
1740 sc.serveG.check()
1741
1742 if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes {
1743 st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes",
1744 st.declBodyBytes, st.bodyBytes))
1745 } else {
1746 st.body.closeWithErrorAndCode(io.EOF, st.copyTrailersToHandlerRequest)
1747 st.body.CloseWithError(io.EOF)
1748 }
1749 st.state = stateHalfClosedRemote
1750}
1751
1752// copyTrailersToHandlerRequest is run in the Handler's goroutine in
1753// its Request.Body.Read just before it gets io.EOF.
1754func (st *stream) copyTrailersToHandlerRequest() {
1755 for k, vv := range st.trailer {
1756 if _, ok := st.reqTrailer[k]; ok {
1757 // Only copy it over it was pre-declared.
1758 st.reqTrailer[k] = vv
1759 }
1760 }
1761}
1762
1763// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
1764// when the stream's WriteTimeout has fired.
1765func (st *stream) onWriteTimeout() {
1766 st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)})
1767}
1768
// processHeaders handles a MetaHeadersFrame on the serve goroutine.
//
// If a stream with the frame's ID is already present in sc.streams, the frame
// is handed to that stream as trailers. Otherwise the frame opens a new
// client-initiated stream: the stream ID and concurrency limit are validated,
// the stream, request and response writer are created, and the handler is
// started on a new goroutine.
//
// It returns a ConnectionError or a stream error for the violations checked
// below, and nil otherwise (including when the frame is deliberately ignored).
func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
	sc.serveG.check()
	id := f.StreamID
	if sc.inGoAway {
		// Ignore.
		return nil
	}
	// http://tools.ietf.org/html/rfc7540#section-5.1.1
	// Streams initiated by a client MUST use odd-numbered stream
	// identifiers. [...] An endpoint that receives an unexpected
	// stream identifier MUST respond with a connection error
	// (Section 5.4.1) of type PROTOCOL_ERROR.
	if id%2 != 1 {
		return ConnectionError(ErrCodeProtocol)
	}
	// A HEADERS frame can be used to create a new stream or
	// send a trailer for an open one. If we already have a stream
	// open, let it process its own HEADERS frame (trailers at this
	// point, if it's valid).
	if st := sc.streams[f.StreamID]; st != nil {
		if st.resetQueued {
			// We're sending RST_STREAM to close the stream, so don't bother
			// processing this frame.
			return nil
		}
		// RFC 7540, sec 5.1: If an endpoint receives additional frames, other than
		// WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in
		// this state, it MUST respond with a stream error (Section 5.4.2) of
		// type STREAM_CLOSED.
		if st.state == stateHalfClosedRemote {
			return streamError(id, ErrCodeStreamClosed)
		}
		return st.processTrailerHeaders(f)
	}

	// [...] The identifier of a newly established stream MUST be
	// numerically greater than all streams that the initiating
	// endpoint has opened or reserved. [...] An endpoint that
	// receives an unexpected stream identifier MUST respond with
	// a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
	if id <= sc.maxClientStreamID {
		return ConnectionError(ErrCodeProtocol)
	}
	sc.maxClientStreamID = id

	// A new stream is being opened, so stop the idle timer if one is set.
	if sc.idleTimer != nil {
		sc.idleTimer.Stop()
	}

	// http://tools.ietf.org/html/rfc7540#section-5.1.2
	// [...] Endpoints MUST NOT exceed the limit set by their peer. An
	// endpoint that receives a HEADERS frame that causes their
	// advertised concurrent stream limit to be exceeded MUST treat
	// this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
	// or REFUSED_STREAM.
	if sc.curClientStreams+1 > sc.advMaxStreams {
		if sc.unackedSettings == 0 {
			// They should know better.
			return streamError(id, ErrCodeProtocol)
		}
		// Assume it's a network race, where they just haven't
		// received our last SETTINGS update. But actually
		// this can't happen yet, because we don't yet provide
		// a way for users to adjust server parameters at
		// runtime.
		return streamError(id, ErrCodeRefusedStream)
	}

	// A HEADERS frame that also ends the stream means there is no request
	// body, so the stream starts out half-closed (remote).
	initialState := stateOpen
	if f.StreamEnded() {
		initialState = stateHalfClosedRemote
	}
	st := sc.newStream(id, 0, initialState)

	if f.HasPriority() {
		if err := checkPriority(f.StreamID, f.Priority); err != nil {
			return err
		}
		sc.writeSched.AdjustStream(st.id, f.Priority)
	}

	rw, req, err := sc.newWriterAndRequest(st, f)
	if err != nil {
		return err
	}
	st.reqTrailer = req.Trailer
	if st.reqTrailer != nil {
		// Trailers were declared; allocate the map that
		// processTrailerHeaders fills in.
		st.trailer = make(http.Header)
	}
	st.body = req.Body.(*requestBody).pipe // may be nil
	st.declBodyBytes = req.ContentLength

	// Pick the handler: the user's handler unless the request headers were
	// truncated or are invalid, in which case an error handler is used.
	handler := sc.handler.ServeHTTP
	if f.Truncated {
		// Their header list was too long. Send a 431 error.
		handler = handleHeaderListTooLong
	} else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil {
		handler = new400Handler(err)
	}

	// The net/http package sets the read deadline from the
	// http.Server.ReadTimeout during the TLS handshake, but then
	// passes the connection off to us with the deadline already
	// set. Disarm it here after the request headers are read,
	// similar to how the http1 server works. Here it's
	// technically more like the http1 Server's ReadHeaderTimeout
	// (in Go 1.8), though. That's a more sane option anyway.
	if sc.hs.ReadTimeout != 0 {
		sc.conn.SetReadDeadline(time.Time{})
	}

	go sc.runHandler(rw, req, handler)
	return nil
}
1883
1884func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
1885 sc := st.sc
1886 sc.serveG.check()
1887 if st.gotTrailerHeader {
1888 return ConnectionError(ErrCodeProtocol)
1889 }
1890 st.gotTrailerHeader = true
1891 if !f.StreamEnded() {
1892 return streamError(st.id, ErrCodeProtocol)
1893 }
1894
1895 if len(f.PseudoFields()) > 0 {
1896 return streamError(st.id, ErrCodeProtocol)
1897 }
1898 if st.trailer != nil {
1899 for _, hf := range f.RegularFields() {
1900 key := sc.canonicalHeader(hf.Name)
1901 if !httpguts.ValidTrailerHeader(key) {
1902 // TODO: send more details to the peer somehow. But http2 has
1903 // no way to send debug data at a stream level. Discuss with
1904 // HTTP folk.
1905 return streamError(st.id, ErrCodeProtocol)
1906 }
1907 st.trailer[key] = append(st.trailer[key], hf.Value)
1908 }
1909 }
1910 st.endStream()
1911 return nil
1912}
1913
1914func checkPriority(streamID uint32, p PriorityParam) error {
1915 if streamID == p.StreamDep {
1916 // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
1917 // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
1918 // Section 5.3.3 says that a stream can depend on one of its dependencies,
1919 // so it's only self-dependencies that are forbidden.
1920 return streamError(streamID, ErrCodeProtocol)
1921 }
1922 return nil
1923}
1924
1925func (sc *serverConn) processPriority(f *PriorityFrame) error {
1926 if sc.inGoAway {
1927 return nil
1928 }
1929 if err := checkPriority(f.StreamID, f.PriorityParam); err != nil {
1930 return err
1931 }
1932 sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam)
1933 return nil
1934}
1935
1936func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream {
1937 sc.serveG.check()
1938 if id == 0 {
1939 panic("internal error: cannot create stream with id 0")
1940 }
1941
1942 ctx, cancelCtx := context.WithCancel(sc.baseCtx)
1943 st := &stream{
1944 sc: sc,
1945 id: id,
1946 state: state,
1947 ctx: ctx,
1948 cancelCtx: cancelCtx,
1949 }
1950 st.cw.Init()
1951 st.flow.conn = &sc.flow // link to conn-level counter
1952 st.flow.add(sc.initialStreamSendWindowSize)
1953 st.inflow.conn = &sc.inflow // link to conn-level counter
1954 st.inflow.add(sc.srv.initialStreamRecvWindowSize())
1955 if sc.hs.WriteTimeout != 0 {
1956 st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
1957 }
1958
1959 sc.streams[id] = st
1960 sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
1961 if st.isPushed() {
1962 sc.curPushedStreams++
1963 } else {
1964 sc.curClientStreams++
1965 }
1966 if sc.curOpenStreams() == 1 {
1967 sc.setConnState(http.StateActive)
1968 }
1969
1970 return st
1971}
1972
1973func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) {
1974 sc.serveG.check()
1975
1976 rp := requestParam{
1977 method: f.PseudoValue("method"),
1978 scheme: f.PseudoValue("scheme"),
1979 authority: f.PseudoValue("authority"),
1980 path: f.PseudoValue("path"),
1981 }
1982
1983 isConnect := rp.method == "CONNECT"
1984 if isConnect {
1985 if rp.path != "" || rp.scheme != "" || rp.authority == "" {
1986 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
1987 }
1988 } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
1989 // See 8.1.2.6 Malformed Requests and Responses:
1990 //
1991 // Malformed requests or responses that are detected
1992 // MUST be treated as a stream error (Section 5.4.2)
1993 // of type PROTOCOL_ERROR."
1994 //
1995 // 8.1.2.3 Request Pseudo-Header Fields
1996 // "All HTTP/2 requests MUST include exactly one valid
1997 // value for the :method, :scheme, and :path
1998 // pseudo-header fields"
1999 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
2000 }
2001
2002 bodyOpen := !f.StreamEnded()
2003 if rp.method == "HEAD" && bodyOpen {
2004 // HEAD requests can't have bodies
2005 return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
2006 }
2007
2008 rp.header = make(http.Header)
2009 for _, hf := range f.RegularFields() {
2010 rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value)
2011 }
2012 if rp.authority == "" {
2013 rp.authority = rp.header.Get("Host")
2014 }
2015
2016 rw, req, err := sc.newWriterAndRequestNoBody(st, rp)
2017 if err != nil {
2018 return nil, nil, err
2019 }
2020 if bodyOpen {
2021 if vv, ok := rp.header["Content-Length"]; ok {
2022 req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
2023 } else {
2024 req.ContentLength = -1
2025 }
2026 req.Body.(*requestBody).pipe = &pipe{
2027 b: &dataBuffer{expected: req.ContentLength},
2028 }
2029 }
2030 return rw, req, nil
2031}
2032
// requestParam carries the request pseudo-header values and the regular
// header fields used by newWriterAndRequestNoBody to build an *http.Request.
type requestParam struct {
	method                  string
	scheme, authority, path string
	header                  http.Header
}
2038
// newWriterAndRequestNoBody builds the *http.Request and responseWriter for
// stream st from rp. The request's Body is a *requestBody whose pipe is left
// unset here.
//
// While building the request it removes the Expect header when its value is
// "100-continue", merges multiple Cookie headers into one, and moves the
// names listed in the Trailer header into req.Trailer. It returns a stream
// error of type PROTOCOL_ERROR when rp.path cannot be parsed as a request
// URI (CONNECT requests use rp.authority instead of rp.path).
func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) {
	sc.serveG.check()

	var tlsState *tls.ConnectionState // nil if not scheme https
	if rp.scheme == "https" {
		tlsState = sc.tlsState
	}

	needsContinue := rp.header.Get("Expect") == "100-continue"
	if needsContinue {
		rp.header.Del("Expect")
	}
	// Merge Cookie headers into one "; "-delimited value.
	if cookies := rp.header["Cookie"]; len(cookies) > 1 {
		rp.header.Set("Cookie", strings.Join(cookies, "; "))
	}

	// Setup Trailers
	var trailer http.Header
	for _, v := range rp.header["Trailer"] {
		for _, key := range strings.Split(v, ",") {
			key = http.CanonicalHeaderKey(strings.TrimSpace(key))
			switch key {
			case "Transfer-Encoding", "Trailer", "Content-Length":
				// Bogus. (copy of http1 rules)
				// Ignore.
			default:
				if trailer == nil {
					trailer = make(http.Header)
				}
				// Declare the key with a nil value; values are filled in
				// later if the trailer is actually received.
				trailer[key] = nil
			}
		}
	}
	delete(rp.header, "Trailer")

	var url_ *url.URL
	var requestURI string
	if rp.method == "CONNECT" {
		url_ = &url.URL{Host: rp.authority}
		requestURI = rp.authority // mimic HTTP/1 server behavior
	} else {
		var err error
		url_, err = url.ParseRequestURI(rp.path)
		if err != nil {
			return nil, nil, streamError(st.id, ErrCodeProtocol)
		}
		requestURI = rp.path
	}

	body := &requestBody{
		conn:          sc,
		stream:        st,
		needsContinue: needsContinue,
	}
	req := &http.Request{
		Method:     rp.method,
		URL:        url_,
		RemoteAddr: sc.remoteAddrStr,
		Header:     rp.header,
		RequestURI: requestURI,
		Proto:      "HTTP/2.0",
		ProtoMajor: 2,
		ProtoMinor: 0,
		TLS:        tlsState,
		Host:       rp.authority,
		Body:       body,
		Trailer:    trailer,
	}
	req = req.WithContext(st.ctx)

	// Take a responseWriterState from the pool and reset it, keeping only
	// its bufio.Writer, which is re-pointed at a chunkWriter for this state.
	rws := responseWriterStatePool.Get().(*responseWriterState)
	bwSave := rws.bw
	*rws = responseWriterState{} // zero all the fields
	rws.conn = sc
	rws.bw = bwSave
	rws.bw.Reset(chunkWriter{rws})
	rws.stream = st
	rws.req = req
	rws.body = body

	rw := &responseWriter{rws: rws}
	return rw, req, nil
}
2123
// runHandler invokes handler with rw and req. Run on its own goroutine.
//
// When the handler returns normally, rw.handlerDone is called. When it
// panics, the panic is recovered, a handlerPanicRST write is queued for the
// stream, and the panic value and stack are logged unless the value is nil or
// http.ErrAbortHandler. In both cases the stream's context is cancelled.
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
	// didPanic stays true unless handler returns normally; the deferred
	// function uses it to tell a normal return from a panic.
	didPanic := true
	defer func() {
		rw.rws.stream.cancelCtx()
		if didPanic {
			e := recover()
			sc.writeFrameFromHandler(FrameWriteRequest{
				write:  handlerPanicRST{rw.rws.stream.id},
				stream: rw.rws.stream,
			})
			// Same as net/http:
			if e != nil && e != http.ErrAbortHandler {
				const size = 64 << 10
				buf := make([]byte, size)
				buf = buf[:runtime.Stack(buf, false)]
				sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf)
			}
			return
		}
		rw.handlerDone()
	}()
	handler(rw, req)
	didPanic = false
}
2149
// handleHeaderListTooLong is the handler used in place of the user's handler
// when the request header list was truncated. It replies with status 431 and
// a short HTML body.
//
// 10.5.1 Limits on Header Block Size:
// .. "A server that receives a larger header block than it is
// willing to handle can send an HTTP 431 (Request Header Fields Too
// Large) status code"
func handleHeaderListTooLong(w http.ResponseWriter, r *http.Request) {
	const page = "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>"
	w.WriteHeader(http.StatusRequestHeaderFieldsTooLarge)
	io.WriteString(w, page)
}
2159
2160// called from handler goroutines.
2161// h may be nil.
2162func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) error {
2163 sc.serveG.checkNotOn() // NOT on
2164 var errc chan error
2165 if headerData.h != nil {
2166 // If there's a header map (which we don't own), so we have to block on
2167 // waiting for this frame to be written, so an http.Flush mid-handler
2168 // writes out the correct value of keys, before a handler later potentially
2169 // mutates it.
2170 errc = errChanPool.Get().(chan error)
2171 }
2172 if err := sc.writeFrameFromHandler(FrameWriteRequest{
2173 write: headerData,
2174 stream: st,
2175 done: errc,
2176 }); err != nil {
2177 return err
2178 }
2179 if errc != nil {
2180 select {
2181 case err := <-errc:
2182 errChanPool.Put(errc)
2183 return err
2184 case <-sc.doneServing:
2185 return errClientDisconnected
2186 case <-st.cw:
2187 return errStreamClosed
2188 }
2189 }
2190 return nil
2191}
2192
2193// called from handler goroutines.
2194func (sc *serverConn) write100ContinueHeaders(st *stream) {
2195 sc.writeFrameFromHandler(FrameWriteRequest{
2196 write: write100ContinueHeadersFrame{st.id},
2197 stream: st,
2198 })
2199}
2200
// A bodyReadMsg tells the server loop that the http.Handler read n
// bytes of the DATA from the client on the given stream.
type bodyReadMsg struct {
	st *stream // stream whose body was read
	n  int     // number of bytes the handler read
}
2207
2208// called from handler goroutines.
2209// Notes that the handler for the given stream ID read n bytes of its body
2210// and schedules flow control tokens to be sent.
2211func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
2212 sc.serveG.checkNotOn() // NOT on
2213 if n > 0 {
2214 select {
2215 case sc.bodyReadCh <- bodyReadMsg{st, n}:
2216 case <-sc.doneServing:
2217 }
2218 }
2219}
2220
2221func (sc *serverConn) noteBodyRead(st *stream, n int) {
2222 sc.serveG.check()
2223 sc.sendWindowUpdate(nil, n) // conn-level
2224 if st.state != stateHalfClosedRemote && st.state != stateClosed {
2225 // Don't send this WINDOW_UPDATE if the stream is closed
2226 // remotely.
2227 sc.sendWindowUpdate(st, n)
2228 }
2229}
2230
2231// st may be nil for conn-level
2232func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
2233 sc.serveG.check()
2234 // "The legal range for the increment to the flow control
2235 // window is 1 to 2^31-1 (2,147,483,647) octets."
2236 // A Go Read call on 64-bit machines could in theory read
2237 // a larger Read than this. Very unlikely, but we handle it here
2238 // rather than elsewhere for now.
2239 const maxUint31 = 1<<31 - 1
2240 for n >= maxUint31 {
2241 sc.sendWindowUpdate32(st, maxUint31)
2242 n -= maxUint31
2243 }
2244 sc.sendWindowUpdate32(st, int32(n))
2245}
2246
2247// st may be nil for conn-level
2248func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
2249 sc.serveG.check()
2250 if n == 0 {
2251 return
2252 }
2253 if n < 0 {
2254 panic("negative update")
2255 }
2256 var streamID uint32
2257 if st != nil {
2258 streamID = st.id
2259 }
2260 sc.writeFrame(FrameWriteRequest{
2261 write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
2262 stream: st,
2263 })
2264 var ok bool
2265 if st == nil {
2266 ok = sc.inflow.add(n)
2267 } else {
2268 ok = st.inflow.add(n)
2269 }
2270 if !ok {
2271 panic("internal error; sent too many window updates without decrements?")
2272 }
2273}
2274
// requestBody is the Handler's Request.Body type.
// Read and Close may be called concurrently.
type requestBody struct {
	stream        *stream     // stream this body belongs to
	conn          *serverConn // connection used to report reads and send 100-continue
	closed        bool        // for use by Close only
	sawEOF        bool        // for use by Read only
	pipe          *pipe       // non-nil if we have a HTTP entity message body
	needsContinue bool        // need to send a 100-continue
}
2285
2286func (b *requestBody) Close() error {
2287 if b.pipe != nil && !b.closed {
2288 b.pipe.BreakWithError(errClosedBody)
2289 }
2290 b.closed = true
2291 return nil
2292}
2293
2294func (b *requestBody) Read(p []byte) (n int, err error) {
2295 if b.needsContinue {
2296 b.needsContinue = false
2297 b.conn.write100ContinueHeaders(b.stream)
2298 }
2299 if b.pipe == nil || b.sawEOF {
2300 return 0, io.EOF
2301 }
2302 n, err = b.pipe.Read(p)
2303 if err == io.EOF {
2304 b.sawEOF = true
2305 }
2306 if b.conn == nil && inTests {
2307 return
2308 }
2309 b.conn.noteBodyReadFromHandler(b.stream, n, err)
2310 return
2311}
2312
// responseWriter is the http.ResponseWriter implementation. It's
// intentionally small (1 pointer wide) to minimize garbage. The
// responseWriterState pointer inside is zeroed at the end of a
// request (in handlerDone) and calls on the responseWriter thereafter
// simply crash (caller's mistake), but the much larger responseWriterState
// and buffers are reused between multiple requests.
type responseWriter struct {
	rws *responseWriterState // nil once handlerDone has run
}
2322
// Optional http.ResponseWriter interfaces implemented.
// These are compile-time assertions only; they create no runtime state.
var (
	_ http.CloseNotifier = (*responseWriter)(nil)
	_ http.Flusher       = (*responseWriter)(nil)
	_ stringWriter       = (*responseWriter)(nil)
)
2329
// responseWriterState holds the per-request state behind a responseWriter.
// Values are obtained from responseWriterStatePool and zeroed (apart from bw)
// before each use.
type responseWriterState struct {
	// immutable within a request:
	stream *stream
	req    *http.Request
	body   *requestBody // to close at end of request, if DATA frames didn't
	conn   *serverConn

	// TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc
	bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState}

	// mutated by http.Handler goroutine:
	handlerHeader http.Header // nil until called
	snapHeader    http.Header // snapshot of handlerHeader at WriteHeader time
	trailers      []string    // set in writeChunk
	status        int         // status code passed to WriteHeader
	wroteHeader   bool        // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
	sentHeader    bool        // have we sent the header frame?
	handlerDone   bool        // handler has finished
	dirty         bool        // a Write failed; don't reuse this responseWriterState

	sentContentLen int64 // non-zero if handler set a Content-Length header
	wroteBytes     int64 // total bytes passed to Write/WriteString so far

	closeNotifierMu sync.Mutex // guards closeNotifierCh
	closeNotifierCh chan bool  // nil until first used
}
2356
// chunkWriter is the io.Writer that a responseWriterState's bufio.Writer
// writes to; it forwards every write to rws.writeChunk.
type chunkWriter struct{ rws *responseWriterState }

// Write passes p to the underlying responseWriterState's writeChunk.
func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) }
2360
// hasTrailers reports whether any trailer names have been declared.
func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) > 0 }
2362
2363func (rws *responseWriterState) hasNonemptyTrailers() bool {
2364 for _, trailer := range rws.trailers {
2365 if _, ok := rws.handlerHeader[trailer]; ok {
2366 return true
2367 }
2368 }
2369 return false
2370}
2371
2372// declareTrailer is called for each Trailer header when the
2373// response header is written. It notes that a header will need to be
2374// written in the trailers at the end of the response.
2375func (rws *responseWriterState) declareTrailer(k string) {
2376 k = http.CanonicalHeaderKey(k)
2377 if !httpguts.ValidTrailerHeader(k) {
2378 // Forbidden by RFC 7230, section 4.1.2.
2379 rws.conn.logf("ignoring invalid trailer %q", k)
2380 return
2381 }
2382 if !strSliceContains(rws.trailers, k) {
2383 rws.trailers = append(rws.trailers, k)
2384 }
2385}
2386
// writeChunk writes chunks from the bufio.Writer. But because
// bufio.Writer may bypass its chunking, sometimes p may be
// arbitrarily large.
//
// writeChunk is also responsible (on the first chunk) for sending the
// HEADER response.
//
// It returns the number of bytes of p accepted and any error from queuing
// the header, data or trailer frames. On such an error rws.dirty is set so
// that the state is not recycled.
func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
	if !rws.wroteHeader {
		rws.writeHeader(200)
	}

	isHeadResp := rws.req.Method == "HEAD"
	if !rws.sentHeader {
		rws.sentHeader = true
		var ctype, clen string
		if clen = rws.snapHeader.Get("Content-Length"); clen != "" {
			// Content-Length is removed from the snapshot and passed
			// separately; a value that does not parse as a non-negative
			// integer is discarded.
			rws.snapHeader.Del("Content-Length")
			clen64, err := strconv.ParseInt(clen, 10, 64)
			if err == nil && clen64 >= 0 {
				rws.sentContentLen = clen64
			} else {
				clen = ""
			}
		}
		// If the handler has already finished, the whole body is in p, so
		// its length can be used as the Content-Length.
		if clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) {
			clen = strconv.Itoa(len(p))
		}
		_, hasContentType := rws.snapHeader["Content-Type"]
		// If the Content-Encoding is non-blank, we shouldn't
		// sniff the body. See Issue golang.org/issue/31753.
		ce := rws.snapHeader.Get("Content-Encoding")
		hasCE := len(ce) > 0
		if !hasCE && !hasContentType && bodyAllowedForStatus(rws.status) && len(p) > 0 {
			ctype = http.DetectContentType(p)
		}
		var date string
		if _, ok := rws.snapHeader["Date"]; !ok {
			// TODO(bradfitz): be faster here, like net/http? measure.
			date = time.Now().UTC().Format(http.TimeFormat)
		}

		for _, v := range rws.snapHeader["Trailer"] {
			foreachHeaderElement(v, rws.declareTrailer)
		}

		// "Connection" headers aren't allowed in HTTP/2 (RFC 7540, 8.1.2.2),
		// but respect "Connection" == "close" to mean sending a GOAWAY and tearing
		// down the TCP connection when idle, like we do for HTTP/1.
		// TODO: remove more Connection-specific header fields here, in addition
		// to "Connection".
		if _, ok := rws.snapHeader["Connection"]; ok {
			v := rws.snapHeader.Get("Connection")
			delete(rws.snapHeader, "Connection")
			if v == "close" {
				rws.conn.startGracefulShutdown()
			}
		}

		// The HEADERS frame ends the stream when this is a HEAD response, or
		// when the handler is done with no body bytes and no declared trailers.
		endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp
		err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
			streamID:      rws.stream.id,
			httpResCode:   rws.status,
			h:             rws.snapHeader,
			endStream:     endStream,
			contentType:   ctype,
			contentLength: clen,
			date:          date,
		})
		if err != nil {
			rws.dirty = true
			return 0, err
		}
		if endStream {
			return 0, nil
		}
	}
	if isHeadResp {
		// HEAD responses carry no body; report the bytes as written.
		return len(p), nil
	}
	if len(p) == 0 && !rws.handlerDone {
		return 0, nil
	}

	if rws.handlerDone {
		rws.promoteUndeclaredTrailers()
	}

	// only send trailers if they have actually been defined by the
	// server handler.
	hasNonemptyTrailers := rws.hasNonemptyTrailers()
	endStream := rws.handlerDone && !hasNonemptyTrailers
	if len(p) > 0 || endStream {
		// only send a 0 byte DATA frame if we're ending the stream.
		if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
			rws.dirty = true
			return 0, err
		}
	}

	if rws.handlerDone && hasNonemptyTrailers {
		// The trailer HEADERS frame is what ends the stream in this case.
		err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{
			streamID:  rws.stream.id,
			h:         rws.handlerHeader,
			trailers:  rws.trailers,
			endStream: true,
		})
		if err != nil {
			rws.dirty = true
		}
		return len(p), err
	}
	return len(p), nil
}
2500
// TrailerPrefix is a magic prefix for ResponseWriter.Header map keys
// that, if present, signals that the map entry is actually for
// the response trailers, and not the response headers. The prefix
// is stripped after the ServeHTTP call finishes and the values are
// sent in the trailers.
//
// This mechanism is intended only for trailers that are not known
// prior to the headers being written. If the set of trailers is fixed
// or known before the header is written, the normal Go trailers mechanism
// is preferred:
//
//	https://golang.org/pkg/net/http/#ResponseWriter
//	https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
const TrailerPrefix = "Trailer:"
2514
2515// promoteUndeclaredTrailers permits http.Handlers to set trailers
2516// after the header has already been flushed. Because the Go
2517// ResponseWriter interface has no way to set Trailers (only the
2518// Header), and because we didn't want to expand the ResponseWriter
2519// interface, and because nobody used trailers, and because RFC 7230
2520// says you SHOULD (but not must) predeclare any trailers in the
2521// header, the official ResponseWriter rules said trailers in Go must
2522// be predeclared, and then we reuse the same ResponseWriter.Header()
2523// map to mean both Headers and Trailers. When it's time to write the
2524// Trailers, we pick out the fields of Headers that were declared as
2525// trailers. That worked for a while, until we found the first major
2526// user of Trailers in the wild: gRPC (using them only over http2),
2527// and gRPC libraries permit setting trailers mid-stream without
Arjun E K57a7fcb2020-01-30 06:44:45 +00002528// predeclaring them. So: change of plans. We still permit the old
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -07002529// way, but we also permit this hack: if a Header() key begins with
2530// "Trailer:", the suffix of that key is a Trailer. Because ':' is an
2531// invalid token byte anyway, there is no ambiguity. (And it's already
2532// filtered out) It's mildly hacky, but not terrible.
2533//
2534// This method runs after the Handler is done and promotes any Header
2535// fields to be trailers.
2536func (rws *responseWriterState) promoteUndeclaredTrailers() {
2537 for k, vv := range rws.handlerHeader {
2538 if !strings.HasPrefix(k, TrailerPrefix) {
2539 continue
2540 }
2541 trailerKey := strings.TrimPrefix(k, TrailerPrefix)
2542 rws.declareTrailer(trailerKey)
2543 rws.handlerHeader[http.CanonicalHeaderKey(trailerKey)] = vv
2544 }
2545
2546 if len(rws.trailers) > 1 {
2547 sorter := sorterPool.Get().(*sorter)
2548 sorter.SortStrings(rws.trailers)
2549 sorterPool.Put(sorter)
2550 }
2551}
2552
2553func (w *responseWriter) Flush() {
2554 rws := w.rws
2555 if rws == nil {
2556 panic("Header called after Handler finished")
2557 }
2558 if rws.bw.Buffered() > 0 {
2559 if err := rws.bw.Flush(); err != nil {
2560 // Ignore the error. The frame writer already knows.
2561 return
2562 }
2563 } else {
2564 // The bufio.Writer won't call chunkWriter.Write
2565 // (writeChunk with zero bytes, so we have to do it
2566 // ourselves to force the HTTP response header and/or
2567 // final DATA frame (with END_STREAM) to be sent.
2568 rws.writeChunk(nil)
2569 }
2570}
2571
2572func (w *responseWriter) CloseNotify() <-chan bool {
2573 rws := w.rws
2574 if rws == nil {
2575 panic("CloseNotify called after Handler finished")
2576 }
2577 rws.closeNotifierMu.Lock()
2578 ch := rws.closeNotifierCh
2579 if ch == nil {
2580 ch = make(chan bool, 1)
2581 rws.closeNotifierCh = ch
2582 cw := rws.stream.cw
2583 go func() {
2584 cw.Wait() // wait for close
2585 ch <- true
2586 }()
2587 }
2588 rws.closeNotifierMu.Unlock()
2589 return ch
2590}
2591
2592func (w *responseWriter) Header() http.Header {
2593 rws := w.rws
2594 if rws == nil {
2595 panic("Header called after Handler finished")
2596 }
2597 if rws.handlerHeader == nil {
2598 rws.handlerHeader = make(http.Header)
2599 }
2600 return rws.handlerHeader
2601}
2602
// checkWriteHeaderCode is a copy of net/http's checkWriteHeaderCode. It
// panics unless code is a three-digit number (100 through 999).
//
// Issue 22880: require valid WriteHeader status codes.
// For now we only enforce that it's three digits.
// In the future we might block things over 599 (600 and above aren't defined
// at http://httpwg.org/specs/rfc7231.html#status.codes)
// and we might block under 200 (once we have more mature 1xx support).
// But for now any three digits.
//
// We used to send "HTTP/1.1 000 0" on the wire in responses but there's
// no equivalent bogus thing we can realistically send in HTTP/2,
// so we'll consistently panic instead and help people find their bugs
// early. (We can't return an error from WriteHeader even if we wanted to.)
func checkWriteHeaderCode(code int) {
	if code >= 100 && code <= 999 {
		return
	}
	panic(fmt.Sprintf("invalid WriteHeader code %v", code))
}
2620
2621func (w *responseWriter) WriteHeader(code int) {
2622 rws := w.rws
2623 if rws == nil {
2624 panic("WriteHeader called after Handler finished")
2625 }
2626 rws.writeHeader(code)
2627}
2628
2629func (rws *responseWriterState) writeHeader(code int) {
2630 if !rws.wroteHeader {
2631 checkWriteHeaderCode(code)
2632 rws.wroteHeader = true
2633 rws.status = code
2634 if len(rws.handlerHeader) > 0 {
2635 rws.snapHeader = cloneHeader(rws.handlerHeader)
2636 }
2637 }
2638}
2639
// cloneHeader returns a deep copy of h: a new map in which every key has its
// own freshly allocated value slice. The result is never nil, even when h is.
func cloneHeader(h http.Header) http.Header {
	dup := make(http.Header, len(h))
	for name, values := range h {
		dup[name] = append(make([]string, 0, len(values)), values...)
	}
	return dup
}
2649
// The Life Of A Write is like this:
//
// * Handler calls w.Write or w.WriteString ->
// * -> rws.bw (*bufio.Writer) ->
// * (Handler might call Flush)
// * -> chunkWriter{rws}
// * -> responseWriterState.writeChunk(p []byte) (most of the magic; see comment there)
//
// Write forwards p to w.write as the byte-slice argument.
func (w *responseWriter) Write(p []byte) (n int, err error) {
	return w.write(len(p), p, "")
}
2661
// WriteString forwards s to w.write as the string argument, leaving the
// byte-slice argument nil.
func (w *responseWriter) WriteString(s string) (n int, err error) {
	return w.write(len(s), nil, s)
}
2665
2666// either dataB or dataS is non-zero.
2667func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) {
2668 rws := w.rws
2669 if rws == nil {
2670 panic("Write called after Handler finished")
2671 }
2672 if !rws.wroteHeader {
2673 w.WriteHeader(200)
2674 }
2675 if !bodyAllowedForStatus(rws.status) {
2676 return 0, http.ErrBodyNotAllowed
2677 }
2678 rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) // only one can be set
2679 if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen {
2680 // TODO: send a RST_STREAM
2681 return 0, errors.New("http2: handler wrote more than declared Content-Length")
2682 }
2683
2684 if dataB != nil {
2685 return rws.bw.Write(dataB)
2686 } else {
2687 return rws.bw.WriteString(dataS)
2688 }
2689}
2690
// handlerDone finishes the response after the handler has returned: it marks
// the state as done, flushes what remains, detaches the state from w, and
// returns the state to responseWriterStatePool unless it was marked dirty.
func (w *responseWriter) handlerDone() {
	rws := w.rws
	// NOTE(review): dirty is sampled before the final Flush below, so a write
	// failure during that Flush does not prevent recycling — confirm intended.
	dirty := rws.dirty
	rws.handlerDone = true
	w.Flush()
	w.rws = nil
	if !dirty {
		// Only recycle the pool if all prior Write calls to
		// the serverConn goroutine completed successfully. If
		// they returned earlier due to resets from the peer
		// there might still be write goroutines outstanding
		// from the serverConn referencing the rws memory. See
		// issue 20704.
		responseWriterStatePool.Put(rws)
	}
}
2707
// Push errors.
var (
	// ErrRecursivePush is returned by Push when called on a pushed stream.
	ErrRecursivePush = errors.New("http2: recursive push not allowed")
	// ErrPushLimitReached reports that a push would exceed the peer's
	// SETTINGS_MAX_CONCURRENT_STREAMS.
	ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
)

// Compile-time check that responseWriter implements http.Pusher.
var _ http.Pusher = (*responseWriter)(nil)
2715
// Push implements http.Pusher. It validates target and opts, then sends a
// startPushRequest to the serve loop and waits for its result.
//
// target must be an absolute path or an absolute URL whose scheme matches the
// request's. opts may be nil; the method defaults to GET and must be GET or
// HEAD. Pseudo-headers and the body-related or Host headers listed below are
// rejected. It returns ErrRecursivePush on a pushed stream,
// errClientDisconnected if the connection finishes serving, and
// errStreamClosed if the parent stream closes while waiting.
func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
	st := w.rws.stream
	sc := st.sc
	sc.serveG.checkNotOn()

	// No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
	// http://tools.ietf.org/html/rfc7540#section-6.6
	if st.isPushed() {
		return ErrRecursivePush
	}

	if opts == nil {
		opts = new(http.PushOptions)
	}

	// Default options.
	if opts.Method == "" {
		opts.Method = "GET"
	}
	if opts.Header == nil {
		opts.Header = http.Header{}
	}
	// The promised request must use the same scheme as the parent request.
	wantScheme := "http"
	if w.rws.req.TLS != nil {
		wantScheme = "https"
	}

	// Validate the request.
	u, err := url.Parse(target)
	if err != nil {
		return err
	}
	if u.Scheme == "" {
		if !strings.HasPrefix(target, "/") {
			return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target)
		}
		// Absolute path: inherit scheme and host from the parent request.
		u.Scheme = wantScheme
		u.Host = w.rws.req.Host
	} else {
		if u.Scheme != wantScheme {
			return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme)
		}
		if u.Host == "" {
			return errors.New("URL must have a host")
		}
	}
	for k := range opts.Header {
		if strings.HasPrefix(k, ":") {
			return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
		}
		// These headers are meaningful only if the request has a body,
		// but PUSH_PROMISE requests cannot have a body.
		// http://tools.ietf.org/html/rfc7540#section-8.2
		// Also disallow Host, since the promised URL must be absolute.
		switch strings.ToLower(k) {
		case "content-length", "content-encoding", "trailer", "te", "expect", "host":
			return fmt.Errorf("promised request headers cannot include %q", k)
		}
	}
	if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil {
		return err
	}

	// The RFC effectively limits promised requests to GET and HEAD:
	// "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
	// http://tools.ietf.org/html/rfc7540#section-8.2
	if opts.Method != "GET" && opts.Method != "HEAD" {
		return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
	}

	// The header is cloned so the message does not share the caller's map.
	msg := &startPushRequest{
		parent: st,
		method: opts.Method,
		url:    u,
		header: cloneHeader(opts.Header),
		done:   errChanPool.Get().(chan error),
	}

	// Hand the request to the serve loop, giving up if the connection or
	// the parent stream goes away first.
	select {
	case <-sc.doneServing:
		return errClientDisconnected
	case <-st.cw:
		return errStreamClosed
	case sc.serveMsgCh <- msg:
	}

	// Wait for the serve loop's result; the done channel is returned to the
	// pool only when a result was received on it.
	select {
	case <-sc.doneServing:
		return errClientDisconnected
	case <-st.cw:
		return errStreamClosed
	case err := <-msg.done:
		errChanPool.Put(msg.done)
		return err
	}
}
2812
2813type startPushRequest struct {
2814 parent *stream
2815 method string
2816 url *url.URL
2817 header http.Header
2818 done chan error
2819}
2820
2821func (sc *serverConn) startPush(msg *startPushRequest) {
2822 sc.serveG.check()
2823
2824 // http://tools.ietf.org/html/rfc7540#section-6.6.
2825 // PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
2826 // is in either the "open" or "half-closed (remote)" state.
2827 if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote {
Arjun E K57a7fcb2020-01-30 06:44:45 +00002828 // responseWriter.Push checks that the stream is peer-initiated.
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -07002829 msg.done <- errStreamClosed
2830 return
2831 }
2832
2833 // http://tools.ietf.org/html/rfc7540#section-6.6.
2834 if !sc.pushEnabled {
2835 msg.done <- http.ErrNotSupported
2836 return
2837 }
2838
2839 // PUSH_PROMISE frames must be sent in increasing order by stream ID, so
2840 // we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
2841 // is written. Once the ID is allocated, we start the request handler.
2842 allocatePromisedID := func() (uint32, error) {
2843 sc.serveG.check()
2844
2845 // Check this again, just in case. Technically, we might have received
2846 // an updated SETTINGS by the time we got around to writing this frame.
2847 if !sc.pushEnabled {
2848 return 0, http.ErrNotSupported
2849 }
2850 // http://tools.ietf.org/html/rfc7540#section-6.5.2.
2851 if sc.curPushedStreams+1 > sc.clientMaxStreams {
2852 return 0, ErrPushLimitReached
2853 }
2854
2855 // http://tools.ietf.org/html/rfc7540#section-5.1.1.
2856 // Streams initiated by the server MUST use even-numbered identifiers.
2857 // A server that is unable to establish a new stream identifier can send a GOAWAY
2858 // frame so that the client is forced to open a new connection for new streams.
2859 if sc.maxPushPromiseID+2 >= 1<<31 {
2860 sc.startGracefulShutdownInternal()
2861 return 0, ErrPushLimitReached
2862 }
2863 sc.maxPushPromiseID += 2
2864 promisedID := sc.maxPushPromiseID
2865
2866 // http://tools.ietf.org/html/rfc7540#section-8.2.
2867 // Strictly speaking, the new stream should start in "reserved (local)", then
2868 // transition to "half closed (remote)" after sending the initial HEADERS, but
2869 // we start in "half closed (remote)" for simplicity.
2870 // See further comments at the definition of stateHalfClosedRemote.
2871 promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote)
2872 rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{
2873 method: msg.method,
2874 scheme: msg.url.Scheme,
2875 authority: msg.url.Host,
2876 path: msg.url.RequestURI(),
2877 header: cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE
2878 })
2879 if err != nil {
2880 // Should not happen, since we've already validated msg.url.
2881 panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
2882 }
2883
2884 go sc.runHandler(rw, req, sc.handler.ServeHTTP)
2885 return promisedID, nil
2886 }
2887
2888 sc.writeFrame(FrameWriteRequest{
2889 write: &writePushPromise{
2890 streamID: msg.parent.id,
2891 method: msg.method,
2892 url: msg.url,
2893 h: msg.header,
2894 allocatePromisedID: allocatePromisedID,
2895 },
2896 stream: msg.parent,
2897 done: msg.done,
2898 })
2899}
2900
// foreachHeaderElement treats v as a comma-separated list (the "#rule"
// construction in RFC 7230 section 7) and invokes fn once per element, in
// order. Each element is whitespace-trimmed first, and elements that are
// empty after trimming are skipped, so fn is never called with "".
func foreachHeaderElement(v string, fn func(string)) {
	v = textproto.TrimString(v)
	switch {
	case v == "":
		// Nothing to report.
		return
	case strings.IndexByte(v, ',') < 0:
		// Single element: no need to split.
		fn(v)
		return
	}
	for _, elem := range strings.Split(v, ",") {
		elem = textproto.TrimString(elem)
		if elem == "" {
			continue
		}
		fn(elem)
	}
}
2918
// connHeaders lists, in canonical header-key form, the header names that
// checkValidHTTP2RequestHeaders rejects in an HTTP/2 request.
//
// From http://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2.2
var connHeaders = []string{
	"Connection",
	"Keep-Alive",
	"Proxy-Connection",
	"Transfer-Encoding",
	"Upgrade",
}
2927
2928// checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request,
2929// per RFC 7540 Section 8.1.2.2.
2930// The returned error is reported to users.
2931func checkValidHTTP2RequestHeaders(h http.Header) error {
2932 for _, k := range connHeaders {
2933 if _, ok := h[k]; ok {
2934 return fmt.Errorf("request header %q is not valid in HTTP/2", k)
2935 }
2936 }
2937 te := h["Te"]
2938 if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) {
2939 return errors.New(`request header "TE" may only be "trailers" in HTTP/2`)
2940 }
2941 return nil
2942}
2943
// new400Handler returns a handler that answers every request with
// 400 Bad Request and err's message as the response text.
func new400Handler(err error) http.HandlerFunc {
	reply := func(w http.ResponseWriter, _ *http.Request) {
		// err.Error is called per request, not when the handler is built.
		http.Error(w, err.Error(), http.StatusBadRequest)
	}
	return http.HandlerFunc(reply)
}
2949
// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives
// disabled. It probes hs, through an interface type assertion, for an
// unexported doKeepAlives method and returns the negation of its result;
// if hs does not provide that method it reports false.
// See comments on h1ServerShutdownChan above for why the code is written
// this way.
// NOTE(review): the assertion presumably only succeeds when this file is
// compiled into the package that declares doKeepAlives — confirm.
func h1ServerKeepAlivesDisabled(hs *http.Server) bool {
	type keepAliver interface {
		doKeepAlives() bool
	}
	var probe interface{} = hs
	ka, ok := probe.(keepAliver)
	if !ok {
		return false
	}
	return !ka.doKeepAlives()
}