blob: 326bf08480002d71b23f1027b728b706bf174822 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "context"
23 "fmt"
24 "io"
25 "math"
26 "net"
27 "net/http"
khenaidoo5cb0d402021-12-08 14:09:16 -050028 "path/filepath"
khenaidoo5fc5cea2021-08-11 17:39:16 -040029 "strconv"
30 "strings"
31 "sync"
32 "sync/atomic"
33 "time"
34
35 "golang.org/x/net/http2"
36 "golang.org/x/net/http2/hpack"
37 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/credentials"
39 "google.golang.org/grpc/internal/channelz"
40 icredentials "google.golang.org/grpc/internal/credentials"
Akash Kankanala761955c2024-02-21 19:32:20 +053041 "google.golang.org/grpc/internal/grpclog"
42 "google.golang.org/grpc/internal/grpcsync"
khenaidoo5fc5cea2021-08-11 17:39:16 -040043 "google.golang.org/grpc/internal/grpcutil"
44 imetadata "google.golang.org/grpc/internal/metadata"
Akash Kankanala761955c2024-02-21 19:32:20 +053045 istatus "google.golang.org/grpc/internal/status"
khenaidoo5fc5cea2021-08-11 17:39:16 -040046 "google.golang.org/grpc/internal/syscall"
47 "google.golang.org/grpc/internal/transport/networktype"
48 "google.golang.org/grpc/keepalive"
49 "google.golang.org/grpc/metadata"
50 "google.golang.org/grpc/peer"
51 "google.golang.org/grpc/resolver"
52 "google.golang.org/grpc/stats"
53 "google.golang.org/grpc/status"
54)
55
56// clientConnectionCounter counts the number of connections a client has
57// initiated (equal to the number of http2Clients created). Must be accessed
58// atomically.
59var clientConnectionCounter uint64
60
61// http2Client implements the ClientTransport interface with HTTP2.
62type http2Client struct {
Akash Kankanala761955c2024-02-21 19:32:20 +053063 lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
64 ctx context.Context
65 cancel context.CancelFunc
66 ctxDone <-chan struct{} // Cache the ctx.Done() chan.
67 userAgent string
68 // address contains the resolver returned address for this transport.
69 // If the `ServerName` field is set, it takes precedence over `CallHdr.Host`
70 // passed to `NewStream`, when determining the :authority header.
71 address resolver.Address
khenaidoo5fc5cea2021-08-11 17:39:16 -040072 md metadata.MD
73 conn net.Conn // underlying communication channel
74 loopy *loopyWriter
75 remoteAddr net.Addr
76 localAddr net.Addr
77 authInfo credentials.AuthInfo // auth info about the connection
78
79 readerDone chan struct{} // sync point to enable testing.
80 writerDone chan struct{} // sync point to enable testing.
81 // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
82 // that the server sent GoAway on this transport.
83 goAway chan struct{}
84
85 framer *framer
86 // controlBuf delivers all the control related tasks (e.g., window
87 // updates, reset streams, and various settings) to the controller.
Akash Kankanala761955c2024-02-21 19:32:20 +053088 // Do not access controlBuf with mu held.
khenaidoo5fc5cea2021-08-11 17:39:16 -040089 controlBuf *controlBuffer
90 fc *trInFlow
91 // The scheme used: https if TLS is on, http otherwise.
92 scheme string
93
94 isSecure bool
95
96 perRPCCreds []credentials.PerRPCCredentials
97
98 kp keepalive.ClientParameters
99 keepaliveEnabled bool
100
Akash Kankanala761955c2024-02-21 19:32:20 +0530101 statsHandlers []stats.Handler
khenaidoo5fc5cea2021-08-11 17:39:16 -0400102
103 initialWindowSize int32
104
105 // configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
106 maxSendHeaderListSize *uint32
107
108 bdpEst *bdpEstimator
khenaidoo5fc5cea2021-08-11 17:39:16 -0400109
110 maxConcurrentStreams uint32
111 streamQuota int64
112 streamsQuotaAvailable chan struct{}
113 waitingStreams uint32
114 nextID uint32
Akash Kankanala761955c2024-02-21 19:32:20 +0530115 registeredCompressors string
khenaidoo5fc5cea2021-08-11 17:39:16 -0400116
Akash Kankanala761955c2024-02-21 19:32:20 +0530117 // Do not access controlBuf with mu held.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400118 mu sync.Mutex // guard the following variables
119 state transportState
120 activeStreams map[uint32]*Stream
121 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
122 prevGoAwayID uint32
123 // goAwayReason records the http2.ErrCode and debug data received with the
124 // GoAway frame.
125 goAwayReason GoAwayReason
126 // goAwayDebugMessage contains a detailed human readable string about a
127 // GoAway frame, useful for error messages.
128 goAwayDebugMessage string
129 // A condition variable used to signal when the keepalive goroutine should
130 // go dormant. The condition for dormancy is based on the number of active
131 // streams and the `PermitWithoutStream` keepalive client parameter. And
132 // since the number of active streams is guarded by the above mutex, we use
133 // the same for this condition variable as well.
134 kpDormancyCond *sync.Cond
135 // A boolean to track whether the keepalive goroutine is dormant or not.
136 // This is checked before attempting to signal the above condition
137 // variable.
138 kpDormant bool
139
140 // Fields below are for channelz metric collection.
Akash Kankanala761955c2024-02-21 19:32:20 +0530141 channelzID *channelz.Identifier
khenaidoo5fc5cea2021-08-11 17:39:16 -0400142 czData *channelzData
143
Akash Kankanala761955c2024-02-21 19:32:20 +0530144 onClose func(GoAwayReason)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400145
146 bufferPool *bufferPool
147
148 connectionID uint64
Akash Kankanala761955c2024-02-21 19:32:20 +0530149 logger *grpclog.PrefixLogger
khenaidoo5fc5cea2021-08-11 17:39:16 -0400150}
151
152func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
153 address := addr.Addr
154 networkType, ok := networktype.Get(addr)
155 if fn != nil {
khenaidoo5cb0d402021-12-08 14:09:16 -0500156 // Special handling for unix scheme with custom dialer. Back in the day,
157 // we did not have a unix resolver and therefore targets with a unix
158 // scheme would end up using the passthrough resolver. So, user's used a
159 // custom dialer in this case and expected the original dial target to
160 // be passed to the custom dialer. Now, we have a unix resolver. But if
161 // a custom dialer is specified, we want to retain the old behavior in
162 // terms of the address being passed to the custom dialer.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400163 if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
khenaidoo5cb0d402021-12-08 14:09:16 -0500164 // Supported unix targets are either "unix://absolute-path" or
165 // "unix:relative-path".
166 if filepath.IsAbs(address) {
167 return fn(ctx, "unix://"+address)
168 }
169 return fn(ctx, "unix:"+address)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400170 }
171 return fn(ctx, address)
172 }
173 if !ok {
174 networkType, address = parseDialTarget(address)
175 }
176 if networkType == "tcp" && useProxy {
177 return proxyDial(ctx, address, grpcUA)
178 }
179 return (&net.Dialer{}).DialContext(ctx, networkType, address)
180}
181
182func isTemporary(err error) bool {
183 switch err := err.(type) {
184 case interface {
185 Temporary() bool
186 }:
187 return err.Temporary()
188 case interface {
189 Timeout() bool
190 }:
191 // Timeouts may be resolved upon retry, and are thus treated as
192 // temporary.
193 return err.Timeout()
194 }
195 return true
196}
197
198// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
199// and starts to receive messages on it. Non-nil error returns if construction
200// fails.
Akash Kankanala761955c2024-02-21 19:32:20 +0530201func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400202 scheme := "http"
203 ctx, cancel := context.WithCancel(ctx)
204 defer func() {
205 if err != nil {
206 cancel()
207 }
208 }()
209
khenaidoo257f3192021-12-15 16:46:37 -0500210 // gRPC, resolver, balancer etc. can specify arbitrary data in the
211 // Attributes field of resolver.Address, which is shoved into connectCtx
212 // and passed to the dialer and credential handshaker. This makes it possible for
213 // address specific arbitrary data to reach custom dialers and credential handshakers.
214 connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
215
khenaidoo5fc5cea2021-08-11 17:39:16 -0400216 conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
217 if err != nil {
218 if opts.FailOnNonTempDialError {
219 return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
220 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530221 return nil, connectionErrorf(true, err, "transport: Error while dialing: %v", err)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400222 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530223
khenaidoo5fc5cea2021-08-11 17:39:16 -0400224 // Any further errors will close the underlying connection
225 defer func(conn net.Conn) {
226 if err != nil {
227 conn.Close()
228 }
229 }(conn)
Akash Kankanala761955c2024-02-21 19:32:20 +0530230
231 // The following defer and goroutine monitor the connectCtx for cancelation
232 // and deadline. On context expiration, the connection is hard closed and
233 // this function will naturally fail as a result. Otherwise, the defer
234 // waits for the goroutine to exit to prevent the context from being
235 // monitored (and to prevent the connection from ever being closed) after
236 // returning from this function.
237 ctxMonitorDone := grpcsync.NewEvent()
238 newClientCtx, newClientDone := context.WithCancel(connectCtx)
239 defer func() {
240 newClientDone() // Awaken the goroutine below if connectCtx hasn't expired.
241 <-ctxMonitorDone.Done() // Wait for the goroutine below to exit.
242 }()
243 go func(conn net.Conn) {
244 defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
245 <-newClientCtx.Done() // Block until connectCtx expires or the defer above executes.
246 if err := connectCtx.Err(); err != nil {
247 // connectCtx expired before exiting the function. Hard close the connection.
248 if logger.V(logLevel) {
249 logger.Infof("Aborting due to connect deadline expiring: %v", err)
250 }
251 conn.Close()
252 }
253 }(conn)
254
khenaidoo5fc5cea2021-08-11 17:39:16 -0400255 kp := opts.KeepaliveParams
256 // Validate keepalive parameters.
257 if kp.Time == 0 {
258 kp.Time = defaultClientKeepaliveTime
259 }
260 if kp.Timeout == 0 {
261 kp.Timeout = defaultClientKeepaliveTimeout
262 }
263 keepaliveEnabled := false
264 if kp.Time != infinity {
265 if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
266 return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
267 }
268 keepaliveEnabled = true
269 }
270 var (
271 isSecure bool
272 authInfo credentials.AuthInfo
273 )
274 transportCreds := opts.TransportCredentials
275 perRPCCreds := opts.PerRPCCredentials
276
277 if b := opts.CredsBundle; b != nil {
278 if t := b.TransportCredentials(); t != nil {
279 transportCreds = t
280 }
281 if t := b.PerRPCCredentials(); t != nil {
282 perRPCCreds = append(perRPCCreds, t)
283 }
284 }
285 if transportCreds != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +0530286 conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400287 if err != nil {
288 return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
289 }
290 for _, cd := range perRPCCreds {
291 if cd.RequireTransportSecurity() {
292 if ci, ok := authInfo.(interface {
293 GetCommonAuthInfo() credentials.CommonAuthInfo
294 }); ok {
295 secLevel := ci.GetCommonAuthInfo().SecurityLevel
296 if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
297 return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
298 }
299 }
300 }
301 }
302 isSecure = true
303 if transportCreds.Info().SecurityProtocol == "tls" {
304 scheme = "https"
305 }
306 }
307 dynamicWindow := true
308 icwz := int32(initialWindowSize)
309 if opts.InitialConnWindowSize >= defaultWindowSize {
310 icwz = opts.InitialConnWindowSize
311 dynamicWindow = false
312 }
313 writeBufSize := opts.WriteBufferSize
314 readBufSize := opts.ReadBufferSize
315 maxHeaderListSize := defaultClientMaxHeaderListSize
316 if opts.MaxHeaderListSize != nil {
317 maxHeaderListSize = *opts.MaxHeaderListSize
318 }
319 t := &http2Client{
320 ctx: ctx,
321 ctxDone: ctx.Done(), // Cache Done chan.
322 cancel: cancel,
323 userAgent: opts.UserAgent,
Akash Kankanala761955c2024-02-21 19:32:20 +0530324 registeredCompressors: grpcutil.RegisteredCompressors(),
325 address: addr,
khenaidoo5fc5cea2021-08-11 17:39:16 -0400326 conn: conn,
327 remoteAddr: conn.RemoteAddr(),
328 localAddr: conn.LocalAddr(),
329 authInfo: authInfo,
330 readerDone: make(chan struct{}),
331 writerDone: make(chan struct{}),
332 goAway: make(chan struct{}),
333 framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
334 fc: &trInFlow{limit: uint32(icwz)},
335 scheme: scheme,
336 activeStreams: make(map[uint32]*Stream),
337 isSecure: isSecure,
338 perRPCCreds: perRPCCreds,
339 kp: kp,
Akash Kankanala761955c2024-02-21 19:32:20 +0530340 statsHandlers: opts.StatsHandlers,
khenaidoo5fc5cea2021-08-11 17:39:16 -0400341 initialWindowSize: initialWindowSize,
khenaidoo5fc5cea2021-08-11 17:39:16 -0400342 nextID: 1,
343 maxConcurrentStreams: defaultMaxStreamsClient,
344 streamQuota: defaultMaxStreamsClient,
345 streamsQuotaAvailable: make(chan struct{}, 1),
346 czData: new(channelzData),
khenaidoo5fc5cea2021-08-11 17:39:16 -0400347 keepaliveEnabled: keepaliveEnabled,
348 bufferPool: newBufferPool(),
Akash Kankanala761955c2024-02-21 19:32:20 +0530349 onClose: onClose,
khenaidoo5fc5cea2021-08-11 17:39:16 -0400350 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530351 t.logger = prefixLoggerForClientTransport(t)
352 // Add peer information to the http2client context.
353 t.ctx = peer.NewContext(t.ctx, t.getPeer())
khenaidoo5fc5cea2021-08-11 17:39:16 -0400354
355 if md, ok := addr.Metadata.(*metadata.MD); ok {
356 t.md = *md
357 } else if md := imetadata.Get(addr); md != nil {
358 t.md = md
359 }
360 t.controlBuf = newControlBuffer(t.ctxDone)
361 if opts.InitialWindowSize >= defaultWindowSize {
362 t.initialWindowSize = opts.InitialWindowSize
363 dynamicWindow = false
364 }
365 if dynamicWindow {
366 t.bdpEst = &bdpEstimator{
367 bdp: initialWindowSize,
368 updateFlowControl: t.updateFlowControl,
369 }
370 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530371 for _, sh := range t.statsHandlers {
372 t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
khenaidoo5fc5cea2021-08-11 17:39:16 -0400373 RemoteAddr: t.remoteAddr,
374 LocalAddr: t.localAddr,
375 })
376 connBegin := &stats.ConnBegin{
377 Client: true,
378 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530379 sh.HandleConn(t.ctx, connBegin)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400380 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530381 t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
382 if err != nil {
383 return nil, err
khenaidoo5fc5cea2021-08-11 17:39:16 -0400384 }
385 if t.keepaliveEnabled {
386 t.kpDormancyCond = sync.NewCond(&t.mu)
387 go t.keepalive()
388 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530389
390 // Start the reader goroutine for incoming messages. Each transport has a
391 // dedicated goroutine which reads HTTP2 frames from the network. Then it
392 // dispatches the frame to the corresponding stream entity. When the
393 // server preface is received, readerErrCh is closed. If an error occurs
394 // first, an error is pushed to the channel. This must be checked before
395 // returning from this function.
396 readerErrCh := make(chan error, 1)
397 go t.reader(readerErrCh)
398 defer func() {
399 if err == nil {
400 err = <-readerErrCh
401 }
402 if err != nil {
403 t.Close(err)
404 }
405 }()
khenaidoo5fc5cea2021-08-11 17:39:16 -0400406
407 // Send connection preface to server.
408 n, err := t.conn.Write(clientPreface)
409 if err != nil {
410 err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400411 return nil, err
412 }
413 if n != len(clientPreface) {
414 err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
khenaidoo5fc5cea2021-08-11 17:39:16 -0400415 return nil, err
416 }
417 var ss []http2.Setting
418
419 if t.initialWindowSize != defaultWindowSize {
420 ss = append(ss, http2.Setting{
421 ID: http2.SettingInitialWindowSize,
422 Val: uint32(t.initialWindowSize),
423 })
424 }
425 if opts.MaxHeaderListSize != nil {
426 ss = append(ss, http2.Setting{
427 ID: http2.SettingMaxHeaderListSize,
428 Val: *opts.MaxHeaderListSize,
429 })
430 }
431 err = t.framer.fr.WriteSettings(ss...)
432 if err != nil {
433 err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400434 return nil, err
435 }
436 // Adjust the connection flow control window if needed.
437 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
438 if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
439 err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400440 return nil, err
441 }
442 }
443
444 t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
445
446 if err := t.framer.writer.Flush(); err != nil {
447 return nil, err
448 }
449 go func() {
Akash Kankanala761955c2024-02-21 19:32:20 +0530450 t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
451 t.loopy.run()
khenaidoo5fc5cea2021-08-11 17:39:16 -0400452 close(t.writerDone)
453 }()
454 return t, nil
455}
456
457func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
458 // TODO(zhaoq): Handle uint32 overflow of Stream.id.
459 s := &Stream{
460 ct: t,
461 done: make(chan struct{}),
462 method: callHdr.Method,
463 sendCompress: callHdr.SendCompress,
464 buf: newRecvBuffer(),
465 headerChan: make(chan struct{}),
466 contentSubtype: callHdr.ContentSubtype,
467 doneFunc: callHdr.DoneFunc,
468 }
469 s.wq = newWriteQuota(defaultWriteQuota, s.done)
470 s.requestRead = func(n int) {
471 t.adjustWindow(s, uint32(n))
472 }
473 // The client side stream context should have exactly the same life cycle with the user provided context.
474 // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
475 // So we use the original context here instead of creating a copy.
476 s.ctx = ctx
477 s.trReader = &transportReader{
478 reader: &recvBufferReader{
479 ctx: s.ctx,
480 ctxDone: s.ctx.Done(),
481 recv: s.buf,
482 closeStream: func(err error) {
483 t.CloseStream(s, err)
484 },
485 freeBuffer: t.bufferPool.put,
486 },
487 windowHandler: func(n int) {
488 t.updateWindow(s, uint32(n))
489 },
490 }
491 return s
492}
493
494func (t *http2Client) getPeer() *peer.Peer {
495 return &peer.Peer{
496 Addr: t.remoteAddr,
Akash Kankanala761955c2024-02-21 19:32:20 +0530497 AuthInfo: t.authInfo, // Can be nil
khenaidoo5fc5cea2021-08-11 17:39:16 -0400498 }
499}
500
501func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
502 aud := t.createAudience(callHdr)
503 ri := credentials.RequestInfo{
504 Method: callHdr.Method,
505 AuthInfo: t.authInfo,
506 }
507 ctxWithRequestInfo := icredentials.NewRequestInfoContext(ctx, ri)
508 authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
509 if err != nil {
510 return nil, err
511 }
512 callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
513 if err != nil {
514 return nil, err
515 }
516 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
517 // first and create a slice of that exact size.
518 // Make the slice of certain predictable size to reduce allocations made by append.
519 hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
520 hfLen += len(authData) + len(callAuthData)
521 headerFields := make([]hpack.HeaderField, 0, hfLen)
522 headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
523 headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
524 headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
525 headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
526 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(callHdr.ContentSubtype)})
527 headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
528 headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
529 if callHdr.PreviousAttempts > 0 {
530 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
531 }
532
Akash Kankanala761955c2024-02-21 19:32:20 +0530533 registeredCompressors := t.registeredCompressors
khenaidoo5fc5cea2021-08-11 17:39:16 -0400534 if callHdr.SendCompress != "" {
535 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
Akash Kankanala761955c2024-02-21 19:32:20 +0530536 // Include the outgoing compressor name when compressor is not registered
537 // via encoding.RegisterCompressor. This is possible when client uses
538 // WithCompressor dial option.
539 if !grpcutil.IsCompressorNameRegistered(callHdr.SendCompress) {
540 if registeredCompressors != "" {
541 registeredCompressors += ","
542 }
543 registeredCompressors += callHdr.SendCompress
544 }
545 }
546
547 if registeredCompressors != "" {
548 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: registeredCompressors})
khenaidoo5fc5cea2021-08-11 17:39:16 -0400549 }
550 if dl, ok := ctx.Deadline(); ok {
551 // Send out timeout regardless its value. The server can detect timeout context by itself.
552 // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
553 timeout := time.Until(dl)
554 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
555 }
556 for k, v := range authData {
557 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
558 }
559 for k, v := range callAuthData {
560 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
561 }
562 if b := stats.OutgoingTags(ctx); b != nil {
563 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
564 }
565 if b := stats.OutgoingTrace(ctx); b != nil {
566 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
567 }
568
569 if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
570 var k string
571 for k, vv := range md {
572 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
573 if isReservedHeader(k) {
574 continue
575 }
576 for _, v := range vv {
577 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
578 }
579 }
580 for _, vv := range added {
581 for i, v := range vv {
582 if i%2 == 0 {
583 k = strings.ToLower(v)
584 continue
585 }
586 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
587 if isReservedHeader(k) {
588 continue
589 }
590 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
591 }
592 }
593 }
594 for k, vv := range t.md {
595 if isReservedHeader(k) {
596 continue
597 }
598 for _, v := range vv {
599 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
600 }
601 }
602 return headerFields, nil
603}
604
605func (t *http2Client) createAudience(callHdr *CallHdr) string {
606 // Create an audience string only if needed.
607 if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
608 return ""
609 }
610 // Construct URI required to get auth request metadata.
611 // Omit port if it is the default one.
612 host := strings.TrimSuffix(callHdr.Host, ":443")
613 pos := strings.LastIndex(callHdr.Method, "/")
614 if pos == -1 {
615 pos = len(callHdr.Method)
616 }
617 return "https://" + host + callHdr.Method[:pos]
618}
619
620func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
621 if len(t.perRPCCreds) == 0 {
622 return nil, nil
623 }
624 authData := map[string]string{}
625 for _, c := range t.perRPCCreds {
626 data, err := c.GetRequestMetadata(ctx, audience)
627 if err != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +0530628 if st, ok := status.FromError(err); ok {
629 // Restrict the code to the list allowed by gRFC A54.
630 if istatus.IsRestrictedControlPlaneCode(st) {
631 err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
632 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400633 return nil, err
634 }
635
khenaidoo257f3192021-12-15 16:46:37 -0500636 return nil, status.Errorf(codes.Unauthenticated, "transport: per-RPC creds failed due to error: %v", err)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400637 }
638 for k, v := range data {
639 // Capital header names are illegal in HTTP/2.
640 k = strings.ToLower(k)
641 authData[k] = v
642 }
643 }
644 return authData, nil
645}
646
647func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
648 var callAuthData map[string]string
649 // Check if credentials.PerRPCCredentials were provided via call options.
650 // Note: if these credentials are provided both via dial options and call
651 // options, then both sets of credentials will be applied.
652 if callCreds := callHdr.Creds; callCreds != nil {
653 if callCreds.RequireTransportSecurity() {
654 ri, _ := credentials.RequestInfoFromContext(ctx)
655 if !t.isSecure || credentials.CheckSecurityLevel(ri.AuthInfo, credentials.PrivacyAndIntegrity) != nil {
656 return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
657 }
658 }
659 data, err := callCreds.GetRequestMetadata(ctx, audience)
660 if err != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +0530661 if st, ok := status.FromError(err); ok {
662 // Restrict the code to the list allowed by gRFC A54.
663 if istatus.IsRestrictedControlPlaneCode(st) {
664 err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
665 }
666 return nil, err
667 }
668 return nil, status.Errorf(codes.Internal, "transport: per-RPC creds failed due to error: %v", err)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400669 }
670 callAuthData = make(map[string]string, len(data))
671 for k, v := range data {
672 // Capital header names are illegal in HTTP/2
673 k = strings.ToLower(k)
674 callAuthData[k] = v
675 }
676 }
677 return callAuthData, nil
678}
679
680// NewStreamError wraps an error and reports additional information. Typically
681// NewStream errors result in transparent retry, as they mean nothing went onto
682// the wire. However, there are two notable exceptions:
683//
Joey Armstrongba3d9d12024-01-15 14:22:11 -0500684// 1. If the stream headers violate the max header list size allowed by the
Akash Kankanala761955c2024-02-21 19:32:20 +0530685// server. It's possible this could succeed on another transport, even if
686// it's unlikely, but do not transparently retry.
Joey Armstrongba3d9d12024-01-15 14:22:11 -0500687// 2. If the credentials errored when requesting their headers. In this case,
688// it's possible a retry can fix the problem, but indefinitely transparently
689// retrying is not appropriate as it is likely the credentials, if they can
690// eventually succeed, would need I/O to do so.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400691type NewStreamError struct {
692 Err error
693
Akash Kankanala761955c2024-02-21 19:32:20 +0530694 AllowTransparentRetry bool
khenaidoo5fc5cea2021-08-11 17:39:16 -0400695}
696
697func (e NewStreamError) Error() string {
698 return e.Err.Error()
699}
700
701// NewStream creates a stream and registers it into the transport as "active"
702// streams. All non-nil errors returned will be *NewStreamError.
Akash Kankanala761955c2024-02-21 19:32:20 +0530703func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400704 ctx = peer.NewContext(ctx, t.getPeer())
Akash Kankanala761955c2024-02-21 19:32:20 +0530705
706 // ServerName field of the resolver returned address takes precedence over
707 // Host field of CallHdr to determine the :authority header. This is because,
708 // the ServerName field takes precedence for server authentication during
709 // TLS handshake, and the :authority header should match the value used
710 // for server authentication.
711 if t.address.ServerName != "" {
712 newCallHdr := *callHdr
713 newCallHdr.Host = t.address.ServerName
714 callHdr = &newCallHdr
715 }
716
khenaidoo5fc5cea2021-08-11 17:39:16 -0400717 headerFields, err := t.createHeaderFields(ctx, callHdr)
718 if err != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +0530719 return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
khenaidoo5fc5cea2021-08-11 17:39:16 -0400720 }
721 s := t.newStream(ctx, callHdr)
722 cleanup := func(err error) {
723 if s.swapState(streamDone) == streamDone {
724 // If it was already done, return.
725 return
726 }
727 // The stream was unprocessed by the server.
728 atomic.StoreUint32(&s.unprocessed, 1)
729 s.write(recvMsg{err: err})
730 close(s.done)
731 // If headerChan isn't closed, then close it.
732 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
733 close(s.headerChan)
734 }
735 }
736 hdr := &headerFrame{
737 hf: headerFields,
738 endStream: false,
739 initStream: func(id uint32) error {
740 t.mu.Lock()
Akash Kankanala761955c2024-02-21 19:32:20 +0530741 // TODO: handle transport closure in loopy instead and remove this
742 // initStream is never called when transport is draining.
743 if t.state == closing {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400744 t.mu.Unlock()
Akash Kankanala761955c2024-02-21 19:32:20 +0530745 cleanup(ErrConnClosing)
746 return ErrConnClosing
khenaidoo5fc5cea2021-08-11 17:39:16 -0400747 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400748 if channelz.IsOn() {
749 atomic.AddInt64(&t.czData.streamsStarted, 1)
750 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
751 }
752 // If the keepalive goroutine has gone dormant, wake it up.
753 if t.kpDormant {
754 t.kpDormancyCond.Signal()
755 }
756 t.mu.Unlock()
757 return nil
758 },
759 onOrphaned: cleanup,
760 wq: s.wq,
761 }
762 firstTry := true
763 var ch chan struct{}
Akash Kankanala761955c2024-02-21 19:32:20 +0530764 transportDrainRequired := false
khenaidoo5fc5cea2021-08-11 17:39:16 -0400765 checkForStreamQuota := func(it interface{}) bool {
766 if t.streamQuota <= 0 { // Can go negative if server decreases it.
767 if firstTry {
768 t.waitingStreams++
769 }
770 ch = t.streamsQuotaAvailable
771 return false
772 }
773 if !firstTry {
774 t.waitingStreams--
775 }
776 t.streamQuota--
777 h := it.(*headerFrame)
778 h.streamID = t.nextID
779 t.nextID += 2
Akash Kankanala761955c2024-02-21 19:32:20 +0530780
781 // Drain client transport if nextID > MaxStreamID which signals gRPC that
782 // the connection is closed and a new one must be created for subsequent RPCs.
783 transportDrainRequired = t.nextID > MaxStreamID
784
khenaidoo5fc5cea2021-08-11 17:39:16 -0400785 s.id = h.streamID
786 s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
Akash Kankanala761955c2024-02-21 19:32:20 +0530787 t.mu.Lock()
788 if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
789 t.mu.Unlock()
790 return false // Don't create a stream if the transport is already closed.
791 }
792 t.activeStreams[s.id] = s
793 t.mu.Unlock()
khenaidoo5fc5cea2021-08-11 17:39:16 -0400794 if t.streamQuota > 0 && t.waitingStreams > 0 {
795 select {
796 case t.streamsQuotaAvailable <- struct{}{}:
797 default:
798 }
799 }
800 return true
801 }
802 var hdrListSizeErr error
803 checkForHeaderListSize := func(it interface{}) bool {
804 if t.maxSendHeaderListSize == nil {
805 return true
806 }
807 hdrFrame := it.(*headerFrame)
808 var sz int64
809 for _, f := range hdrFrame.hf {
810 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
811 hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
812 return false
813 }
814 }
815 return true
816 }
817 for {
818 success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
Akash Kankanala761955c2024-02-21 19:32:20 +0530819 return checkForHeaderListSize(it) && checkForStreamQuota(it)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400820 }, hdr)
821 if err != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +0530822 // Connection closed.
823 return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
khenaidoo5fc5cea2021-08-11 17:39:16 -0400824 }
825 if success {
826 break
827 }
828 if hdrListSizeErr != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +0530829 return nil, &NewStreamError{Err: hdrListSizeErr}
khenaidoo5fc5cea2021-08-11 17:39:16 -0400830 }
831 firstTry = false
832 select {
833 case <-ch:
834 case <-ctx.Done():
835 return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
836 case <-t.goAway:
Akash Kankanala761955c2024-02-21 19:32:20 +0530837 return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
khenaidoo5fc5cea2021-08-11 17:39:16 -0400838 case <-t.ctx.Done():
Akash Kankanala761955c2024-02-21 19:32:20 +0530839 return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
khenaidoo5fc5cea2021-08-11 17:39:16 -0400840 }
841 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530842 if len(t.statsHandlers) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400843 header, ok := metadata.FromOutgoingContext(ctx)
844 if ok {
845 header.Set("user-agent", t.userAgent)
846 } else {
847 header = metadata.Pairs("user-agent", t.userAgent)
848 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530849 for _, sh := range t.statsHandlers {
850 // Note: The header fields are compressed with hpack after this call returns.
851 // No WireLength field is set here.
852 // Note: Creating a new stats object to prevent pollution.
853 outHeader := &stats.OutHeader{
854 Client: true,
855 FullMethod: callHdr.Method,
856 RemoteAddr: t.remoteAddr,
857 LocalAddr: t.localAddr,
858 Compression: callHdr.SendCompress,
859 Header: header,
860 }
861 sh.HandleRPC(s.ctx, outHeader)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400862 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530863 }
864 if transportDrainRequired {
865 if t.logger.V(logLevel) {
866 t.logger.Infof("Draining transport: t.nextID > MaxStreamID")
867 }
868 t.GracefulClose()
khenaidoo5fc5cea2021-08-11 17:39:16 -0400869 }
870 return s, nil
871}
872
873// CloseStream clears the footprint of a stream when the stream is not needed any more.
874// This must not be executed in reader's goroutine.
875func (t *http2Client) CloseStream(s *Stream, err error) {
876 var (
877 rst bool
878 rstCode http2.ErrCode
879 )
880 if err != nil {
881 rst = true
882 rstCode = http2.ErrCodeCancel
883 }
884 t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
885}
886
887func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
888 // Set stream status to done.
889 if s.swapState(streamDone) == streamDone {
890 // If it was already done, return. If multiple closeStream calls
891 // happen simultaneously, wait for the first to finish.
892 <-s.done
893 return
894 }
895 // status and trailers can be updated here without any synchronization because the stream goroutine will
896 // only read it after it sees an io.EOF error from read or write and we'll write those errors
897 // only after updating this.
898 s.status = st
899 if len(mdata) > 0 {
900 s.trailer = mdata
901 }
902 if err != nil {
903 // This will unblock reads eventually.
904 s.write(recvMsg{err: err})
905 }
906 // If headerChan isn't closed, then close it.
907 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
908 s.noHeaders = true
909 close(s.headerChan)
910 }
911 cleanup := &cleanupStream{
912 streamID: s.id,
913 onWrite: func() {
914 t.mu.Lock()
915 if t.activeStreams != nil {
916 delete(t.activeStreams, s.id)
917 }
918 t.mu.Unlock()
919 if channelz.IsOn() {
920 if eosReceived {
921 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
922 } else {
923 atomic.AddInt64(&t.czData.streamsFailed, 1)
924 }
925 }
926 },
927 rst: rst,
928 rstCode: rstCode,
929 }
930 addBackStreamQuota := func(interface{}) bool {
931 t.streamQuota++
932 if t.streamQuota > 0 && t.waitingStreams > 0 {
933 select {
934 case t.streamsQuotaAvailable <- struct{}{}:
935 default:
936 }
937 }
938 return true
939 }
940 t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
941 // This will unblock write.
942 close(s.done)
943 if s.doneFunc != nil {
944 s.doneFunc()
945 }
946}
947
948// Close kicks off the shutdown process of the transport. This should be called
949// only once on a transport. Once it is called, the transport should not be
950// accessed any more.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400951func (t *http2Client) Close(err error) {
952 t.mu.Lock()
Akash Kankanala761955c2024-02-21 19:32:20 +0530953 // Make sure we only close once.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400954 if t.state == closing {
955 t.mu.Unlock()
956 return
957 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530958 if t.logger.V(logLevel) {
959 t.logger.Infof("Closing: %v", err)
960 }
961 // Call t.onClose ASAP to prevent the client from attempting to create new
962 // streams.
963 if t.state != draining {
964 t.onClose(GoAwayInvalid)
965 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400966 t.state = closing
967 streams := t.activeStreams
968 t.activeStreams = nil
969 if t.kpDormant {
970 // If the keepalive goroutine is blocked on this condition variable, we
971 // should unblock it so that the goroutine eventually exits.
972 t.kpDormancyCond.Signal()
973 }
974 t.mu.Unlock()
975 t.controlBuf.finish()
976 t.cancel()
977 t.conn.Close()
Akash Kankanala761955c2024-02-21 19:32:20 +0530978 channelz.RemoveEntry(t.channelzID)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400979 // Append info about previous goaways if there were any, since this may be important
980 // for understanding the root cause for this connection to be closed.
981 _, goAwayDebugMessage := t.GetGoAwayReason()
982
983 var st *status.Status
984 if len(goAwayDebugMessage) > 0 {
985 st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
986 err = st.Err()
987 } else {
988 st = status.New(codes.Unavailable, err.Error())
989 }
990
991 // Notify all active streams.
992 for _, s := range streams {
993 t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
994 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530995 for _, sh := range t.statsHandlers {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400996 connEnd := &stats.ConnEnd{
997 Client: true,
998 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530999 sh.HandleConn(t.ctx, connEnd)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001000 }
1001}
1002
1003// GracefulClose sets the state to draining, which prevents new streams from
1004// being created and causes the transport to be closed when the last active
1005// stream is closed. If there are no active streams, the transport is closed
1006// immediately. This does nothing if the transport is already draining or
1007// closing.
1008func (t *http2Client) GracefulClose() {
1009 t.mu.Lock()
1010 // Make sure we move to draining only from active.
1011 if t.state == draining || t.state == closing {
1012 t.mu.Unlock()
1013 return
1014 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301015 if t.logger.V(logLevel) {
1016 t.logger.Infof("GracefulClose called")
1017 }
1018 t.onClose(GoAwayInvalid)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001019 t.state = draining
1020 active := len(t.activeStreams)
1021 t.mu.Unlock()
1022 if active == 0 {
Akash Kankanala761955c2024-02-21 19:32:20 +05301023 t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
khenaidoo5fc5cea2021-08-11 17:39:16 -04001024 return
1025 }
1026 t.controlBuf.put(&incomingGoAway{})
1027}
1028
1029// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
1030// should proceed only if Write returns nil.
1031func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
1032 if opts.Last {
1033 // If it's the last message, update stream state.
1034 if !s.compareAndSwapState(streamActive, streamWriteDone) {
1035 return errStreamDone
1036 }
1037 } else if s.getState() != streamActive {
1038 return errStreamDone
1039 }
1040 df := &dataFrame{
1041 streamID: s.id,
1042 endStream: opts.Last,
1043 h: hdr,
1044 d: data,
1045 }
1046 if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
1047 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
1048 return err
1049 }
1050 }
1051 return t.controlBuf.put(df)
1052}
1053
1054func (t *http2Client) getStream(f http2.Frame) *Stream {
1055 t.mu.Lock()
1056 s := t.activeStreams[f.Header().StreamID]
1057 t.mu.Unlock()
1058 return s
1059}
1060
1061// adjustWindow sends out extra window update over the initial window size
1062// of stream if the application is requesting data larger in size than
1063// the window.
1064func (t *http2Client) adjustWindow(s *Stream, n uint32) {
1065 if w := s.fc.maybeAdjust(n); w > 0 {
1066 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
1067 }
1068}
1069
1070// updateWindow adjusts the inbound quota for the stream.
1071// Window updates will be sent out when the cumulative quota
1072// exceeds the corresponding threshold.
1073func (t *http2Client) updateWindow(s *Stream, n uint32) {
1074 if w := s.fc.onRead(n); w > 0 {
1075 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
1076 }
1077}
1078
1079// updateFlowControl updates the incoming flow control windows
1080// for the transport and the stream based on the current bdp
1081// estimation.
1082func (t *http2Client) updateFlowControl(n uint32) {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001083 updateIWS := func(interface{}) bool {
1084 t.initialWindowSize = int32(n)
Akash Kankanala761955c2024-02-21 19:32:20 +05301085 t.mu.Lock()
1086 for _, s := range t.activeStreams {
1087 s.fc.newLimit(n)
1088 }
1089 t.mu.Unlock()
khenaidoo5fc5cea2021-08-11 17:39:16 -04001090 return true
1091 }
1092 t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
1093 t.controlBuf.put(&outgoingSettings{
1094 ss: []http2.Setting{
1095 {
1096 ID: http2.SettingInitialWindowSize,
1097 Val: n,
1098 },
1099 },
1100 })
1101}
1102
1103func (t *http2Client) handleData(f *http2.DataFrame) {
1104 size := f.Header().Length
1105 var sendBDPPing bool
1106 if t.bdpEst != nil {
1107 sendBDPPing = t.bdpEst.add(size)
1108 }
1109 // Decouple connection's flow control from application's read.
1110 // An update on connection's flow control should not depend on
1111 // whether user application has read the data or not. Such a
1112 // restriction is already imposed on the stream's flow control,
1113 // and therefore the sender will be blocked anyways.
1114 // Decoupling the connection flow control will prevent other
1115 // active(fast) streams from starving in presence of slow or
1116 // inactive streams.
1117 //
1118 if w := t.fc.onData(size); w > 0 {
1119 t.controlBuf.put(&outgoingWindowUpdate{
1120 streamID: 0,
1121 increment: w,
1122 })
1123 }
1124 if sendBDPPing {
1125 // Avoid excessive ping detection (e.g. in an L7 proxy)
1126 // by sending a window update prior to the BDP ping.
1127
1128 if w := t.fc.reset(); w > 0 {
1129 t.controlBuf.put(&outgoingWindowUpdate{
1130 streamID: 0,
1131 increment: w,
1132 })
1133 }
1134
1135 t.controlBuf.put(bdpPing)
1136 }
1137 // Select the right stream to dispatch.
1138 s := t.getStream(f)
1139 if s == nil {
1140 return
1141 }
1142 if size > 0 {
1143 if err := s.fc.onData(size); err != nil {
1144 t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
1145 return
1146 }
1147 if f.Header().Flags.Has(http2.FlagDataPadded) {
1148 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
1149 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
1150 }
1151 }
1152 // TODO(bradfitz, zhaoq): A copy is required here because there is no
1153 // guarantee f.Data() is consumed before the arrival of next frame.
1154 // Can this copy be eliminated?
1155 if len(f.Data()) > 0 {
1156 buffer := t.bufferPool.get()
1157 buffer.Reset()
1158 buffer.Write(f.Data())
1159 s.write(recvMsg{buffer: buffer})
1160 }
1161 }
1162 // The server has closed the stream without sending trailers. Record that
1163 // the read direction is closed, and set the status appropriately.
khenaidoo5cb0d402021-12-08 14:09:16 -05001164 if f.StreamEnded() {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001165 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
1166 }
1167}
1168
1169func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
1170 s := t.getStream(f)
1171 if s == nil {
1172 return
1173 }
1174 if f.ErrCode == http2.ErrCodeRefusedStream {
1175 // The stream was unprocessed by the server.
1176 atomic.StoreUint32(&s.unprocessed, 1)
1177 }
1178 statusCode, ok := http2ErrConvTab[f.ErrCode]
1179 if !ok {
Akash Kankanala761955c2024-02-21 19:32:20 +05301180 if t.logger.V(logLevel) {
1181 t.logger.Infof("Received a RST_STREAM frame with code %q, but found no mapped gRPC status", f.ErrCode)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001182 }
1183 statusCode = codes.Unknown
1184 }
1185 if statusCode == codes.Canceled {
1186 if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
1187 // Our deadline was already exceeded, and that was likely the cause
1188 // of this cancelation. Alter the status code accordingly.
1189 statusCode = codes.DeadlineExceeded
1190 }
1191 }
1192 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
1193}
1194
1195func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
1196 if f.IsAck() {
1197 return
1198 }
1199 var maxStreams *uint32
1200 var ss []http2.Setting
1201 var updateFuncs []func()
1202 f.ForeachSetting(func(s http2.Setting) error {
1203 switch s.ID {
1204 case http2.SettingMaxConcurrentStreams:
1205 maxStreams = new(uint32)
1206 *maxStreams = s.Val
1207 case http2.SettingMaxHeaderListSize:
1208 updateFuncs = append(updateFuncs, func() {
1209 t.maxSendHeaderListSize = new(uint32)
1210 *t.maxSendHeaderListSize = s.Val
1211 })
1212 default:
1213 ss = append(ss, s)
1214 }
1215 return nil
1216 })
1217 if isFirst && maxStreams == nil {
1218 maxStreams = new(uint32)
1219 *maxStreams = math.MaxUint32
1220 }
1221 sf := &incomingSettings{
1222 ss: ss,
1223 }
1224 if maxStreams != nil {
1225 updateStreamQuota := func() {
1226 delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
1227 t.maxConcurrentStreams = *maxStreams
1228 t.streamQuota += delta
1229 if delta > 0 && t.waitingStreams > 0 {
1230 close(t.streamsQuotaAvailable) // wake all of them up.
1231 t.streamsQuotaAvailable = make(chan struct{}, 1)
1232 }
1233 }
1234 updateFuncs = append(updateFuncs, updateStreamQuota)
1235 }
1236 t.controlBuf.executeAndPut(func(interface{}) bool {
1237 for _, f := range updateFuncs {
1238 f()
1239 }
1240 return true
1241 }, sf)
1242}
1243
1244func (t *http2Client) handlePing(f *http2.PingFrame) {
1245 if f.IsAck() {
1246 // Maybe it's a BDP ping.
1247 if t.bdpEst != nil {
1248 t.bdpEst.calculate(f.Data)
1249 }
1250 return
1251 }
1252 pingAck := &ping{ack: true}
1253 copy(pingAck.data[:], f.Data[:])
1254 t.controlBuf.put(pingAck)
1255}
1256
1257func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
1258 t.mu.Lock()
1259 if t.state == closing {
1260 t.mu.Unlock()
1261 return
1262 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301263 if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {
1264 // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1265 // data equal to ASCII "too_many_pings", it should log the occurrence at a log level that is
1266 // enabled by default and double the configure KEEPALIVE_TIME used for new connections
1267 // on that channel.
1268 logger.Errorf("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\".")
khenaidoo5fc5cea2021-08-11 17:39:16 -04001269 }
1270 id := f.LastStreamID
1271 if id > 0 && id%2 == 0 {
1272 t.mu.Unlock()
1273 t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
1274 return
1275 }
1276 // A client can receive multiple GoAways from the server (see
1277 // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
1278 // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
1279 // sent after an RTT delay with the ID of the last stream the server will
1280 // process.
1281 //
1282 // Therefore, when we get the first GoAway we don't necessarily close any
1283 // streams. While in case of second GoAway we close all streams created after
1284 // the GoAwayId. This way streams that were in-flight while the GoAway from
1285 // server was being sent don't get killed.
1286 select {
1287 case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
1288 // If there are multiple GoAways the first one should always have an ID greater than the following ones.
1289 if id > t.prevGoAwayID {
1290 t.mu.Unlock()
1291 t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
1292 return
1293 }
1294 default:
1295 t.setGoAwayReason(f)
1296 close(t.goAway)
Akash Kankanala761955c2024-02-21 19:32:20 +05301297 defer t.controlBuf.put(&incomingGoAway{}) // Defer as t.mu is currently held.
khenaidoo5fc5cea2021-08-11 17:39:16 -04001298 // Notify the clientconn about the GOAWAY before we set the state to
1299 // draining, to allow the client to stop attempting to create streams
1300 // before disallowing new streams on this connection.
Akash Kankanala761955c2024-02-21 19:32:20 +05301301 if t.state != draining {
1302 t.onClose(t.goAwayReason)
1303 t.state = draining
1304 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001305 }
1306 // All streams with IDs greater than the GoAwayId
1307 // and smaller than the previous GoAway ID should be killed.
1308 upperLimit := t.prevGoAwayID
1309 if upperLimit == 0 { // This is the first GoAway Frame.
1310 upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
1311 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301312
1313 t.prevGoAwayID = id
1314 if len(t.activeStreams) == 0 {
1315 t.mu.Unlock()
1316 t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
1317 return
1318 }
1319
1320 streamsToClose := make([]*Stream, 0)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001321 for streamID, stream := range t.activeStreams {
1322 if streamID > id && streamID <= upperLimit {
1323 // The stream was unprocessed by the server.
Akash Kankanala761955c2024-02-21 19:32:20 +05301324 if streamID > id && streamID <= upperLimit {
1325 atomic.StoreUint32(&stream.unprocessed, 1)
1326 streamsToClose = append(streamsToClose, stream)
1327 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001328 }
1329 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001330 t.mu.Unlock()
Akash Kankanala761955c2024-02-21 19:32:20 +05301331 // Called outside t.mu because closeStream can take controlBuf's mu, which
1332 // could induce deadlock and is not allowed.
1333 for _, stream := range streamsToClose {
1334 t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001335 }
1336}
1337
1338// setGoAwayReason sets the value of t.goAwayReason based
1339// on the GoAway frame received.
Akash Kankanala761955c2024-02-21 19:32:20 +05301340// It expects a lock on transport's mutex to be held by
khenaidoo5fc5cea2021-08-11 17:39:16 -04001341// the caller.
1342func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
1343 t.goAwayReason = GoAwayNoReason
1344 switch f.ErrCode {
1345 case http2.ErrCodeEnhanceYourCalm:
1346 if string(f.DebugData()) == "too_many_pings" {
1347 t.goAwayReason = GoAwayTooManyPings
1348 }
1349 }
1350 if len(f.DebugData()) == 0 {
1351 t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
1352 } else {
1353 t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData()))
1354 }
1355}
1356
1357func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
1358 t.mu.Lock()
1359 defer t.mu.Unlock()
1360 return t.goAwayReason, t.goAwayDebugMessage
1361}
1362
1363func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
1364 t.controlBuf.put(&incomingWindowUpdate{
1365 streamID: f.Header().StreamID,
1366 increment: f.Increment,
1367 })
1368}
1369
1370// operateHeaders takes action on the decoded headers.
1371func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
1372 s := t.getStream(frame)
1373 if s == nil {
1374 return
1375 }
1376 endStream := frame.StreamEnded()
1377 atomic.StoreUint32(&s.bytesReceived, 1)
1378 initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
1379
1380 if !initialHeader && !endStream {
1381 // As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
1382 st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
1383 t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
1384 return
1385 }
1386
1387 // frame.Truncated is set to true when framer detects that the current header
1388 // list size hits MaxHeaderListSize limit.
1389 if frame.Truncated {
1390 se := status.New(codes.Internal, "peer header list size exceeded limit")
1391 t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream)
1392 return
1393 }
1394
1395 var (
1396 // If a gRPC Response-Headers has already been received, then it means
1397 // that the peer is speaking gRPC and we are in gRPC mode.
1398 isGRPC = !initialHeader
1399 mdata = make(map[string][]string)
1400 contentTypeErr = "malformed header: missing HTTP content-type"
1401 grpcMessage string
1402 statusGen *status.Status
1403 recvCompress string
1404 httpStatusCode *int
1405 httpStatusErr string
1406 rawStatusCode = codes.Unknown
1407 // headerError is set if an error is encountered while parsing the headers
1408 headerError string
1409 )
1410
1411 if initialHeader {
1412 httpStatusErr = "malformed header: missing HTTP status"
1413 }
1414
1415 for _, hf := range frame.Fields {
1416 switch hf.Name {
1417 case "content-type":
1418 if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType {
1419 contentTypeErr = fmt.Sprintf("transport: received unexpected content-type %q", hf.Value)
1420 break
1421 }
1422 contentTypeErr = ""
1423 mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
1424 isGRPC = true
1425 case "grpc-encoding":
1426 recvCompress = hf.Value
1427 case "grpc-status":
1428 code, err := strconv.ParseInt(hf.Value, 10, 32)
1429 if err != nil {
1430 se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err))
1431 t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1432 return
1433 }
1434 rawStatusCode = codes.Code(uint32(code))
1435 case "grpc-message":
1436 grpcMessage = decodeGrpcMessage(hf.Value)
1437 case "grpc-status-details-bin":
1438 var err error
1439 statusGen, err = decodeGRPCStatusDetails(hf.Value)
1440 if err != nil {
1441 headerError = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)
1442 }
1443 case ":status":
1444 if hf.Value == "200" {
1445 httpStatusErr = ""
1446 statusCode := 200
1447 httpStatusCode = &statusCode
1448 break
1449 }
1450
1451 c, err := strconv.ParseInt(hf.Value, 10, 32)
1452 if err != nil {
1453 se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
1454 t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1455 return
1456 }
1457 statusCode := int(c)
1458 httpStatusCode = &statusCode
1459
1460 httpStatusErr = fmt.Sprintf(
1461 "unexpected HTTP status code received from server: %d (%s)",
1462 statusCode,
1463 http.StatusText(statusCode),
1464 )
1465 default:
1466 if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
1467 break
1468 }
1469 v, err := decodeMetadataHeader(hf.Name, hf.Value)
1470 if err != nil {
1471 headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
1472 logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
1473 break
1474 }
1475 mdata[hf.Name] = append(mdata[hf.Name], v)
1476 }
1477 }
1478
1479 if !isGRPC || httpStatusErr != "" {
1480 var code = codes.Internal // when header does not include HTTP status, return INTERNAL
1481
1482 if httpStatusCode != nil {
1483 var ok bool
1484 code, ok = HTTPStatusConvTab[*httpStatusCode]
1485 if !ok {
1486 code = codes.Unknown
1487 }
1488 }
1489 var errs []string
1490 if httpStatusErr != "" {
1491 errs = append(errs, httpStatusErr)
1492 }
1493 if contentTypeErr != "" {
1494 errs = append(errs, contentTypeErr)
1495 }
1496 // Verify the HTTP response is a 200.
1497 se := status.New(code, strings.Join(errs, "; "))
1498 t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1499 return
1500 }
1501
1502 if headerError != "" {
1503 se := status.New(codes.Internal, headerError)
1504 t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1505 return
1506 }
1507
1508 isHeader := false
khenaidoo5fc5cea2021-08-11 17:39:16 -04001509
1510 // If headerChan hasn't been closed yet
1511 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
1512 s.headerValid = true
1513 if !endStream {
1514 // HEADERS frame block carries a Response-Headers.
1515 isHeader = true
1516 // These values can be set without any synchronization because
1517 // stream goroutine will read it only after seeing a closed
1518 // headerChan which we'll close after setting this.
1519 s.recvCompress = recvCompress
1520 if len(mdata) > 0 {
1521 s.header = mdata
1522 }
1523 } else {
1524 // HEADERS frame block carries a Trailers-Only.
1525 s.noHeaders = true
1526 }
1527 close(s.headerChan)
1528 }
1529
Akash Kankanala761955c2024-02-21 19:32:20 +05301530 for _, sh := range t.statsHandlers {
khenaidoo5cb0d402021-12-08 14:09:16 -05001531 if isHeader {
1532 inHeader := &stats.InHeader{
1533 Client: true,
1534 WireLength: int(frame.Header().Length),
1535 Header: metadata.MD(mdata).Copy(),
1536 Compression: s.recvCompress,
1537 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301538 sh.HandleRPC(s.ctx, inHeader)
khenaidoo5cb0d402021-12-08 14:09:16 -05001539 } else {
1540 inTrailer := &stats.InTrailer{
1541 Client: true,
1542 WireLength: int(frame.Header().Length),
1543 Trailer: metadata.MD(mdata).Copy(),
1544 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301545 sh.HandleRPC(s.ctx, inTrailer)
khenaidoo5cb0d402021-12-08 14:09:16 -05001546 }
1547 }
1548
khenaidoo5fc5cea2021-08-11 17:39:16 -04001549 if !endStream {
1550 return
1551 }
1552
1553 if statusGen == nil {
1554 statusGen = status.New(rawStatusCode, grpcMessage)
1555 }
1556
1557 // if client received END_STREAM from server while stream was still active, send RST_STREAM
1558 rst := s.getState() == streamActive
1559 t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
1560}
1561
Akash Kankanala761955c2024-02-21 19:32:20 +05301562// readServerPreface reads and handles the initial settings frame from the
1563// server.
1564func (t *http2Client) readServerPreface() error {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001565 frame, err := t.framer.fr.ReadFrame()
1566 if err != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +05301567 return connectionErrorf(true, err, "error reading server preface: %v", err)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001568 }
1569 sf, ok := frame.(*http2.SettingsFrame)
1570 if !ok {
Akash Kankanala761955c2024-02-21 19:32:20 +05301571 return connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame)
1572 }
1573 t.handleSettings(sf, true)
1574 return nil
1575}
1576
1577// reader verifies the server preface and reads all subsequent data from
1578// network connection. If the server preface is not read successfully, an
1579// error is pushed to errCh; otherwise errCh is closed with no error.
1580func (t *http2Client) reader(errCh chan<- error) {
1581 defer close(t.readerDone)
1582
1583 if err := t.readServerPreface(); err != nil {
1584 errCh <- err
khenaidoo5fc5cea2021-08-11 17:39:16 -04001585 return
1586 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301587 close(errCh)
1588 if t.keepaliveEnabled {
1589 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
1590 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001591
1592 // loop to keep reading incoming messages on this transport.
1593 for {
1594 t.controlBuf.throttle()
1595 frame, err := t.framer.fr.ReadFrame()
1596 if t.keepaliveEnabled {
1597 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
1598 }
1599 if err != nil {
1600 // Abort an active stream if the http2.Framer returns a
1601 // http2.StreamError. This can happen only if the server's response
1602 // is malformed http2.
1603 if se, ok := err.(http2.StreamError); ok {
1604 t.mu.Lock()
1605 s := t.activeStreams[se.StreamID]
1606 t.mu.Unlock()
1607 if s != nil {
1608 // use error detail to provide better err message
1609 code := http2ErrConvTab[se.Code]
1610 errorDetail := t.framer.fr.ErrorDetail()
1611 var msg string
1612 if errorDetail != nil {
1613 msg = errorDetail.Error()
1614 } else {
1615 msg = "received invalid frame"
1616 }
1617 t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
1618 }
1619 continue
1620 } else {
1621 // Transport error.
1622 t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
1623 return
1624 }
1625 }
1626 switch frame := frame.(type) {
1627 case *http2.MetaHeadersFrame:
1628 t.operateHeaders(frame)
1629 case *http2.DataFrame:
1630 t.handleData(frame)
1631 case *http2.RSTStreamFrame:
1632 t.handleRSTStream(frame)
1633 case *http2.SettingsFrame:
1634 t.handleSettings(frame, false)
1635 case *http2.PingFrame:
1636 t.handlePing(frame)
1637 case *http2.GoAwayFrame:
1638 t.handleGoAway(frame)
1639 case *http2.WindowUpdateFrame:
1640 t.handleWindowUpdate(frame)
1641 default:
1642 if logger.V(logLevel) {
1643 logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
1644 }
1645 }
1646 }
1647}
1648
1649func minTime(a, b time.Duration) time.Duration {
1650 if a < b {
1651 return a
1652 }
1653 return b
1654}
1655
khenaidoo257f3192021-12-15 16:46:37 -05001656// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
khenaidoo5fc5cea2021-08-11 17:39:16 -04001657func (t *http2Client) keepalive() {
1658 p := &ping{data: [8]byte{}}
1659 // True iff a ping has been sent, and no data has been received since then.
1660 outstandingPing := false
1661 // Amount of time remaining before which we should receive an ACK for the
1662 // last sent ping.
1663 timeoutLeft := time.Duration(0)
1664 // Records the last value of t.lastRead before we go block on the timer.
1665 // This is required to check for read activity since then.
1666 prevNano := time.Now().UnixNano()
1667 timer := time.NewTimer(t.kp.Time)
1668 for {
1669 select {
1670 case <-timer.C:
1671 lastRead := atomic.LoadInt64(&t.lastRead)
1672 if lastRead > prevNano {
1673 // There has been read activity since the last time we were here.
1674 outstandingPing = false
1675 // Next timer should fire at kp.Time seconds from lastRead time.
1676 timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
1677 prevNano = lastRead
1678 continue
1679 }
1680 if outstandingPing && timeoutLeft <= 0 {
1681 t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
1682 return
1683 }
1684 t.mu.Lock()
1685 if t.state == closing {
1686 // If the transport is closing, we should exit from the
1687 // keepalive goroutine here. If not, we could have a race
1688 // between the call to Signal() from Close() and the call to
1689 // Wait() here, whereby the keepalive goroutine ends up
1690 // blocking on the condition variable which will never be
1691 // signalled again.
1692 t.mu.Unlock()
1693 return
1694 }
1695 if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
1696 // If a ping was sent out previously (because there were active
1697 // streams at that point) which wasn't acked and its timeout
1698 // hadn't fired, but we got here and are about to go dormant,
1699 // we should make sure that we unconditionally send a ping once
1700 // we awaken.
1701 outstandingPing = false
1702 t.kpDormant = true
1703 t.kpDormancyCond.Wait()
1704 }
1705 t.kpDormant = false
1706 t.mu.Unlock()
1707
1708 // We get here either because we were dormant and a new stream was
1709 // created which unblocked the Wait() call, or because the
1710 // keepalive timer expired. In both cases, we need to send a ping.
1711 if !outstandingPing {
1712 if channelz.IsOn() {
1713 atomic.AddInt64(&t.czData.kpCount, 1)
1714 }
1715 t.controlBuf.put(p)
1716 timeoutLeft = t.kp.Timeout
1717 outstandingPing = true
1718 }
1719 // The amount of time to sleep here is the minimum of kp.Time and
1720 // timeoutLeft. This will ensure that we wait only for kp.Time
1721 // before sending out the next ping (for cases where the ping is
1722 // acked).
1723 sleepDuration := minTime(t.kp.Time, timeoutLeft)
1724 timeoutLeft -= sleepDuration
1725 timer.Reset(sleepDuration)
1726 case <-t.ctx.Done():
1727 if !timer.Stop() {
1728 <-timer.C
1729 }
1730 return
1731 }
1732 }
1733}
1734
1735func (t *http2Client) Error() <-chan struct{} {
1736 return t.ctx.Done()
1737}
1738
1739func (t *http2Client) GoAway() <-chan struct{} {
1740 return t.goAway
1741}
1742
1743func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
1744 s := channelz.SocketInternalMetric{
1745 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1746 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1747 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1748 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1749 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1750 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1751 LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1752 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1753 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1754 LocalFlowControlWindow: int64(t.fc.getSize()),
1755 SocketOptions: channelz.GetSocketOption(t.conn),
1756 LocalAddr: t.localAddr,
1757 RemoteAddr: t.remoteAddr,
1758 // RemoteName :
1759 }
1760 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1761 s.Security = au.GetSecurityValue()
1762 }
1763 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1764 return &s
1765}
1766
1767func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
1768
1769func (t *http2Client) IncrMsgSent() {
1770 atomic.AddInt64(&t.czData.msgSent, 1)
1771 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1772}
1773
1774func (t *http2Client) IncrMsgRecv() {
1775 atomic.AddInt64(&t.czData.msgRecv, 1)
1776 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1777}
1778
1779func (t *http2Client) getOutFlowWindow() int64 {
1780 resp := make(chan uint32, 1)
1781 timer := time.NewTimer(time.Second)
1782 defer timer.Stop()
1783 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1784 select {
1785 case sz := <-resp:
1786 return int64(sz)
1787 case <-t.ctxDone:
1788 return -1
1789 case <-timer.C:
1790 return -2
1791 }
1792}
Akash Kankanala761955c2024-02-21 19:32:20 +05301793
1794func (t *http2Client) stateForTesting() transportState {
1795 t.mu.Lock()
1796 defer t.mu.Unlock()
1797 return t.state
1798}