blob: 4182f52b4fc3af869fc57c6098596087b96a4202 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001// Copyright 2015 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Transport code.
6
7package http2
8
9import (
10 "bufio"
11 "bytes"
12 "compress/gzip"
13 "context"
14 "crypto/rand"
15 "crypto/tls"
16 "errors"
17 "fmt"
18 "io"
19 "io/ioutil"
20 "log"
21 "math"
22 mathrand "math/rand"
23 "net"
24 "net/http"
25 "net/http/httptrace"
26 "net/textproto"
27 "sort"
28 "strconv"
29 "strings"
30 "sync"
Scott Baker8461e152019-10-01 14:44:30 -070031 "sync/atomic"
khenaidooac637102019-01-14 15:44:34 -050032 "time"
33
34 "golang.org/x/net/http/httpguts"
35 "golang.org/x/net/http2/hpack"
36 "golang.org/x/net/idna"
37)
38
39const (
40 // transportDefaultConnFlow is how many connection-level flow control
41 // tokens we give the server at start-up, past the default 64k.
42 transportDefaultConnFlow = 1 << 30
43
44 // transportDefaultStreamFlow is how many stream-level flow
45 // control tokens we announce to the peer, and how many bytes
46 // we buffer per stream.
47 transportDefaultStreamFlow = 4 << 20
48
49 // transportDefaultStreamMinRefresh is the minimum number of bytes we'll send
50 // a stream-level WINDOW_UPDATE for at a time.
51 transportDefaultStreamMinRefresh = 4 << 10
52
53 defaultUserAgent = "Go-http-client/2.0"
54)
55
56// Transport is an HTTP/2 Transport.
57//
58// A Transport internally caches connections to servers. It is safe
59// for concurrent use by multiple goroutines.
60type Transport struct {
61 // DialTLS specifies an optional dial function for creating
62 // TLS connections for requests.
63 //
64 // If DialTLS is nil, tls.Dial is used.
65 //
66 // If the returned net.Conn has a ConnectionState method like tls.Conn,
67 // it will be used to set http.Response.TLS.
68 DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)
69
70 // TLSClientConfig specifies the TLS configuration to use with
71 // tls.Client. If nil, the default configuration is used.
72 TLSClientConfig *tls.Config
73
74 // ConnPool optionally specifies an alternate connection pool to use.
75 // If nil, the default is used.
76 ConnPool ClientConnPool
77
78 // DisableCompression, if true, prevents the Transport from
79 // requesting compression with an "Accept-Encoding: gzip"
80 // request header when the Request contains no existing
81 // Accept-Encoding value. If the Transport requests gzip on
82 // its own and gets a gzipped response, it's transparently
83 // decoded in the Response.Body. However, if the user
84 // explicitly requested gzip it is not automatically
85 // uncompressed.
86 DisableCompression bool
87
88 // AllowHTTP, if true, permits HTTP/2 requests using the insecure,
89 // plain-text "http" scheme. Note that this does not enable h2c support.
90 AllowHTTP bool
91
92 // MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
93 // send in the initial settings frame. It is how many bytes
94 // of response headers are allowed. Unlike the http2 spec, zero here
95 // means to use a default limit (currently 10MB). If you actually
Andrea Campanella3614a922021-02-25 12:40:42 +010096 // want to advertise an unlimited value to the peer, Transport
khenaidooac637102019-01-14 15:44:34 -050097 // interprets the highest possible value here (0xffffffff or 1<<32-1)
98 // to mean no limit.
99 MaxHeaderListSize uint32
100
101 // StrictMaxConcurrentStreams controls whether the server's
102 // SETTINGS_MAX_CONCURRENT_STREAMS should be respected
103 // globally. If false, new TCP connections are created to the
104 // server as needed to keep each under the per-connection
105 // SETTINGS_MAX_CONCURRENT_STREAMS limit. If true, the
106 // server's SETTINGS_MAX_CONCURRENT_STREAMS is interpreted as
107 // a global limit and callers of RoundTrip block when needed,
108 // waiting for their turn.
109 StrictMaxConcurrentStreams bool
110
Andrea Campanella3614a922021-02-25 12:40:42 +0100111 // ReadIdleTimeout is the timeout after which a health check using ping
112 // frame will be carried out if no frame is received on the connection.
113 // Note that a ping response will is considered a received frame, so if
114 // there is no other traffic on the connection, the health check will
115 // be performed every ReadIdleTimeout interval.
116 // If zero, no health check is performed.
117 ReadIdleTimeout time.Duration
118
119 // PingTimeout is the timeout after which the connection will be closed
120 // if a response to Ping is not received.
121 // Defaults to 15s.
122 PingTimeout time.Duration
123
khenaidooac637102019-01-14 15:44:34 -0500124 // t1, if non-nil, is the standard library Transport using
125 // this transport. Its settings are used (but not its
126 // RoundTrip method, etc).
127 t1 *http.Transport
128
129 connPoolOnce sync.Once
130 connPoolOrDef ClientConnPool // non-nil version of ConnPool
131}
132
133func (t *Transport) maxHeaderListSize() uint32 {
134 if t.MaxHeaderListSize == 0 {
135 return 10 << 20
136 }
137 if t.MaxHeaderListSize == 0xffffffff {
138 return 0
139 }
140 return t.MaxHeaderListSize
141}
142
143func (t *Transport) disableCompression() bool {
144 return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
145}
146
Andrea Campanella3614a922021-02-25 12:40:42 +0100147func (t *Transport) pingTimeout() time.Duration {
148 if t.PingTimeout == 0 {
149 return 15 * time.Second
150 }
151 return t.PingTimeout
152
153}
154
khenaidooac637102019-01-14 15:44:34 -0500155// ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
156// It returns an error if t1 has already been HTTP/2-enabled.
157func ConfigureTransport(t1 *http.Transport) error {
158 _, err := configureTransport(t1)
159 return err
160}
161
162func configureTransport(t1 *http.Transport) (*Transport, error) {
163 connPool := new(clientConnPool)
164 t2 := &Transport{
165 ConnPool: noDialClientConnPool{connPool},
166 t1: t1,
167 }
168 connPool.t = t2
169 if err := registerHTTPSProtocol(t1, noDialH2RoundTripper{t2}); err != nil {
170 return nil, err
171 }
172 if t1.TLSClientConfig == nil {
173 t1.TLSClientConfig = new(tls.Config)
174 }
175 if !strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {
176 t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
177 }
178 if !strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
179 t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
180 }
181 upgradeFn := func(authority string, c *tls.Conn) http.RoundTripper {
182 addr := authorityAddr("https", authority)
183 if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
184 go c.Close()
185 return erringRoundTripper{err}
186 } else if !used {
187 // Turns out we don't need this c.
188 // For example, two goroutines made requests to the same host
189 // at the same time, both kicking off TCP dials. (since protocol
190 // was unknown)
191 go c.Close()
192 }
193 return t2
194 }
195 if m := t1.TLSNextProto; len(m) == 0 {
196 t1.TLSNextProto = map[string]func(string, *tls.Conn) http.RoundTripper{
197 "h2": upgradeFn,
198 }
199 } else {
200 m["h2"] = upgradeFn
201 }
202 return t2, nil
203}
204
205func (t *Transport) connPool() ClientConnPool {
206 t.connPoolOnce.Do(t.initConnPool)
207 return t.connPoolOrDef
208}
209
210func (t *Transport) initConnPool() {
211 if t.ConnPool != nil {
212 t.connPoolOrDef = t.ConnPool
213 } else {
214 t.connPoolOrDef = &clientConnPool{t: t}
215 }
216}
217
218// ClientConn is the state of a single HTTP/2 client connection to an
219// HTTP/2 server.
220type ClientConn struct {
221 t *Transport
222 tconn net.Conn // usually *tls.Conn, except specialized impls
223 tlsState *tls.ConnectionState // nil only for specialized impls
Scott Baker8461e152019-10-01 14:44:30 -0700224 reused uint32 // whether conn is being reused; atomic
khenaidooac637102019-01-14 15:44:34 -0500225 singleUse bool // whether being used for a single http.Request
226
227 // readLoop goroutine fields:
228 readerDone chan struct{} // closed on error
229 readerErr error // set before readerDone is closed
230
231 idleTimeout time.Duration // or 0 for never
232 idleTimer *time.Timer
233
234 mu sync.Mutex // guards following
235 cond *sync.Cond // hold mu; broadcast on flow/closed changes
236 flow flow // our conn-level flow control quota (cs.flow is per stream)
237 inflow flow // peer's conn-level flow control
238 closing bool
239 closed bool
240 wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
241 goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
242 goAwayDebug string // goAway frame's debug data, retained as a string
243 streams map[uint32]*clientStream // client-initiated
244 nextStreamID uint32
245 pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
246 pings map[[8]byte]chan struct{} // in flight ping data to notification channel
247 bw *bufio.Writer
248 br *bufio.Reader
249 fr *Framer
250 lastActive time.Time
Andrea Campanella3614a922021-02-25 12:40:42 +0100251 lastIdle time.Time // time last idle
khenaidooac637102019-01-14 15:44:34 -0500252 // Settings from peer: (also guarded by mu)
253 maxFrameSize uint32
254 maxConcurrentStreams uint32
255 peerMaxHeaderListSize uint64
256 initialWindowSize uint32
257
258 hbuf bytes.Buffer // HPACK encoder writes into this
259 henc *hpack.Encoder
260 freeBuf [][]byte
261
262 wmu sync.Mutex // held while writing; acquire AFTER mu if holding both
263 werr error // first write error that has occurred
264}
265
266// clientStream is the state for a single HTTP/2 stream. One of these
267// is created for each Transport.RoundTrip call.
268type clientStream struct {
269 cc *ClientConn
270 req *http.Request
271 trace *httptrace.ClientTrace // or nil
272 ID uint32
273 resc chan resAndError
274 bufPipe pipe // buffered pipe with the flow-controlled response payload
275 startedWrite bool // started request body write; guarded by cc.mu
276 requestedGzip bool
277 on100 func() // optional code to run if get a 100 continue response
278
279 flow flow // guarded by cc.mu
280 inflow flow // guarded by cc.mu
281 bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
282 readErr error // sticky read error; owned by transportResponseBody.Read
283 stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
284 didReset bool // whether we sent a RST_STREAM to the server; guarded by cc.mu
285
286 peerReset chan struct{} // closed on peer reset
287 resetErr error // populated before peerReset is closed
288
289 done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu
290
291 // owned by clientConnReadLoop:
292 firstByte bool // got the first response byte
293 pastHeaders bool // got first MetaHeadersFrame (actual headers)
294 pastTrailers bool // got optional second MetaHeadersFrame (trailers)
295 num1xx uint8 // number of 1xx responses seen
296
297 trailer http.Header // accumulated trailers
298 resTrailer *http.Header // client's Response.Trailer
299}
300
301// awaitRequestCancel waits for the user to cancel a request or for the done
302// channel to be signaled. A non-nil error is returned only if the request was
303// canceled.
304func awaitRequestCancel(req *http.Request, done <-chan struct{}) error {
305 ctx := req.Context()
306 if req.Cancel == nil && ctx.Done() == nil {
307 return nil
308 }
309 select {
310 case <-req.Cancel:
311 return errRequestCanceled
312 case <-ctx.Done():
313 return ctx.Err()
314 case <-done:
315 return nil
316 }
317}
318
319var got1xxFuncForTests func(int, textproto.MIMEHeader) error
320
321// get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func,
322// if any. It returns nil if not set or if the Go version is too old.
323func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
324 if fn := got1xxFuncForTests; fn != nil {
325 return fn
326 }
327 return traceGot1xxResponseFunc(cs.trace)
328}
329
330// awaitRequestCancel waits for the user to cancel a request, its context to
331// expire, or for the request to be done (any way it might be removed from the
332// cc.streams map: peer reset, successful completion, TCP connection breakage,
333// etc). If the request is canceled, then cs will be canceled and closed.
334func (cs *clientStream) awaitRequestCancel(req *http.Request) {
335 if err := awaitRequestCancel(req, cs.done); err != nil {
336 cs.cancelStream()
337 cs.bufPipe.CloseWithError(err)
338 }
339}
340
341func (cs *clientStream) cancelStream() {
342 cc := cs.cc
343 cc.mu.Lock()
344 didReset := cs.didReset
345 cs.didReset = true
346 cc.mu.Unlock()
347
348 if !didReset {
349 cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
350 cc.forgetStreamID(cs.ID)
351 }
352}
353
354// checkResetOrDone reports any error sent in a RST_STREAM frame by the
355// server, or errStreamClosed if the stream is complete.
356func (cs *clientStream) checkResetOrDone() error {
357 select {
358 case <-cs.peerReset:
359 return cs.resetErr
360 case <-cs.done:
361 return errStreamClosed
362 default:
363 return nil
364 }
365}
366
367func (cs *clientStream) getStartedWrite() bool {
368 cc := cs.cc
369 cc.mu.Lock()
370 defer cc.mu.Unlock()
371 return cs.startedWrite
372}
373
374func (cs *clientStream) abortRequestBodyWrite(err error) {
375 if err == nil {
376 panic("nil error")
377 }
378 cc := cs.cc
379 cc.mu.Lock()
380 cs.stopReqBody = err
381 cc.cond.Broadcast()
382 cc.mu.Unlock()
383}
384
385type stickyErrWriter struct {
386 w io.Writer
387 err *error
388}
389
390func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
391 if *sew.err != nil {
392 return 0, *sew.err
393 }
394 n, err = sew.w.Write(p)
395 *sew.err = err
396 return
397}
398
399// noCachedConnError is the concrete type of ErrNoCachedConn, which
400// needs to be detected by net/http regardless of whether it's its
401// bundled version (in h2_bundle.go with a rewritten type name) or
402// from a user's x/net/http2. As such, as it has a unique method name
403// (IsHTTP2NoCachedConnError) that net/http sniffs for via func
404// isNoCachedConnError.
405type noCachedConnError struct{}
406
407func (noCachedConnError) IsHTTP2NoCachedConnError() {}
408func (noCachedConnError) Error() string { return "http2: no cached connection was available" }
409
410// isNoCachedConnError reports whether err is of type noCachedConnError
411// or its equivalent renamed type in net/http2's h2_bundle.go. Both types
412// may coexist in the same running program.
413func isNoCachedConnError(err error) bool {
414 _, ok := err.(interface{ IsHTTP2NoCachedConnError() })
415 return ok
416}
417
418var ErrNoCachedConn error = noCachedConnError{}
419
420// RoundTripOpt are options for the Transport.RoundTripOpt method.
421type RoundTripOpt struct {
422 // OnlyCachedConn controls whether RoundTripOpt may
423 // create a new TCP connection. If set true and
424 // no cached connection is available, RoundTripOpt
425 // will return ErrNoCachedConn.
426 OnlyCachedConn bool
427}
428
429func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
430 return t.RoundTripOpt(req, RoundTripOpt{})
431}
432
433// authorityAddr returns a given authority (a host/IP, or host:port / ip:port)
434// and returns a host:port. The port 443 is added if needed.
435func authorityAddr(scheme string, authority string) (addr string) {
436 host, port, err := net.SplitHostPort(authority)
437 if err != nil { // authority didn't have a port
438 port = "443"
439 if scheme == "http" {
440 port = "80"
441 }
442 host = authority
443 }
444 if a, err := idna.ToASCII(host); err == nil {
445 host = a
446 }
447 // IPv6 address literal, without a port:
448 if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
449 return host + ":" + port
450 }
451 return net.JoinHostPort(host, port)
452}
453
454// RoundTripOpt is like RoundTrip, but takes options.
455func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
456 if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) {
457 return nil, errors.New("http2: unsupported scheme")
458 }
459
460 addr := authorityAddr(req.URL.Scheme, req.URL.Host)
461 for retry := 0; ; retry++ {
462 cc, err := t.connPool().GetClientConn(req, addr)
463 if err != nil {
464 t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
465 return nil, err
466 }
Scott Baker8461e152019-10-01 14:44:30 -0700467 reused := !atomic.CompareAndSwapUint32(&cc.reused, 0, 1)
468 traceGotConn(req, cc, reused)
khenaidooac637102019-01-14 15:44:34 -0500469 res, gotErrAfterReqBodyWrite, err := cc.roundTrip(req)
470 if err != nil && retry <= 6 {
471 if req, err = shouldRetryRequest(req, err, gotErrAfterReqBodyWrite); err == nil {
472 // After the first retry, do exponential backoff with 10% jitter.
473 if retry == 0 {
474 continue
475 }
476 backoff := float64(uint(1) << (uint(retry) - 1))
477 backoff += backoff * (0.1 * mathrand.Float64())
478 select {
479 case <-time.After(time.Second * time.Duration(backoff)):
480 continue
481 case <-req.Context().Done():
482 return nil, req.Context().Err()
483 }
484 }
485 }
486 if err != nil {
487 t.vlogf("RoundTrip failure: %v", err)
488 return nil, err
489 }
490 return res, nil
491 }
492}
493
494// CloseIdleConnections closes any connections which were previously
495// connected from previous requests but are now sitting idle.
496// It does not interrupt any connections currently in use.
497func (t *Transport) CloseIdleConnections() {
498 if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
499 cp.closeIdleConnections()
500 }
501}
502
503var (
504 errClientConnClosed = errors.New("http2: client conn is closed")
505 errClientConnUnusable = errors.New("http2: client conn not usable")
506 errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
507)
508
509// shouldRetryRequest is called by RoundTrip when a request fails to get
510// response headers. It is always called with a non-nil error.
511// It returns either a request to retry (either the same request, or a
512// modified clone), or an error if the request can't be replayed.
513func shouldRetryRequest(req *http.Request, err error, afterBodyWrite bool) (*http.Request, error) {
514 if !canRetryError(err) {
515 return nil, err
516 }
517 // If the Body is nil (or http.NoBody), it's safe to reuse
518 // this request and its Body.
519 if req.Body == nil || req.Body == http.NoBody {
520 return req, nil
521 }
522
523 // If the request body can be reset back to its original
524 // state via the optional req.GetBody, do that.
525 if req.GetBody != nil {
526 // TODO: consider a req.Body.Close here? or audit that all caller paths do?
527 body, err := req.GetBody()
528 if err != nil {
529 return nil, err
530 }
531 newReq := *req
532 newReq.Body = body
533 return &newReq, nil
534 }
535
536 // The Request.Body can't reset back to the beginning, but we
537 // don't seem to have started to read from it yet, so reuse
538 // the request directly. The "afterBodyWrite" means the
539 // bodyWrite process has started, which becomes true before
540 // the first Read.
541 if !afterBodyWrite {
542 return req, nil
543 }
544
545 return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
546}
547
548func canRetryError(err error) bool {
549 if err == errClientConnUnusable || err == errClientConnGotGoAway {
550 return true
551 }
552 if se, ok := err.(StreamError); ok {
553 return se.Code == ErrCodeRefusedStream
554 }
555 return false
556}
557
558func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) {
559 host, _, err := net.SplitHostPort(addr)
560 if err != nil {
561 return nil, err
562 }
563 tconn, err := t.dialTLS()("tcp", addr, t.newTLSConfig(host))
564 if err != nil {
565 return nil, err
566 }
567 return t.newClientConn(tconn, singleUse)
568}
569
570func (t *Transport) newTLSConfig(host string) *tls.Config {
571 cfg := new(tls.Config)
572 if t.TLSClientConfig != nil {
573 *cfg = *t.TLSClientConfig.Clone()
574 }
575 if !strSliceContains(cfg.NextProtos, NextProtoTLS) {
576 cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
577 }
578 if cfg.ServerName == "" {
579 cfg.ServerName = host
580 }
581 return cfg
582}
583
584func (t *Transport) dialTLS() func(string, string, *tls.Config) (net.Conn, error) {
585 if t.DialTLS != nil {
586 return t.DialTLS
587 }
588 return t.dialTLSDefault
589}
590
591func (t *Transport) dialTLSDefault(network, addr string, cfg *tls.Config) (net.Conn, error) {
592 cn, err := tls.Dial(network, addr, cfg)
593 if err != nil {
594 return nil, err
595 }
596 if err := cn.Handshake(); err != nil {
597 return nil, err
598 }
599 if !cfg.InsecureSkipVerify {
600 if err := cn.VerifyHostname(cfg.ServerName); err != nil {
601 return nil, err
602 }
603 }
604 state := cn.ConnectionState()
605 if p := state.NegotiatedProtocol; p != NextProtoTLS {
606 return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
607 }
608 if !state.NegotiatedProtocolIsMutual {
609 return nil, errors.New("http2: could not negotiate protocol mutually")
610 }
611 return cn, nil
612}
613
614// disableKeepAlives reports whether connections should be closed as
615// soon as possible after handling the first request.
616func (t *Transport) disableKeepAlives() bool {
617 return t.t1 != nil && t.t1.DisableKeepAlives
618}
619
620func (t *Transport) expectContinueTimeout() time.Duration {
621 if t.t1 == nil {
622 return 0
623 }
624 return t.t1.ExpectContinueTimeout
625}
626
627func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
Andrea Campanella3614a922021-02-25 12:40:42 +0100628 return t.newClientConn(c, t.disableKeepAlives())
khenaidooac637102019-01-14 15:44:34 -0500629}
630
631func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
632 cc := &ClientConn{
633 t: t,
634 tconn: c,
635 readerDone: make(chan struct{}),
636 nextStreamID: 1,
637 maxFrameSize: 16 << 10, // spec default
638 initialWindowSize: 65535, // spec default
639 maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
640 peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
641 streams: make(map[uint32]*clientStream),
642 singleUse: singleUse,
643 wantSettingsAck: true,
644 pings: make(map[[8]byte]chan struct{}),
645 }
646 if d := t.idleConnTimeout(); d != 0 {
647 cc.idleTimeout = d
648 cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
649 }
650 if VerboseLogs {
651 t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
652 }
653
654 cc.cond = sync.NewCond(&cc.mu)
655 cc.flow.add(int32(initialWindowSize))
656
657 // TODO: adjust this writer size to account for frame size +
658 // MTU + crypto/tls record padding.
659 cc.bw = bufio.NewWriter(stickyErrWriter{c, &cc.werr})
660 cc.br = bufio.NewReader(c)
661 cc.fr = NewFramer(cc.bw, cc.br)
662 cc.fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
663 cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
664
665 // TODO: SetMaxDynamicTableSize, SetMaxDynamicTableSizeLimit on
666 // henc in response to SETTINGS frames?
667 cc.henc = hpack.NewEncoder(&cc.hbuf)
668
669 if t.AllowHTTP {
670 cc.nextStreamID = 3
671 }
672
673 if cs, ok := c.(connectionStater); ok {
674 state := cs.ConnectionState()
675 cc.tlsState = &state
676 }
677
678 initialSettings := []Setting{
679 {ID: SettingEnablePush, Val: 0},
680 {ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow},
681 }
682 if max := t.maxHeaderListSize(); max != 0 {
683 initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
684 }
685
686 cc.bw.Write(clientPreface)
687 cc.fr.WriteSettings(initialSettings...)
688 cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
689 cc.inflow.add(transportDefaultConnFlow + initialWindowSize)
690 cc.bw.Flush()
691 if cc.werr != nil {
Andrea Campanella3614a922021-02-25 12:40:42 +0100692 cc.Close()
khenaidooac637102019-01-14 15:44:34 -0500693 return nil, cc.werr
694 }
695
696 go cc.readLoop()
697 return cc, nil
698}
699
Andrea Campanella3614a922021-02-25 12:40:42 +0100700func (cc *ClientConn) healthCheck() {
701 pingTimeout := cc.t.pingTimeout()
702 // We don't need to periodically ping in the health check, because the readLoop of ClientConn will
703 // trigger the healthCheck again if there is no frame received.
704 ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
705 defer cancel()
706 err := cc.Ping(ctx)
707 if err != nil {
708 cc.closeForLostPing()
709 cc.t.connPool().MarkDead(cc)
710 return
711 }
712}
713
khenaidooac637102019-01-14 15:44:34 -0500714func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
715 cc.mu.Lock()
716 defer cc.mu.Unlock()
717
718 old := cc.goAway
719 cc.goAway = f
720
721 // Merge the previous and current GoAway error frames.
722 if cc.goAwayDebug == "" {
723 cc.goAwayDebug = string(f.DebugData())
724 }
725 if old != nil && old.ErrCode != ErrCodeNo {
726 cc.goAway.ErrCode = old.ErrCode
727 }
728 last := f.LastStreamID
729 for streamID, cs := range cc.streams {
730 if streamID > last {
731 select {
732 case cs.resc <- resAndError{err: errClientConnGotGoAway}:
733 default:
734 }
735 }
736 }
737}
738
739// CanTakeNewRequest reports whether the connection can take a new request,
740// meaning it has not been closed or received or sent a GOAWAY.
741func (cc *ClientConn) CanTakeNewRequest() bool {
742 cc.mu.Lock()
743 defer cc.mu.Unlock()
744 return cc.canTakeNewRequestLocked()
745}
746
747// clientConnIdleState describes the suitability of a client
748// connection to initiate a new RoundTrip request.
749type clientConnIdleState struct {
750 canTakeNewRequest bool
751 freshConn bool // whether it's unused by any previous request
752}
753
754func (cc *ClientConn) idleState() clientConnIdleState {
755 cc.mu.Lock()
756 defer cc.mu.Unlock()
757 return cc.idleStateLocked()
758}
759
760func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
761 if cc.singleUse && cc.nextStreamID > 1 {
762 return
763 }
764 var maxConcurrentOkay bool
765 if cc.t.StrictMaxConcurrentStreams {
766 // We'll tell the caller we can take a new request to
767 // prevent the caller from dialing a new TCP
768 // connection, but then we'll block later before
769 // writing it.
770 maxConcurrentOkay = true
771 } else {
772 maxConcurrentOkay = int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams)
773 }
774
775 st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
Andrea Campanella3614a922021-02-25 12:40:42 +0100776 int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
777 !cc.tooIdleLocked()
khenaidooac637102019-01-14 15:44:34 -0500778 st.freshConn = cc.nextStreamID == 1 && st.canTakeNewRequest
779 return
780}
781
782func (cc *ClientConn) canTakeNewRequestLocked() bool {
783 st := cc.idleStateLocked()
784 return st.canTakeNewRequest
785}
786
Andrea Campanella3614a922021-02-25 12:40:42 +0100787// tooIdleLocked reports whether this connection has been been sitting idle
788// for too much wall time.
789func (cc *ClientConn) tooIdleLocked() bool {
790 // The Round(0) strips the monontonic clock reading so the
791 // times are compared based on their wall time. We don't want
792 // to reuse a connection that's been sitting idle during
793 // VM/laptop suspend if monotonic time was also frozen.
794 return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
795}
796
khenaidooac637102019-01-14 15:44:34 -0500797// onIdleTimeout is called from a time.AfterFunc goroutine. It will
798// only be called when we're idle, but because we're coming from a new
799// goroutine, there could be a new request coming in at the same time,
800// so this simply calls the synchronized closeIfIdle to shut down this
801// connection. The timer could just call closeIfIdle, but this is more
802// clear.
803func (cc *ClientConn) onIdleTimeout() {
804 cc.closeIfIdle()
805}
806
807func (cc *ClientConn) closeIfIdle() {
808 cc.mu.Lock()
809 if len(cc.streams) > 0 {
810 cc.mu.Unlock()
811 return
812 }
813 cc.closed = true
814 nextID := cc.nextStreamID
815 // TODO: do clients send GOAWAY too? maybe? Just Close:
816 cc.mu.Unlock()
817
818 if VerboseLogs {
819 cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
820 }
821 cc.tconn.Close()
822}
823
824var shutdownEnterWaitStateHook = func() {}
825
826// Shutdown gracefully close the client connection, waiting for running streams to complete.
827func (cc *ClientConn) Shutdown(ctx context.Context) error {
828 if err := cc.sendGoAway(); err != nil {
829 return err
830 }
831 // Wait for all in-flight streams to complete or connection to close
832 done := make(chan error, 1)
833 cancelled := false // guarded by cc.mu
834 go func() {
835 cc.mu.Lock()
836 defer cc.mu.Unlock()
837 for {
838 if len(cc.streams) == 0 || cc.closed {
839 cc.closed = true
840 done <- cc.tconn.Close()
841 break
842 }
843 if cancelled {
844 break
845 }
846 cc.cond.Wait()
847 }
848 }()
849 shutdownEnterWaitStateHook()
850 select {
851 case err := <-done:
852 return err
853 case <-ctx.Done():
854 cc.mu.Lock()
855 // Free the goroutine above
856 cancelled = true
857 cc.cond.Broadcast()
858 cc.mu.Unlock()
859 return ctx.Err()
860 }
861}
862
863func (cc *ClientConn) sendGoAway() error {
864 cc.mu.Lock()
865 defer cc.mu.Unlock()
866 cc.wmu.Lock()
867 defer cc.wmu.Unlock()
868 if cc.closing {
869 // GOAWAY sent already
870 return nil
871 }
872 // Send a graceful shutdown frame to server
873 maxStreamID := cc.nextStreamID
874 if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
875 return err
876 }
877 if err := cc.bw.Flush(); err != nil {
878 return err
879 }
880 // Prevent new requests
881 cc.closing = true
882 return nil
883}
884
Andrea Campanella3614a922021-02-25 12:40:42 +0100885// closes the client connection immediately. In-flight requests are interrupted.
886// err is sent to streams.
887func (cc *ClientConn) closeForError(err error) error {
khenaidooac637102019-01-14 15:44:34 -0500888 cc.mu.Lock()
889 defer cc.cond.Broadcast()
890 defer cc.mu.Unlock()
khenaidooac637102019-01-14 15:44:34 -0500891 for id, cs := range cc.streams {
892 select {
893 case cs.resc <- resAndError{err: err}:
894 default:
895 }
896 cs.bufPipe.CloseWithError(err)
897 delete(cc.streams, id)
898 }
899 cc.closed = true
900 return cc.tconn.Close()
901}
902
Andrea Campanella3614a922021-02-25 12:40:42 +0100903// Close closes the client connection immediately.
904//
905// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
906func (cc *ClientConn) Close() error {
907 err := errors.New("http2: client connection force closed via ClientConn.Close")
908 return cc.closeForError(err)
909}
910
911// closes the client connection immediately. In-flight requests are interrupted.
912func (cc *ClientConn) closeForLostPing() error {
913 err := errors.New("http2: client connection lost")
914 return cc.closeForError(err)
915}
916
khenaidooac637102019-01-14 15:44:34 -0500917const maxAllocFrameSize = 512 << 10
918
919// frameBuffer returns a scratch buffer suitable for writing DATA frames.
920// They're capped at the min of the peer's max frame size or 512KB
921// (kinda arbitrarily), but definitely capped so we don't allocate 4GB
922// bufers.
923func (cc *ClientConn) frameScratchBuffer() []byte {
924 cc.mu.Lock()
925 size := cc.maxFrameSize
926 if size > maxAllocFrameSize {
927 size = maxAllocFrameSize
928 }
929 for i, buf := range cc.freeBuf {
930 if len(buf) >= int(size) {
931 cc.freeBuf[i] = nil
932 cc.mu.Unlock()
933 return buf[:size]
934 }
935 }
936 cc.mu.Unlock()
937 return make([]byte, size)
938}
939
940func (cc *ClientConn) putFrameScratchBuffer(buf []byte) {
941 cc.mu.Lock()
942 defer cc.mu.Unlock()
943 const maxBufs = 4 // arbitrary; 4 concurrent requests per conn? investigate.
944 if len(cc.freeBuf) < maxBufs {
945 cc.freeBuf = append(cc.freeBuf, buf)
946 return
947 }
948 for i, old := range cc.freeBuf {
949 if old == nil {
950 cc.freeBuf[i] = buf
951 return
952 }
953 }
954 // forget about it.
955}
956
957// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
958// exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests.
959var errRequestCanceled = errors.New("net/http: request canceled")
960
961func commaSeparatedTrailers(req *http.Request) (string, error) {
962 keys := make([]string, 0, len(req.Trailer))
963 for k := range req.Trailer {
964 k = http.CanonicalHeaderKey(k)
965 switch k {
966 case "Transfer-Encoding", "Trailer", "Content-Length":
Andrea Campanella3614a922021-02-25 12:40:42 +0100967 return "", fmt.Errorf("invalid Trailer key %q", k)
khenaidooac637102019-01-14 15:44:34 -0500968 }
969 keys = append(keys, k)
970 }
971 if len(keys) > 0 {
972 sort.Strings(keys)
973 return strings.Join(keys, ","), nil
974 }
975 return "", nil
976}
977
978func (cc *ClientConn) responseHeaderTimeout() time.Duration {
979 if cc.t.t1 != nil {
980 return cc.t.t1.ResponseHeaderTimeout
981 }
982 // No way to do this (yet?) with just an http2.Transport. Probably
983 // no need. Request.Cancel this is the new way. We only need to support
984 // this for compatibility with the old http.Transport fields when
985 // we're doing transparent http2.
986 return 0
987}
988
989// checkConnHeaders checks whether req has any invalid connection-level headers.
990// per RFC 7540 section 8.1.2.2: Connection-Specific Header Fields.
991// Certain headers are special-cased as okay but not transmitted later.
992func checkConnHeaders(req *http.Request) error {
993 if v := req.Header.Get("Upgrade"); v != "" {
994 return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
995 }
996 if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {
997 return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
998 }
999 if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) {
1000 return fmt.Errorf("http2: invalid Connection request header: %q", vv)
1001 }
1002 return nil
1003}
1004
1005// actualContentLength returns a sanitized version of
1006// req.ContentLength, where 0 actually means zero (not unknown) and -1
1007// means unknown.
1008func actualContentLength(req *http.Request) int64 {
1009 if req.Body == nil || req.Body == http.NoBody {
1010 return 0
1011 }
1012 if req.ContentLength != 0 {
1013 return req.ContentLength
1014 }
1015 return -1
1016}
1017
1018func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
1019 resp, _, err := cc.roundTrip(req)
1020 return resp, err
1021}
1022
1023func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAfterReqBodyWrite bool, err error) {
1024 if err := checkConnHeaders(req); err != nil {
1025 return nil, false, err
1026 }
1027 if cc.idleTimer != nil {
1028 cc.idleTimer.Stop()
1029 }
1030
1031 trailers, err := commaSeparatedTrailers(req)
1032 if err != nil {
1033 return nil, false, err
1034 }
1035 hasTrailers := trailers != ""
1036
1037 cc.mu.Lock()
1038 if err := cc.awaitOpenSlotForRequest(req); err != nil {
1039 cc.mu.Unlock()
1040 return nil, false, err
1041 }
1042
1043 body := req.Body
1044 contentLen := actualContentLength(req)
1045 hasBody := contentLen != 0
1046
1047 // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
1048 var requestedGzip bool
1049 if !cc.t.disableCompression() &&
1050 req.Header.Get("Accept-Encoding") == "" &&
1051 req.Header.Get("Range") == "" &&
1052 req.Method != "HEAD" {
1053 // Request gzip only, not deflate. Deflate is ambiguous and
1054 // not as universally supported anyway.
Scott Baker8461e152019-10-01 14:44:30 -07001055 // See: https://zlib.net/zlib_faq.html#faq39
khenaidooac637102019-01-14 15:44:34 -05001056 //
1057 // Note that we don't request this for HEAD requests,
1058 // due to a bug in nginx:
1059 // http://trac.nginx.org/nginx/ticket/358
1060 // https://golang.org/issue/5522
1061 //
1062 // We don't request gzip if the request is for a range, since
1063 // auto-decoding a portion of a gzipped document will just fail
1064 // anyway. See https://golang.org/issue/8923
1065 requestedGzip = true
1066 }
1067
1068 // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
1069 // sent by writeRequestBody below, along with any Trailers,
1070 // again in form HEADERS{1}, CONTINUATION{0,})
1071 hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen)
1072 if err != nil {
1073 cc.mu.Unlock()
1074 return nil, false, err
1075 }
1076
1077 cs := cc.newStream()
1078 cs.req = req
1079 cs.trace = httptrace.ContextClientTrace(req.Context())
1080 cs.requestedGzip = requestedGzip
1081 bodyWriter := cc.t.getBodyWriterState(cs, body)
1082 cs.on100 = bodyWriter.on100
1083
Andrea Campanella3614a922021-02-25 12:40:42 +01001084 defer func() {
1085 cc.wmu.Lock()
1086 werr := cc.werr
1087 cc.wmu.Unlock()
1088 if werr != nil {
1089 cc.Close()
1090 }
1091 }()
1092
khenaidooac637102019-01-14 15:44:34 -05001093 cc.wmu.Lock()
1094 endStream := !hasBody && !hasTrailers
1095 werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
1096 cc.wmu.Unlock()
1097 traceWroteHeaders(cs.trace)
1098 cc.mu.Unlock()
1099
1100 if werr != nil {
1101 if hasBody {
1102 req.Body.Close() // per RoundTripper contract
1103 bodyWriter.cancel()
1104 }
1105 cc.forgetStreamID(cs.ID)
1106 // Don't bother sending a RST_STREAM (our write already failed;
1107 // no need to keep writing)
1108 traceWroteRequest(cs.trace, werr)
1109 return nil, false, werr
1110 }
1111
1112 var respHeaderTimer <-chan time.Time
1113 if hasBody {
1114 bodyWriter.scheduleBodyWrite()
1115 } else {
1116 traceWroteRequest(cs.trace, nil)
1117 if d := cc.responseHeaderTimeout(); d != 0 {
1118 timer := time.NewTimer(d)
1119 defer timer.Stop()
1120 respHeaderTimer = timer.C
1121 }
1122 }
1123
1124 readLoopResCh := cs.resc
1125 bodyWritten := false
1126 ctx := req.Context()
1127
1128 handleReadLoopResponse := func(re resAndError) (*http.Response, bool, error) {
1129 res := re.res
1130 if re.err != nil || res.StatusCode > 299 {
1131 // On error or status code 3xx, 4xx, 5xx, etc abort any
1132 // ongoing write, assuming that the server doesn't care
1133 // about our request body. If the server replied with 1xx or
1134 // 2xx, however, then assume the server DOES potentially
1135 // want our body (e.g. full-duplex streaming:
1136 // golang.org/issue/13444). If it turns out the server
1137 // doesn't, they'll RST_STREAM us soon enough. This is a
1138 // heuristic to avoid adding knobs to Transport. Hopefully
1139 // we can keep it.
1140 bodyWriter.cancel()
1141 cs.abortRequestBodyWrite(errStopReqBodyWrite)
1142 }
1143 if re.err != nil {
1144 cc.forgetStreamID(cs.ID)
1145 return nil, cs.getStartedWrite(), re.err
1146 }
1147 res.Request = req
1148 res.TLS = cc.tlsState
1149 return res, false, nil
1150 }
1151
1152 for {
1153 select {
1154 case re := <-readLoopResCh:
1155 return handleReadLoopResponse(re)
1156 case <-respHeaderTimer:
1157 if !hasBody || bodyWritten {
1158 cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
1159 } else {
1160 bodyWriter.cancel()
1161 cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
1162 }
1163 cc.forgetStreamID(cs.ID)
1164 return nil, cs.getStartedWrite(), errTimeout
1165 case <-ctx.Done():
1166 if !hasBody || bodyWritten {
1167 cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
1168 } else {
1169 bodyWriter.cancel()
1170 cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
1171 }
1172 cc.forgetStreamID(cs.ID)
1173 return nil, cs.getStartedWrite(), ctx.Err()
1174 case <-req.Cancel:
1175 if !hasBody || bodyWritten {
1176 cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
1177 } else {
1178 bodyWriter.cancel()
1179 cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
1180 }
1181 cc.forgetStreamID(cs.ID)
1182 return nil, cs.getStartedWrite(), errRequestCanceled
1183 case <-cs.peerReset:
1184 // processResetStream already removed the
1185 // stream from the streams map; no need for
1186 // forgetStreamID.
1187 return nil, cs.getStartedWrite(), cs.resetErr
1188 case err := <-bodyWriter.resc:
1189 // Prefer the read loop's response, if available. Issue 16102.
1190 select {
1191 case re := <-readLoopResCh:
1192 return handleReadLoopResponse(re)
1193 default:
1194 }
1195 if err != nil {
1196 cc.forgetStreamID(cs.ID)
1197 return nil, cs.getStartedWrite(), err
1198 }
1199 bodyWritten = true
1200 if d := cc.responseHeaderTimeout(); d != 0 {
1201 timer := time.NewTimer(d)
1202 defer timer.Stop()
1203 respHeaderTimer = timer.C
1204 }
1205 }
1206 }
1207}
1208
1209// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams.
1210// Must hold cc.mu.
1211func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error {
1212 var waitingForConn chan struct{}
1213 var waitingForConnErr error // guarded by cc.mu
1214 for {
1215 cc.lastActive = time.Now()
1216 if cc.closed || !cc.canTakeNewRequestLocked() {
1217 if waitingForConn != nil {
1218 close(waitingForConn)
1219 }
1220 return errClientConnUnusable
1221 }
Andrea Campanella3614a922021-02-25 12:40:42 +01001222 cc.lastIdle = time.Time{}
khenaidooac637102019-01-14 15:44:34 -05001223 if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
1224 if waitingForConn != nil {
1225 close(waitingForConn)
1226 }
1227 return nil
1228 }
1229 // Unfortunately, we cannot wait on a condition variable and channel at
1230 // the same time, so instead, we spin up a goroutine to check if the
1231 // request is canceled while we wait for a slot to open in the connection.
1232 if waitingForConn == nil {
1233 waitingForConn = make(chan struct{})
1234 go func() {
1235 if err := awaitRequestCancel(req, waitingForConn); err != nil {
1236 cc.mu.Lock()
1237 waitingForConnErr = err
1238 cc.cond.Broadcast()
1239 cc.mu.Unlock()
1240 }
1241 }()
1242 }
1243 cc.pendingRequests++
1244 cc.cond.Wait()
1245 cc.pendingRequests--
1246 if waitingForConnErr != nil {
1247 return waitingForConnErr
1248 }
1249 }
1250}
1251
1252// requires cc.wmu be held
1253func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
1254 first := true // first frame written (HEADERS is first, then CONTINUATION)
1255 for len(hdrs) > 0 && cc.werr == nil {
1256 chunk := hdrs
1257 if len(chunk) > maxFrameSize {
1258 chunk = chunk[:maxFrameSize]
1259 }
1260 hdrs = hdrs[len(chunk):]
1261 endHeaders := len(hdrs) == 0
1262 if first {
1263 cc.fr.WriteHeaders(HeadersFrameParam{
1264 StreamID: streamID,
1265 BlockFragment: chunk,
1266 EndStream: endStream,
1267 EndHeaders: endHeaders,
1268 })
1269 first = false
1270 } else {
1271 cc.fr.WriteContinuation(streamID, endHeaders, chunk)
1272 }
1273 }
1274 // TODO(bradfitz): this Flush could potentially block (as
1275 // could the WriteHeaders call(s) above), which means they
1276 // wouldn't respond to Request.Cancel being readable. That's
1277 // rare, but this should probably be in a goroutine.
1278 cc.bw.Flush()
1279 return cc.werr
1280}
1281
1282// internal error values; they don't escape to callers
1283var (
1284 // abort request body write; don't send cancel
1285 errStopReqBodyWrite = errors.New("http2: aborting request body write")
1286
1287 // abort request body write, but send stream reset of cancel.
1288 errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
Scott Baker8461e152019-10-01 14:44:30 -07001289
1290 errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
khenaidooac637102019-01-14 15:44:34 -05001291)
1292
1293func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
1294 cc := cs.cc
1295 sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
1296 buf := cc.frameScratchBuffer()
1297 defer cc.putFrameScratchBuffer(buf)
1298
1299 defer func() {
1300 traceWroteRequest(cs.trace, err)
1301 // TODO: write h12Compare test showing whether
1302 // Request.Body is closed by the Transport,
1303 // and in multiple cases: server replies <=299 and >299
1304 // while still writing request body
1305 cerr := bodyCloser.Close()
1306 if err == nil {
1307 err = cerr
1308 }
1309 }()
1310
1311 req := cs.req
1312 hasTrailers := req.Trailer != nil
Scott Baker8461e152019-10-01 14:44:30 -07001313 remainLen := actualContentLength(req)
1314 hasContentLen := remainLen != -1
khenaidooac637102019-01-14 15:44:34 -05001315
1316 var sawEOF bool
1317 for !sawEOF {
Scott Baker8461e152019-10-01 14:44:30 -07001318 n, err := body.Read(buf[:len(buf)-1])
1319 if hasContentLen {
1320 remainLen -= int64(n)
1321 if remainLen == 0 && err == nil {
1322 // The request body's Content-Length was predeclared and
1323 // we just finished reading it all, but the underlying io.Reader
1324 // returned the final chunk with a nil error (which is one of
1325 // the two valid things a Reader can do at EOF). Because we'd prefer
1326 // to send the END_STREAM bit early, double-check that we're actually
1327 // at EOF. Subsequent reads should return (0, EOF) at this point.
1328 // If either value is different, we return an error in one of two ways below.
1329 var n1 int
1330 n1, err = body.Read(buf[n:])
1331 remainLen -= int64(n1)
1332 }
1333 if remainLen < 0 {
1334 err = errReqBodyTooLong
1335 cc.writeStreamReset(cs.ID, ErrCodeCancel, err)
1336 return err
1337 }
1338 }
khenaidooac637102019-01-14 15:44:34 -05001339 if err == io.EOF {
1340 sawEOF = true
1341 err = nil
1342 } else if err != nil {
1343 cc.writeStreamReset(cs.ID, ErrCodeCancel, err)
1344 return err
1345 }
1346
1347 remain := buf[:n]
1348 for len(remain) > 0 && err == nil {
1349 var allowed int32
1350 allowed, err = cs.awaitFlowControl(len(remain))
1351 switch {
1352 case err == errStopReqBodyWrite:
1353 return err
1354 case err == errStopReqBodyWriteAndCancel:
1355 cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
1356 return err
1357 case err != nil:
1358 return err
1359 }
1360 cc.wmu.Lock()
1361 data := remain[:allowed]
1362 remain = remain[allowed:]
1363 sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
1364 err = cc.fr.WriteData(cs.ID, sentEnd, data)
1365 if err == nil {
1366 // TODO(bradfitz): this flush is for latency, not bandwidth.
1367 // Most requests won't need this. Make this opt-in or
1368 // opt-out? Use some heuristic on the body type? Nagel-like
1369 // timers? Based on 'n'? Only last chunk of this for loop,
1370 // unless flow control tokens are low? For now, always.
1371 // If we change this, see comment below.
1372 err = cc.bw.Flush()
1373 }
1374 cc.wmu.Unlock()
1375 }
1376 if err != nil {
1377 return err
1378 }
1379 }
1380
1381 if sentEnd {
1382 // Already sent END_STREAM (which implies we have no
1383 // trailers) and flushed, because currently all
1384 // WriteData frames above get a flush. So we're done.
1385 return nil
1386 }
1387
1388 var trls []byte
1389 if hasTrailers {
1390 cc.mu.Lock()
1391 trls, err = cc.encodeTrailers(req)
1392 cc.mu.Unlock()
1393 if err != nil {
1394 cc.writeStreamReset(cs.ID, ErrCodeInternal, err)
1395 cc.forgetStreamID(cs.ID)
1396 return err
1397 }
1398 }
1399
1400 cc.mu.Lock()
1401 maxFrameSize := int(cc.maxFrameSize)
1402 cc.mu.Unlock()
1403
1404 cc.wmu.Lock()
1405 defer cc.wmu.Unlock()
1406
1407 // Two ways to send END_STREAM: either with trailers, or
1408 // with an empty DATA frame.
1409 if len(trls) > 0 {
1410 err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
1411 } else {
1412 err = cc.fr.WriteData(cs.ID, true, nil)
1413 }
1414 if ferr := cc.bw.Flush(); ferr != nil && err == nil {
1415 err = ferr
1416 }
1417 return err
1418}
1419
1420// awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow
1421// control tokens from the server.
1422// It returns either the non-zero number of tokens taken or an error
1423// if the stream is dead.
1424func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
1425 cc := cs.cc
1426 cc.mu.Lock()
1427 defer cc.mu.Unlock()
1428 for {
1429 if cc.closed {
1430 return 0, errClientConnClosed
1431 }
1432 if cs.stopReqBody != nil {
1433 return 0, cs.stopReqBody
1434 }
1435 if err := cs.checkResetOrDone(); err != nil {
1436 return 0, err
1437 }
1438 if a := cs.flow.available(); a > 0 {
1439 take := a
1440 if int(take) > maxBytes {
1441
1442 take = int32(maxBytes) // can't truncate int; take is int32
1443 }
1444 if take > int32(cc.maxFrameSize) {
1445 take = int32(cc.maxFrameSize)
1446 }
1447 cs.flow.take(take)
1448 return take, nil
1449 }
1450 cc.cond.Wait()
1451 }
1452}
1453
khenaidooac637102019-01-14 15:44:34 -05001454// requires cc.mu be held.
1455func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) {
1456 cc.hbuf.Reset()
1457
1458 host := req.Host
1459 if host == "" {
1460 host = req.URL.Host
1461 }
1462 host, err := httpguts.PunycodeHostPort(host)
1463 if err != nil {
1464 return nil, err
1465 }
1466
1467 var path string
1468 if req.Method != "CONNECT" {
1469 path = req.URL.RequestURI()
1470 if !validPseudoPath(path) {
1471 orig := path
1472 path = strings.TrimPrefix(path, req.URL.Scheme+"://"+host)
1473 if !validPseudoPath(path) {
1474 if req.URL.Opaque != "" {
1475 return nil, fmt.Errorf("invalid request :path %q from URL.Opaque = %q", orig, req.URL.Opaque)
1476 } else {
1477 return nil, fmt.Errorf("invalid request :path %q", orig)
1478 }
1479 }
1480 }
1481 }
1482
1483 // Check for any invalid headers and return an error before we
1484 // potentially pollute our hpack state. (We want to be able to
1485 // continue to reuse the hpack encoder for future requests)
1486 for k, vv := range req.Header {
1487 if !httpguts.ValidHeaderFieldName(k) {
1488 return nil, fmt.Errorf("invalid HTTP header name %q", k)
1489 }
1490 for _, v := range vv {
1491 if !httpguts.ValidHeaderFieldValue(v) {
1492 return nil, fmt.Errorf("invalid HTTP header value %q for header %q", v, k)
1493 }
1494 }
1495 }
1496
1497 enumerateHeaders := func(f func(name, value string)) {
1498 // 8.1.2.3 Request Pseudo-Header Fields
1499 // The :path pseudo-header field includes the path and query parts of the
1500 // target URI (the path-absolute production and optionally a '?' character
1501 // followed by the query production (see Sections 3.3 and 3.4 of
1502 // [RFC3986]).
1503 f(":authority", host)
Scott Baker8461e152019-10-01 14:44:30 -07001504 m := req.Method
1505 if m == "" {
1506 m = http.MethodGet
1507 }
1508 f(":method", m)
khenaidooac637102019-01-14 15:44:34 -05001509 if req.Method != "CONNECT" {
1510 f(":path", path)
1511 f(":scheme", req.URL.Scheme)
1512 }
1513 if trailers != "" {
1514 f("trailer", trailers)
1515 }
1516
1517 var didUA bool
1518 for k, vv := range req.Header {
1519 if strings.EqualFold(k, "host") || strings.EqualFold(k, "content-length") {
1520 // Host is :authority, already sent.
1521 // Content-Length is automatic, set below.
1522 continue
1523 } else if strings.EqualFold(k, "connection") || strings.EqualFold(k, "proxy-connection") ||
1524 strings.EqualFold(k, "transfer-encoding") || strings.EqualFold(k, "upgrade") ||
1525 strings.EqualFold(k, "keep-alive") {
1526 // Per 8.1.2.2 Connection-Specific Header
1527 // Fields, don't send connection-specific
1528 // fields. We have already checked if any
1529 // are error-worthy so just ignore the rest.
1530 continue
1531 } else if strings.EqualFold(k, "user-agent") {
1532 // Match Go's http1 behavior: at most one
1533 // User-Agent. If set to nil or empty string,
1534 // then omit it. Otherwise if not mentioned,
1535 // include the default (below).
1536 didUA = true
1537 if len(vv) < 1 {
1538 continue
1539 }
1540 vv = vv[:1]
1541 if vv[0] == "" {
1542 continue
1543 }
Andrea Campanella3614a922021-02-25 12:40:42 +01001544 } else if strings.EqualFold(k, "cookie") {
1545 // Per 8.1.2.5 To allow for better compression efficiency, the
1546 // Cookie header field MAY be split into separate header fields,
1547 // each with one or more cookie-pairs.
1548 for _, v := range vv {
1549 for {
1550 p := strings.IndexByte(v, ';')
1551 if p < 0 {
1552 break
1553 }
1554 f("cookie", v[:p])
1555 p++
1556 // strip space after semicolon if any.
1557 for p+1 <= len(v) && v[p] == ' ' {
1558 p++
1559 }
1560 v = v[p:]
1561 }
1562 if len(v) > 0 {
1563 f("cookie", v)
1564 }
1565 }
1566 continue
khenaidooac637102019-01-14 15:44:34 -05001567 }
1568
1569 for _, v := range vv {
1570 f(k, v)
1571 }
1572 }
1573 if shouldSendReqContentLength(req.Method, contentLength) {
1574 f("content-length", strconv.FormatInt(contentLength, 10))
1575 }
1576 if addGzipHeader {
1577 f("accept-encoding", "gzip")
1578 }
1579 if !didUA {
1580 f("user-agent", defaultUserAgent)
1581 }
1582 }
1583
1584 // Do a first pass over the headers counting bytes to ensure
1585 // we don't exceed cc.peerMaxHeaderListSize. This is done as a
1586 // separate pass before encoding the headers to prevent
1587 // modifying the hpack state.
1588 hlSize := uint64(0)
1589 enumerateHeaders(func(name, value string) {
1590 hf := hpack.HeaderField{Name: name, Value: value}
1591 hlSize += uint64(hf.Size())
1592 })
1593
1594 if hlSize > cc.peerMaxHeaderListSize {
1595 return nil, errRequestHeaderListSize
1596 }
1597
1598 trace := httptrace.ContextClientTrace(req.Context())
1599 traceHeaders := traceHasWroteHeaderField(trace)
1600
1601 // Header list size is ok. Write the headers.
1602 enumerateHeaders(func(name, value string) {
1603 name = strings.ToLower(name)
1604 cc.writeHeader(name, value)
1605 if traceHeaders {
1606 traceWroteHeaderField(trace, name, value)
1607 }
1608 })
1609
1610 return cc.hbuf.Bytes(), nil
1611}
1612
1613// shouldSendReqContentLength reports whether the http2.Transport should send
1614// a "content-length" request header. This logic is basically a copy of the net/http
1615// transferWriter.shouldSendContentLength.
1616// The contentLength is the corrected contentLength (so 0 means actually 0, not unknown).
1617// -1 means unknown.
1618func shouldSendReqContentLength(method string, contentLength int64) bool {
1619 if contentLength > 0 {
1620 return true
1621 }
1622 if contentLength < 0 {
1623 return false
1624 }
1625 // For zero bodies, whether we send a content-length depends on the method.
1626 // It also kinda doesn't matter for http2 either way, with END_STREAM.
1627 switch method {
1628 case "POST", "PUT", "PATCH":
1629 return true
1630 default:
1631 return false
1632 }
1633}
1634
1635// requires cc.mu be held.
1636func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
1637 cc.hbuf.Reset()
1638
1639 hlSize := uint64(0)
1640 for k, vv := range req.Trailer {
1641 for _, v := range vv {
1642 hf := hpack.HeaderField{Name: k, Value: v}
1643 hlSize += uint64(hf.Size())
1644 }
1645 }
1646 if hlSize > cc.peerMaxHeaderListSize {
1647 return nil, errRequestHeaderListSize
1648 }
1649
1650 for k, vv := range req.Trailer {
1651 // Transfer-Encoding, etc.. have already been filtered at the
1652 // start of RoundTrip
1653 lowKey := strings.ToLower(k)
1654 for _, v := range vv {
1655 cc.writeHeader(lowKey, v)
1656 }
1657 }
1658 return cc.hbuf.Bytes(), nil
1659}
1660
1661func (cc *ClientConn) writeHeader(name, value string) {
1662 if VerboseLogs {
1663 log.Printf("http2: Transport encoding header %q = %q", name, value)
1664 }
1665 cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
1666}
1667
1668type resAndError struct {
Andrea Campanella3614a922021-02-25 12:40:42 +01001669 _ incomparable
khenaidooac637102019-01-14 15:44:34 -05001670 res *http.Response
1671 err error
1672}
1673
1674// requires cc.mu be held.
1675func (cc *ClientConn) newStream() *clientStream {
1676 cs := &clientStream{
1677 cc: cc,
1678 ID: cc.nextStreamID,
1679 resc: make(chan resAndError, 1),
1680 peerReset: make(chan struct{}),
1681 done: make(chan struct{}),
1682 }
1683 cs.flow.add(int32(cc.initialWindowSize))
1684 cs.flow.setConnFlow(&cc.flow)
1685 cs.inflow.add(transportDefaultStreamFlow)
1686 cs.inflow.setConnFlow(&cc.inflow)
1687 cc.nextStreamID += 2
1688 cc.streams[cs.ID] = cs
1689 return cs
1690}
1691
1692func (cc *ClientConn) forgetStreamID(id uint32) {
1693 cc.streamByID(id, true)
1694}
1695
1696func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
1697 cc.mu.Lock()
1698 defer cc.mu.Unlock()
1699 cs := cc.streams[id]
1700 if andRemove && cs != nil && !cc.closed {
1701 cc.lastActive = time.Now()
1702 delete(cc.streams, id)
1703 if len(cc.streams) == 0 && cc.idleTimer != nil {
1704 cc.idleTimer.Reset(cc.idleTimeout)
Andrea Campanella3614a922021-02-25 12:40:42 +01001705 cc.lastIdle = time.Now()
khenaidooac637102019-01-14 15:44:34 -05001706 }
1707 close(cs.done)
1708 // Wake up checkResetOrDone via clientStream.awaitFlowControl and
1709 // wake up RoundTrip if there is a pending request.
1710 cc.cond.Broadcast()
1711 }
1712 return cs
1713}
1714
1715// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
1716type clientConnReadLoop struct {
Andrea Campanella3614a922021-02-25 12:40:42 +01001717 _ incomparable
khenaidooac637102019-01-14 15:44:34 -05001718 cc *ClientConn
1719 closeWhenIdle bool
1720}
1721
1722// readLoop runs in its own goroutine and reads and dispatches frames.
1723func (cc *ClientConn) readLoop() {
1724 rl := &clientConnReadLoop{cc: cc}
1725 defer rl.cleanup()
1726 cc.readerErr = rl.run()
1727 if ce, ok := cc.readerErr.(ConnectionError); ok {
1728 cc.wmu.Lock()
1729 cc.fr.WriteGoAway(0, ErrCode(ce), nil)
1730 cc.wmu.Unlock()
1731 }
1732}
1733
1734// GoAwayError is returned by the Transport when the server closes the
1735// TCP connection after sending a GOAWAY frame.
1736type GoAwayError struct {
1737 LastStreamID uint32
1738 ErrCode ErrCode
1739 DebugData string
1740}
1741
1742func (e GoAwayError) Error() string {
1743 return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
1744 e.LastStreamID, e.ErrCode, e.DebugData)
1745}
1746
1747func isEOFOrNetReadError(err error) bool {
1748 if err == io.EOF {
1749 return true
1750 }
1751 ne, ok := err.(*net.OpError)
1752 return ok && ne.Op == "read"
1753}
1754
1755func (rl *clientConnReadLoop) cleanup() {
1756 cc := rl.cc
1757 defer cc.tconn.Close()
1758 defer cc.t.connPool().MarkDead(cc)
1759 defer close(cc.readerDone)
1760
1761 if cc.idleTimer != nil {
1762 cc.idleTimer.Stop()
1763 }
1764
1765 // Close any response bodies if the server closes prematurely.
1766 // TODO: also do this if we've written the headers but not
1767 // gotten a response yet.
1768 err := cc.readerErr
1769 cc.mu.Lock()
1770 if cc.goAway != nil && isEOFOrNetReadError(err) {
1771 err = GoAwayError{
1772 LastStreamID: cc.goAway.LastStreamID,
1773 ErrCode: cc.goAway.ErrCode,
1774 DebugData: cc.goAwayDebug,
1775 }
1776 } else if err == io.EOF {
1777 err = io.ErrUnexpectedEOF
1778 }
1779 for _, cs := range cc.streams {
1780 cs.bufPipe.CloseWithError(err) // no-op if already closed
1781 select {
1782 case cs.resc <- resAndError{err: err}:
1783 default:
1784 }
1785 close(cs.done)
1786 }
1787 cc.closed = true
1788 cc.cond.Broadcast()
1789 cc.mu.Unlock()
1790}
1791
1792func (rl *clientConnReadLoop) run() error {
1793 cc := rl.cc
1794 rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
1795 gotReply := false // ever saw a HEADERS reply
1796 gotSettings := false
Andrea Campanella3614a922021-02-25 12:40:42 +01001797 readIdleTimeout := cc.t.ReadIdleTimeout
1798 var t *time.Timer
1799 if readIdleTimeout != 0 {
1800 t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
1801 defer t.Stop()
1802 }
khenaidooac637102019-01-14 15:44:34 -05001803 for {
1804 f, err := cc.fr.ReadFrame()
Andrea Campanella3614a922021-02-25 12:40:42 +01001805 if t != nil {
1806 t.Reset(readIdleTimeout)
1807 }
khenaidooac637102019-01-14 15:44:34 -05001808 if err != nil {
1809 cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
1810 }
1811 if se, ok := err.(StreamError); ok {
1812 if cs := cc.streamByID(se.StreamID, false); cs != nil {
1813 cs.cc.writeStreamReset(cs.ID, se.Code, err)
1814 cs.cc.forgetStreamID(cs.ID)
1815 if se.Cause == nil {
1816 se.Cause = cc.fr.errDetail
1817 }
1818 rl.endStreamError(cs, se)
1819 }
1820 continue
1821 } else if err != nil {
1822 return err
1823 }
1824 if VerboseLogs {
1825 cc.vlogf("http2: Transport received %s", summarizeFrame(f))
1826 }
1827 if !gotSettings {
1828 if _, ok := f.(*SettingsFrame); !ok {
1829 cc.logf("protocol error: received %T before a SETTINGS frame", f)
1830 return ConnectionError(ErrCodeProtocol)
1831 }
1832 gotSettings = true
1833 }
1834 maybeIdle := false // whether frame might transition us to idle
1835
1836 switch f := f.(type) {
1837 case *MetaHeadersFrame:
1838 err = rl.processHeaders(f)
1839 maybeIdle = true
1840 gotReply = true
1841 case *DataFrame:
1842 err = rl.processData(f)
1843 maybeIdle = true
1844 case *GoAwayFrame:
1845 err = rl.processGoAway(f)
1846 maybeIdle = true
1847 case *RSTStreamFrame:
1848 err = rl.processResetStream(f)
1849 maybeIdle = true
1850 case *SettingsFrame:
1851 err = rl.processSettings(f)
1852 case *PushPromiseFrame:
1853 err = rl.processPushPromise(f)
1854 case *WindowUpdateFrame:
1855 err = rl.processWindowUpdate(f)
1856 case *PingFrame:
1857 err = rl.processPing(f)
1858 default:
1859 cc.logf("Transport: unhandled response frame type %T", f)
1860 }
1861 if err != nil {
1862 if VerboseLogs {
1863 cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
1864 }
1865 return err
1866 }
1867 if rl.closeWhenIdle && gotReply && maybeIdle {
1868 cc.closeIfIdle()
1869 }
1870 }
1871}
1872
1873func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
1874 cc := rl.cc
1875 cs := cc.streamByID(f.StreamID, false)
1876 if cs == nil {
1877 // We'd get here if we canceled a request while the
1878 // server had its response still in flight. So if this
1879 // was just something we canceled, ignore it.
1880 return nil
1881 }
1882 if f.StreamEnded() {
1883 // Issue 20521: If the stream has ended, streamByID() causes
1884 // clientStream.done to be closed, which causes the request's bodyWriter
1885 // to be closed with an errStreamClosed, which may be received by
1886 // clientConn.RoundTrip before the result of processing these headers.
1887 // Deferring stream closure allows the header processing to occur first.
1888 // clientConn.RoundTrip may still receive the bodyWriter error first, but
1889 // the fix for issue 16102 prioritises any response.
1890 //
1891 // Issue 22413: If there is no request body, we should close the
1892 // stream before writing to cs.resc so that the stream is closed
1893 // immediately once RoundTrip returns.
1894 if cs.req.Body != nil {
1895 defer cc.forgetStreamID(f.StreamID)
1896 } else {
1897 cc.forgetStreamID(f.StreamID)
1898 }
1899 }
1900 if !cs.firstByte {
1901 if cs.trace != nil {
1902 // TODO(bradfitz): move first response byte earlier,
1903 // when we first read the 9 byte header, not waiting
1904 // until all the HEADERS+CONTINUATION frames have been
1905 // merged. This works for now.
1906 traceFirstResponseByte(cs.trace)
1907 }
1908 cs.firstByte = true
1909 }
1910 if !cs.pastHeaders {
1911 cs.pastHeaders = true
1912 } else {
1913 return rl.processTrailers(cs, f)
1914 }
1915
1916 res, err := rl.handleResponse(cs, f)
1917 if err != nil {
1918 if _, ok := err.(ConnectionError); ok {
1919 return err
1920 }
1921 // Any other error type is a stream error.
1922 cs.cc.writeStreamReset(f.StreamID, ErrCodeProtocol, err)
1923 cc.forgetStreamID(cs.ID)
1924 cs.resc <- resAndError{err: err}
1925 return nil // return nil from process* funcs to keep conn alive
1926 }
1927 if res == nil {
1928 // (nil, nil) special case. See handleResponse docs.
1929 return nil
1930 }
1931 cs.resTrailer = &res.Trailer
1932 cs.resc <- resAndError{res: res}
1933 return nil
1934}
1935
1936// may return error types nil, or ConnectionError. Any other error value
1937// is a StreamError of type ErrCodeProtocol. The returned error in that case
1938// is the detail.
1939//
1940// As a special case, handleResponse may return (nil, nil) to skip the
1941// frame (currently only used for 1xx responses).
1942func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*http.Response, error) {
1943 if f.Truncated {
1944 return nil, errResponseHeaderListSize
1945 }
1946
1947 status := f.PseudoValue("status")
1948 if status == "" {
1949 return nil, errors.New("malformed response from server: missing status pseudo header")
1950 }
1951 statusCode, err := strconv.Atoi(status)
1952 if err != nil {
1953 return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
1954 }
1955
Andrea Campanella3614a922021-02-25 12:40:42 +01001956 regularFields := f.RegularFields()
1957 strs := make([]string, len(regularFields))
1958 header := make(http.Header, len(regularFields))
khenaidooac637102019-01-14 15:44:34 -05001959 res := &http.Response{
1960 Proto: "HTTP/2.0",
1961 ProtoMajor: 2,
1962 Header: header,
1963 StatusCode: statusCode,
1964 Status: status + " " + http.StatusText(statusCode),
1965 }
Andrea Campanella3614a922021-02-25 12:40:42 +01001966 for _, hf := range regularFields {
khenaidooac637102019-01-14 15:44:34 -05001967 key := http.CanonicalHeaderKey(hf.Name)
1968 if key == "Trailer" {
1969 t := res.Trailer
1970 if t == nil {
1971 t = make(http.Header)
1972 res.Trailer = t
1973 }
1974 foreachHeaderElement(hf.Value, func(v string) {
1975 t[http.CanonicalHeaderKey(v)] = nil
1976 })
1977 } else {
Andrea Campanella3614a922021-02-25 12:40:42 +01001978 vv := header[key]
1979 if vv == nil && len(strs) > 0 {
1980 // More than likely this will be a single-element key.
1981 // Most headers aren't multi-valued.
1982 // Set the capacity on strs[0] to 1, so any future append
1983 // won't extend the slice into the other strings.
1984 vv, strs = strs[:1:1], strs[1:]
1985 vv[0] = hf.Value
1986 header[key] = vv
1987 } else {
1988 header[key] = append(vv, hf.Value)
1989 }
khenaidooac637102019-01-14 15:44:34 -05001990 }
1991 }
1992
1993 if statusCode >= 100 && statusCode <= 199 {
1994 cs.num1xx++
1995 const max1xxResponses = 5 // arbitrary bound on number of informational responses, same as net/http
1996 if cs.num1xx > max1xxResponses {
1997 return nil, errors.New("http2: too many 1xx informational responses")
1998 }
1999 if fn := cs.get1xxTraceFunc(); fn != nil {
2000 if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
2001 return nil, err
2002 }
2003 }
2004 if statusCode == 100 {
2005 traceGot100Continue(cs.trace)
2006 if cs.on100 != nil {
2007 cs.on100() // forces any write delay timer to fire
2008 }
2009 }
2010 cs.pastHeaders = false // do it all again
2011 return nil, nil
2012 }
2013
2014 streamEnded := f.StreamEnded()
2015 isHead := cs.req.Method == "HEAD"
2016 if !streamEnded || isHead {
2017 res.ContentLength = -1
2018 if clens := res.Header["Content-Length"]; len(clens) == 1 {
Andrea Campanella3614a922021-02-25 12:40:42 +01002019 if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
2020 res.ContentLength = int64(cl)
khenaidooac637102019-01-14 15:44:34 -05002021 } else {
2022 // TODO: care? unlike http/1, it won't mess up our framing, so it's
2023 // more safe smuggling-wise to ignore.
2024 }
2025 } else if len(clens) > 1 {
2026 // TODO: care? unlike http/1, it won't mess up our framing, so it's
2027 // more safe smuggling-wise to ignore.
2028 }
2029 }
2030
2031 if streamEnded || isHead {
2032 res.Body = noBody
2033 return res, nil
2034 }
2035
2036 cs.bufPipe = pipe{b: &dataBuffer{expected: res.ContentLength}}
2037 cs.bytesRemain = res.ContentLength
2038 res.Body = transportResponseBody{cs}
2039 go cs.awaitRequestCancel(cs.req)
2040
2041 if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
2042 res.Header.Del("Content-Encoding")
2043 res.Header.Del("Content-Length")
2044 res.ContentLength = -1
2045 res.Body = &gzipReader{body: res.Body}
2046 res.Uncompressed = true
2047 }
2048 return res, nil
2049}
2050
2051func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
2052 if cs.pastTrailers {
2053 // Too many HEADERS frames for this stream.
2054 return ConnectionError(ErrCodeProtocol)
2055 }
2056 cs.pastTrailers = true
2057 if !f.StreamEnded() {
2058 // We expect that any headers for trailers also
2059 // has END_STREAM.
2060 return ConnectionError(ErrCodeProtocol)
2061 }
2062 if len(f.PseudoFields()) > 0 {
2063 // No pseudo header fields are defined for trailers.
2064 // TODO: ConnectionError might be overly harsh? Check.
2065 return ConnectionError(ErrCodeProtocol)
2066 }
2067
2068 trailer := make(http.Header)
2069 for _, hf := range f.RegularFields() {
2070 key := http.CanonicalHeaderKey(hf.Name)
2071 trailer[key] = append(trailer[key], hf.Value)
2072 }
2073 cs.trailer = trailer
2074
2075 rl.endStream(cs)
2076 return nil
2077}
2078
2079// transportResponseBody is the concrete type of Transport.RoundTrip's
2080// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
2081// On Close it sends RST_STREAM if EOF wasn't already seen.
2082type transportResponseBody struct {
2083 cs *clientStream
2084}
2085
2086func (b transportResponseBody) Read(p []byte) (n int, err error) {
2087 cs := b.cs
2088 cc := cs.cc
2089
2090 if cs.readErr != nil {
2091 return 0, cs.readErr
2092 }
2093 n, err = b.cs.bufPipe.Read(p)
2094 if cs.bytesRemain != -1 {
2095 if int64(n) > cs.bytesRemain {
2096 n = int(cs.bytesRemain)
2097 if err == nil {
2098 err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
2099 cc.writeStreamReset(cs.ID, ErrCodeProtocol, err)
2100 }
2101 cs.readErr = err
2102 return int(cs.bytesRemain), err
2103 }
2104 cs.bytesRemain -= int64(n)
2105 if err == io.EOF && cs.bytesRemain > 0 {
2106 err = io.ErrUnexpectedEOF
2107 cs.readErr = err
2108 return n, err
2109 }
2110 }
2111 if n == 0 {
2112 // No flow control tokens to send back.
2113 return
2114 }
2115
2116 cc.mu.Lock()
2117 defer cc.mu.Unlock()
2118
2119 var connAdd, streamAdd int32
2120 // Check the conn-level first, before the stream-level.
2121 if v := cc.inflow.available(); v < transportDefaultConnFlow/2 {
2122 connAdd = transportDefaultConnFlow - v
2123 cc.inflow.add(connAdd)
2124 }
2125 if err == nil { // No need to refresh if the stream is over or failed.
2126 // Consider any buffered body data (read from the conn but not
2127 // consumed by the client) when computing flow control for this
2128 // stream.
2129 v := int(cs.inflow.available()) + cs.bufPipe.Len()
2130 if v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh {
2131 streamAdd = int32(transportDefaultStreamFlow - v)
2132 cs.inflow.add(streamAdd)
2133 }
2134 }
2135 if connAdd != 0 || streamAdd != 0 {
2136 cc.wmu.Lock()
2137 defer cc.wmu.Unlock()
2138 if connAdd != 0 {
2139 cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
2140 }
2141 if streamAdd != 0 {
2142 cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
2143 }
2144 cc.bw.Flush()
2145 }
2146 return
2147}
2148
2149var errClosedResponseBody = errors.New("http2: response body closed")
2150
2151func (b transportResponseBody) Close() error {
2152 cs := b.cs
2153 cc := cs.cc
2154
2155 serverSentStreamEnd := cs.bufPipe.Err() == io.EOF
2156 unread := cs.bufPipe.Len()
2157
2158 if unread > 0 || !serverSentStreamEnd {
2159 cc.mu.Lock()
2160 cc.wmu.Lock()
2161 if !serverSentStreamEnd {
2162 cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel)
2163 cs.didReset = true
2164 }
2165 // Return connection-level flow control.
2166 if unread > 0 {
2167 cc.inflow.add(int32(unread))
2168 cc.fr.WriteWindowUpdate(0, uint32(unread))
2169 }
2170 cc.bw.Flush()
2171 cc.wmu.Unlock()
2172 cc.mu.Unlock()
2173 }
2174
2175 cs.bufPipe.BreakWithError(errClosedResponseBody)
2176 cc.forgetStreamID(cs.ID)
2177 return nil
2178}
2179
2180func (rl *clientConnReadLoop) processData(f *DataFrame) error {
2181 cc := rl.cc
2182 cs := cc.streamByID(f.StreamID, f.StreamEnded())
2183 data := f.Data()
2184 if cs == nil {
2185 cc.mu.Lock()
2186 neverSent := cc.nextStreamID
2187 cc.mu.Unlock()
2188 if f.StreamID >= neverSent {
2189 // We never asked for this.
2190 cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
2191 return ConnectionError(ErrCodeProtocol)
2192 }
2193 // We probably did ask for this, but canceled. Just ignore it.
2194 // TODO: be stricter here? only silently ignore things which
2195 // we canceled, but not things which were closed normally
2196 // by the peer? Tough without accumulating too much state.
2197
2198 // But at least return their flow control:
2199 if f.Length > 0 {
2200 cc.mu.Lock()
2201 cc.inflow.add(int32(f.Length))
2202 cc.mu.Unlock()
2203
2204 cc.wmu.Lock()
2205 cc.fr.WriteWindowUpdate(0, uint32(f.Length))
2206 cc.bw.Flush()
2207 cc.wmu.Unlock()
2208 }
2209 return nil
2210 }
2211 if !cs.firstByte {
2212 cc.logf("protocol error: received DATA before a HEADERS frame")
2213 rl.endStreamError(cs, StreamError{
2214 StreamID: f.StreamID,
2215 Code: ErrCodeProtocol,
2216 })
2217 return nil
2218 }
2219 if f.Length > 0 {
2220 if cs.req.Method == "HEAD" && len(data) > 0 {
2221 cc.logf("protocol error: received DATA on a HEAD request")
2222 rl.endStreamError(cs, StreamError{
2223 StreamID: f.StreamID,
2224 Code: ErrCodeProtocol,
2225 })
2226 return nil
2227 }
2228 // Check connection-level flow control.
2229 cc.mu.Lock()
2230 if cs.inflow.available() >= int32(f.Length) {
2231 cs.inflow.take(int32(f.Length))
2232 } else {
2233 cc.mu.Unlock()
2234 return ConnectionError(ErrCodeFlowControl)
2235 }
2236 // Return any padded flow control now, since we won't
2237 // refund it later on body reads.
2238 var refund int
2239 if pad := int(f.Length) - len(data); pad > 0 {
2240 refund += pad
2241 }
2242 // Return len(data) now if the stream is already closed,
2243 // since data will never be read.
2244 didReset := cs.didReset
2245 if didReset {
2246 refund += len(data)
2247 }
2248 if refund > 0 {
2249 cc.inflow.add(int32(refund))
2250 cc.wmu.Lock()
2251 cc.fr.WriteWindowUpdate(0, uint32(refund))
2252 if !didReset {
2253 cs.inflow.add(int32(refund))
2254 cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
2255 }
2256 cc.bw.Flush()
2257 cc.wmu.Unlock()
2258 }
2259 cc.mu.Unlock()
2260
2261 if len(data) > 0 && !didReset {
2262 if _, err := cs.bufPipe.Write(data); err != nil {
2263 rl.endStreamError(cs, err)
2264 return err
2265 }
2266 }
2267 }
2268
2269 if f.StreamEnded() {
2270 rl.endStream(cs)
2271 }
2272 return nil
2273}
2274
khenaidooac637102019-01-14 15:44:34 -05002275func (rl *clientConnReadLoop) endStream(cs *clientStream) {
2276 // TODO: check that any declared content-length matches, like
2277 // server.go's (*stream).endStream method.
2278 rl.endStreamError(cs, nil)
2279}
2280
2281func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
2282 var code func()
2283 if err == nil {
2284 err = io.EOF
2285 code = cs.copyTrailers
2286 }
2287 if isConnectionCloseRequest(cs.req) {
2288 rl.closeWhenIdle = true
2289 }
2290 cs.bufPipe.closeWithErrorAndCode(err, code)
2291
2292 select {
2293 case cs.resc <- resAndError{err: err}:
2294 default:
2295 }
2296}
2297
2298func (cs *clientStream) copyTrailers() {
2299 for k, vv := range cs.trailer {
2300 t := cs.resTrailer
2301 if *t == nil {
2302 *t = make(http.Header)
2303 }
2304 (*t)[k] = vv
2305 }
2306}
2307
2308func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
2309 cc := rl.cc
2310 cc.t.connPool().MarkDead(cc)
2311 if f.ErrCode != 0 {
2312 // TODO: deal with GOAWAY more. particularly the error code
2313 cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
2314 }
2315 cc.setGoAway(f)
2316 return nil
2317}
2318
2319func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
2320 cc := rl.cc
2321 cc.mu.Lock()
2322 defer cc.mu.Unlock()
2323
2324 if f.IsAck() {
2325 if cc.wantSettingsAck {
2326 cc.wantSettingsAck = false
2327 return nil
2328 }
2329 return ConnectionError(ErrCodeProtocol)
2330 }
2331
2332 err := f.ForeachSetting(func(s Setting) error {
2333 switch s.ID {
2334 case SettingMaxFrameSize:
2335 cc.maxFrameSize = s.Val
2336 case SettingMaxConcurrentStreams:
2337 cc.maxConcurrentStreams = s.Val
2338 case SettingMaxHeaderListSize:
2339 cc.peerMaxHeaderListSize = uint64(s.Val)
2340 case SettingInitialWindowSize:
2341 // Values above the maximum flow-control
2342 // window size of 2^31-1 MUST be treated as a
2343 // connection error (Section 5.4.1) of type
2344 // FLOW_CONTROL_ERROR.
2345 if s.Val > math.MaxInt32 {
2346 return ConnectionError(ErrCodeFlowControl)
2347 }
2348
2349 // Adjust flow control of currently-open
2350 // frames by the difference of the old initial
2351 // window size and this one.
2352 delta := int32(s.Val) - int32(cc.initialWindowSize)
2353 for _, cs := range cc.streams {
2354 cs.flow.add(delta)
2355 }
2356 cc.cond.Broadcast()
2357
2358 cc.initialWindowSize = s.Val
2359 default:
2360 // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
2361 cc.vlogf("Unhandled Setting: %v", s)
2362 }
2363 return nil
2364 })
2365 if err != nil {
2366 return err
2367 }
2368
2369 cc.wmu.Lock()
2370 defer cc.wmu.Unlock()
2371
2372 cc.fr.WriteSettingsAck()
2373 cc.bw.Flush()
2374 return cc.werr
2375}
2376
2377func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
2378 cc := rl.cc
2379 cs := cc.streamByID(f.StreamID, false)
2380 if f.StreamID != 0 && cs == nil {
2381 return nil
2382 }
2383
2384 cc.mu.Lock()
2385 defer cc.mu.Unlock()
2386
2387 fl := &cc.flow
2388 if cs != nil {
2389 fl = &cs.flow
2390 }
2391 if !fl.add(int32(f.Increment)) {
2392 return ConnectionError(ErrCodeFlowControl)
2393 }
2394 cc.cond.Broadcast()
2395 return nil
2396}
2397
2398func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
2399 cs := rl.cc.streamByID(f.StreamID, true)
2400 if cs == nil {
2401 // TODO: return error if server tries to RST_STEAM an idle stream
2402 return nil
2403 }
2404 select {
2405 case <-cs.peerReset:
2406 // Already reset.
2407 // This is the only goroutine
2408 // which closes this, so there
2409 // isn't a race.
2410 default:
2411 err := streamError(cs.ID, f.ErrCode)
2412 cs.resetErr = err
2413 close(cs.peerReset)
2414 cs.bufPipe.CloseWithError(err)
2415 cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
2416 }
2417 return nil
2418}
2419
2420// Ping sends a PING frame to the server and waits for the ack.
2421func (cc *ClientConn) Ping(ctx context.Context) error {
2422 c := make(chan struct{})
2423 // Generate a random payload
2424 var p [8]byte
2425 for {
2426 if _, err := rand.Read(p[:]); err != nil {
2427 return err
2428 }
2429 cc.mu.Lock()
2430 // check for dup before insert
2431 if _, found := cc.pings[p]; !found {
2432 cc.pings[p] = c
2433 cc.mu.Unlock()
2434 break
2435 }
2436 cc.mu.Unlock()
2437 }
2438 cc.wmu.Lock()
2439 if err := cc.fr.WritePing(false, p); err != nil {
2440 cc.wmu.Unlock()
2441 return err
2442 }
2443 if err := cc.bw.Flush(); err != nil {
2444 cc.wmu.Unlock()
2445 return err
2446 }
2447 cc.wmu.Unlock()
2448 select {
2449 case <-c:
2450 return nil
2451 case <-ctx.Done():
2452 return ctx.Err()
2453 case <-cc.readerDone:
2454 // connection closed
2455 return cc.readerErr
2456 }
2457}
2458
2459func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
2460 if f.IsAck() {
2461 cc := rl.cc
2462 cc.mu.Lock()
2463 defer cc.mu.Unlock()
2464 // If ack, notify listener if any
2465 if c, ok := cc.pings[f.Data]; ok {
2466 close(c)
2467 delete(cc.pings, f.Data)
2468 }
2469 return nil
2470 }
2471 cc := rl.cc
2472 cc.wmu.Lock()
2473 defer cc.wmu.Unlock()
2474 if err := cc.fr.WritePing(true, f.Data); err != nil {
2475 return err
2476 }
2477 return cc.bw.Flush()
2478}
2479
2480func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
2481 // We told the peer we don't want them.
2482 // Spec says:
2483 // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
2484 // setting of the peer endpoint is set to 0. An endpoint that
2485 // has set this setting and has received acknowledgement MUST
2486 // treat the receipt of a PUSH_PROMISE frame as a connection
2487 // error (Section 5.4.1) of type PROTOCOL_ERROR."
2488 return ConnectionError(ErrCodeProtocol)
2489}
2490
2491func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
2492 // TODO: map err to more interesting error codes, once the
2493 // HTTP community comes up with some. But currently for
2494 // RST_STREAM there's no equivalent to GOAWAY frame's debug
2495 // data, and the error codes are all pretty vague ("cancel").
2496 cc.wmu.Lock()
2497 cc.fr.WriteRSTStream(streamID, code)
2498 cc.bw.Flush()
2499 cc.wmu.Unlock()
2500}
2501
2502var (
2503 errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
2504 errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit")
khenaidooac637102019-01-14 15:44:34 -05002505)
2506
2507func (cc *ClientConn) logf(format string, args ...interface{}) {
2508 cc.t.logf(format, args...)
2509}
2510
2511func (cc *ClientConn) vlogf(format string, args ...interface{}) {
2512 cc.t.vlogf(format, args...)
2513}
2514
2515func (t *Transport) vlogf(format string, args ...interface{}) {
2516 if VerboseLogs {
2517 t.logf(format, args...)
2518 }
2519}
2520
2521func (t *Transport) logf(format string, args ...interface{}) {
2522 log.Printf(format, args...)
2523}
2524
2525var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
2526
2527func strSliceContains(ss []string, s string) bool {
2528 for _, v := range ss {
2529 if v == s {
2530 return true
2531 }
2532 }
2533 return false
2534}
2535
2536type erringRoundTripper struct{ err error }
2537
Andrea Campanella3614a922021-02-25 12:40:42 +01002538func (rt erringRoundTripper) RoundTripErr() error { return rt.err }
khenaidooac637102019-01-14 15:44:34 -05002539func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err }
2540
2541// gzipReader wraps a response body so it can lazily
2542// call gzip.NewReader on the first call to Read
2543type gzipReader struct {
Andrea Campanella3614a922021-02-25 12:40:42 +01002544 _ incomparable
khenaidooac637102019-01-14 15:44:34 -05002545 body io.ReadCloser // underlying Response.Body
2546 zr *gzip.Reader // lazily-initialized gzip reader
2547 zerr error // sticky error
2548}
2549
2550func (gz *gzipReader) Read(p []byte) (n int, err error) {
2551 if gz.zerr != nil {
2552 return 0, gz.zerr
2553 }
2554 if gz.zr == nil {
2555 gz.zr, err = gzip.NewReader(gz.body)
2556 if err != nil {
2557 gz.zerr = err
2558 return 0, err
2559 }
2560 }
2561 return gz.zr.Read(p)
2562}
2563
2564func (gz *gzipReader) Close() error {
2565 return gz.body.Close()
2566}
2567
2568type errorReader struct{ err error }
2569
2570func (r errorReader) Read(p []byte) (int, error) { return 0, r.err }
2571
2572// bodyWriterState encapsulates various state around the Transport's writing
2573// of the request body, particularly regarding doing delayed writes of the body
2574// when the request contains "Expect: 100-continue".
2575type bodyWriterState struct {
2576 cs *clientStream
2577 timer *time.Timer // if non-nil, we're doing a delayed write
2578 fnonce *sync.Once // to call fn with
2579 fn func() // the code to run in the goroutine, writing the body
2580 resc chan error // result of fn's execution
2581 delay time.Duration // how long we should delay a delayed write for
2582}
2583
2584func (t *Transport) getBodyWriterState(cs *clientStream, body io.Reader) (s bodyWriterState) {
2585 s.cs = cs
2586 if body == nil {
2587 return
2588 }
2589 resc := make(chan error, 1)
2590 s.resc = resc
2591 s.fn = func() {
2592 cs.cc.mu.Lock()
2593 cs.startedWrite = true
2594 cs.cc.mu.Unlock()
2595 resc <- cs.writeRequestBody(body, cs.req.Body)
2596 }
2597 s.delay = t.expectContinueTimeout()
2598 if s.delay == 0 ||
2599 !httpguts.HeaderValuesContainsToken(
2600 cs.req.Header["Expect"],
2601 "100-continue") {
2602 return
2603 }
2604 s.fnonce = new(sync.Once)
2605
2606 // Arm the timer with a very large duration, which we'll
2607 // intentionally lower later. It has to be large now because
2608 // we need a handle to it before writing the headers, but the
2609 // s.delay value is defined to not start until after the
2610 // request headers were written.
2611 const hugeDuration = 365 * 24 * time.Hour
2612 s.timer = time.AfterFunc(hugeDuration, func() {
2613 s.fnonce.Do(s.fn)
2614 })
2615 return
2616}
2617
2618func (s bodyWriterState) cancel() {
2619 if s.timer != nil {
2620 s.timer.Stop()
2621 }
2622}
2623
2624func (s bodyWriterState) on100() {
2625 if s.timer == nil {
2626 // If we didn't do a delayed write, ignore the server's
2627 // bogus 100 continue response.
2628 return
2629 }
2630 s.timer.Stop()
2631 go func() { s.fnonce.Do(s.fn) }()
2632}
2633
2634// scheduleBodyWrite starts writing the body, either immediately (in
2635// the common case) or after the delay timeout. It should not be
2636// called until after the headers have been written.
2637func (s bodyWriterState) scheduleBodyWrite() {
2638 if s.timer == nil {
2639 // We're not doing a delayed write (see
2640 // getBodyWriterState), so just start the writing
2641 // goroutine immediately.
2642 go s.fn()
2643 return
2644 }
2645 traceWait100Continue(s.cs.trace)
2646 if s.timer.Stop() {
2647 s.timer.Reset(s.delay)
2648 }
2649}
2650
2651// isConnectionCloseRequest reports whether req should use its own
2652// connection for a single request and then close the connection.
2653func isConnectionCloseRequest(req *http.Request) bool {
2654 return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
2655}
2656
2657// registerHTTPSProtocol calls Transport.RegisterProtocol but
2658// converting panics into errors.
2659func registerHTTPSProtocol(t *http.Transport, rt noDialH2RoundTripper) (err error) {
2660 defer func() {
2661 if e := recover(); e != nil {
2662 err = fmt.Errorf("%v", e)
2663 }
2664 }()
2665 t.RegisterProtocol("https", rt)
2666 return nil
2667}
2668
2669// noDialH2RoundTripper is a RoundTripper which only tries to complete the request
2670// if there's already has a cached connection to the host.
2671// (The field is exported so it can be accessed via reflect from net/http; tested
2672// by TestNoDialH2RoundTripperType)
2673type noDialH2RoundTripper struct{ *Transport }
2674
2675func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
2676 res, err := rt.Transport.RoundTrip(req)
2677 if isNoCachedConnError(err) {
2678 return nil, http.ErrSkipAltProtocol
2679 }
2680 return res, err
2681}
2682
2683func (t *Transport) idleConnTimeout() time.Duration {
2684 if t.t1 != nil {
2685 return t.t1.IdleConnTimeout
2686 }
2687 return 0
2688}
2689
2690func traceGetConn(req *http.Request, hostPort string) {
2691 trace := httptrace.ContextClientTrace(req.Context())
2692 if trace == nil || trace.GetConn == nil {
2693 return
2694 }
2695 trace.GetConn(hostPort)
2696}
2697
Scott Baker8461e152019-10-01 14:44:30 -07002698func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
khenaidooac637102019-01-14 15:44:34 -05002699 trace := httptrace.ContextClientTrace(req.Context())
2700 if trace == nil || trace.GotConn == nil {
2701 return
2702 }
2703 ci := httptrace.GotConnInfo{Conn: cc.tconn}
Scott Baker8461e152019-10-01 14:44:30 -07002704 ci.Reused = reused
khenaidooac637102019-01-14 15:44:34 -05002705 cc.mu.Lock()
Scott Baker8461e152019-10-01 14:44:30 -07002706 ci.WasIdle = len(cc.streams) == 0 && reused
khenaidooac637102019-01-14 15:44:34 -05002707 if ci.WasIdle && !cc.lastActive.IsZero() {
2708 ci.IdleTime = time.Now().Sub(cc.lastActive)
2709 }
2710 cc.mu.Unlock()
2711
2712 trace.GotConn(ci)
2713}
2714
2715func traceWroteHeaders(trace *httptrace.ClientTrace) {
2716 if trace != nil && trace.WroteHeaders != nil {
2717 trace.WroteHeaders()
2718 }
2719}
2720
2721func traceGot100Continue(trace *httptrace.ClientTrace) {
2722 if trace != nil && trace.Got100Continue != nil {
2723 trace.Got100Continue()
2724 }
2725}
2726
2727func traceWait100Continue(trace *httptrace.ClientTrace) {
2728 if trace != nil && trace.Wait100Continue != nil {
2729 trace.Wait100Continue()
2730 }
2731}
2732
2733func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
2734 if trace != nil && trace.WroteRequest != nil {
2735 trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
2736 }
2737}
2738
2739func traceFirstResponseByte(trace *httptrace.ClientTrace) {
2740 if trace != nil && trace.GotFirstResponseByte != nil {
2741 trace.GotFirstResponseByte()
2742 }
2743}