blob: 75586307435a01494b226845788c0095007000e2 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "context"
23 "fmt"
24 "io"
25 "math"
26 "net"
27 "net/http"
28 "strconv"
29 "strings"
30 "sync"
31 "sync/atomic"
32 "time"
33
34 "golang.org/x/net/http2"
35 "golang.org/x/net/http2/hpack"
36 "google.golang.org/grpc/codes"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/internal/channelz"
39 icredentials "google.golang.org/grpc/internal/credentials"
40 "google.golang.org/grpc/internal/grpcutil"
41 imetadata "google.golang.org/grpc/internal/metadata"
42 "google.golang.org/grpc/internal/syscall"
43 "google.golang.org/grpc/internal/transport/networktype"
44 "google.golang.org/grpc/keepalive"
45 "google.golang.org/grpc/metadata"
46 "google.golang.org/grpc/peer"
47 "google.golang.org/grpc/resolver"
48 "google.golang.org/grpc/stats"
49 "google.golang.org/grpc/status"
50)
51
52// clientConnectionCounter counts the number of connections a client has
53// initiated (equal to the number of http2Clients created). Must be accessed
54// atomically.
55var clientConnectionCounter uint64
56
57// http2Client implements the ClientTransport interface with HTTP2.
58type http2Client struct {
59 lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
60 ctx context.Context
61 cancel context.CancelFunc
62 ctxDone <-chan struct{} // Cache the ctx.Done() chan.
63 userAgent string
64 md metadata.MD
65 conn net.Conn // underlying communication channel
66 loopy *loopyWriter
67 remoteAddr net.Addr
68 localAddr net.Addr
69 authInfo credentials.AuthInfo // auth info about the connection
70
71 readerDone chan struct{} // sync point to enable testing.
72 writerDone chan struct{} // sync point to enable testing.
73 // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
74 // that the server sent GoAway on this transport.
75 goAway chan struct{}
76
77 framer *framer
78 // controlBuf delivers all the control related tasks (e.g., window
79 // updates, reset streams, and various settings) to the controller.
80 controlBuf *controlBuffer
81 fc *trInFlow
82 // The scheme used: https if TLS is on, http otherwise.
83 scheme string
84
85 isSecure bool
86
87 perRPCCreds []credentials.PerRPCCredentials
88
89 kp keepalive.ClientParameters
90 keepaliveEnabled bool
91
92 statsHandler stats.Handler
93
94 initialWindowSize int32
95
96 // configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
97 maxSendHeaderListSize *uint32
98
99 bdpEst *bdpEstimator
100 // onPrefaceReceipt is a callback that client transport calls upon
101 // receiving server preface to signal that a succefull HTTP2
102 // connection was established.
103 onPrefaceReceipt func()
104
105 maxConcurrentStreams uint32
106 streamQuota int64
107 streamsQuotaAvailable chan struct{}
108 waitingStreams uint32
109 nextID uint32
110
111 mu sync.Mutex // guard the following variables
112 state transportState
113 activeStreams map[uint32]*Stream
114 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
115 prevGoAwayID uint32
116 // goAwayReason records the http2.ErrCode and debug data received with the
117 // GoAway frame.
118 goAwayReason GoAwayReason
119 // goAwayDebugMessage contains a detailed human readable string about a
120 // GoAway frame, useful for error messages.
121 goAwayDebugMessage string
122 // A condition variable used to signal when the keepalive goroutine should
123 // go dormant. The condition for dormancy is based on the number of active
124 // streams and the `PermitWithoutStream` keepalive client parameter. And
125 // since the number of active streams is guarded by the above mutex, we use
126 // the same for this condition variable as well.
127 kpDormancyCond *sync.Cond
128 // A boolean to track whether the keepalive goroutine is dormant or not.
129 // This is checked before attempting to signal the above condition
130 // variable.
131 kpDormant bool
132
133 // Fields below are for channelz metric collection.
134 channelzID int64 // channelz unique identification number
135 czData *channelzData
136
137 onGoAway func(GoAwayReason)
138 onClose func()
139
140 bufferPool *bufferPool
141
142 connectionID uint64
143}
144
145func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
146 address := addr.Addr
147 networkType, ok := networktype.Get(addr)
148 if fn != nil {
149 if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
150 // For backward compatibility, if the user dialed "unix:///path",
151 // the passthrough resolver would be used and the user's custom
152 // dialer would see "unix:///path". Since the unix resolver is used
153 // and the address is now "/path", prepend "unix://" so the user's
154 // custom dialer sees the same address.
155 return fn(ctx, "unix://"+address)
156 }
157 return fn(ctx, address)
158 }
159 if !ok {
160 networkType, address = parseDialTarget(address)
161 }
162 if networkType == "tcp" && useProxy {
163 return proxyDial(ctx, address, grpcUA)
164 }
165 return (&net.Dialer{}).DialContext(ctx, networkType, address)
166}
167
168func isTemporary(err error) bool {
169 switch err := err.(type) {
170 case interface {
171 Temporary() bool
172 }:
173 return err.Temporary()
174 case interface {
175 Timeout() bool
176 }:
177 // Timeouts may be resolved upon retry, and are thus treated as
178 // temporary.
179 return err.Timeout()
180 }
181 return true
182}
183
184// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
185// and starts to receive messages on it. Non-nil error returns if construction
186// fails.
187func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
188 scheme := "http"
189 ctx, cancel := context.WithCancel(ctx)
190 defer func() {
191 if err != nil {
192 cancel()
193 }
194 }()
195
196 conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
197 if err != nil {
198 if opts.FailOnNonTempDialError {
199 return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
200 }
201 return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
202 }
203 // Any further errors will close the underlying connection
204 defer func(conn net.Conn) {
205 if err != nil {
206 conn.Close()
207 }
208 }(conn)
209 kp := opts.KeepaliveParams
210 // Validate keepalive parameters.
211 if kp.Time == 0 {
212 kp.Time = defaultClientKeepaliveTime
213 }
214 if kp.Timeout == 0 {
215 kp.Timeout = defaultClientKeepaliveTimeout
216 }
217 keepaliveEnabled := false
218 if kp.Time != infinity {
219 if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
220 return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
221 }
222 keepaliveEnabled = true
223 }
224 var (
225 isSecure bool
226 authInfo credentials.AuthInfo
227 )
228 transportCreds := opts.TransportCredentials
229 perRPCCreds := opts.PerRPCCredentials
230
231 if b := opts.CredsBundle; b != nil {
232 if t := b.TransportCredentials(); t != nil {
233 transportCreds = t
234 }
235 if t := b.PerRPCCredentials(); t != nil {
236 perRPCCreds = append(perRPCCreds, t)
237 }
238 }
239 if transportCreds != nil {
240 // gRPC, resolver, balancer etc. can specify arbitrary data in the
241 // Attributes field of resolver.Address, which is shoved into connectCtx
242 // and passed to the credential handshaker. This makes it possible for
243 // address specific arbitrary data to reach the credential handshaker.
244 connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
245 rawConn := conn
246 // Pull the deadline from the connectCtx, which will be used for
247 // timeouts in the authentication protocol handshake. Can ignore the
248 // boolean as the deadline will return the zero value, which will make
249 // the conn not timeout on I/O operations.
250 deadline, _ := connectCtx.Deadline()
251 rawConn.SetDeadline(deadline)
252 conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, rawConn)
253 rawConn.SetDeadline(time.Time{})
254 if err != nil {
255 return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
256 }
257 for _, cd := range perRPCCreds {
258 if cd.RequireTransportSecurity() {
259 if ci, ok := authInfo.(interface {
260 GetCommonAuthInfo() credentials.CommonAuthInfo
261 }); ok {
262 secLevel := ci.GetCommonAuthInfo().SecurityLevel
263 if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
264 return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
265 }
266 }
267 }
268 }
269 isSecure = true
270 if transportCreds.Info().SecurityProtocol == "tls" {
271 scheme = "https"
272 }
273 }
274 dynamicWindow := true
275 icwz := int32(initialWindowSize)
276 if opts.InitialConnWindowSize >= defaultWindowSize {
277 icwz = opts.InitialConnWindowSize
278 dynamicWindow = false
279 }
280 writeBufSize := opts.WriteBufferSize
281 readBufSize := opts.ReadBufferSize
282 maxHeaderListSize := defaultClientMaxHeaderListSize
283 if opts.MaxHeaderListSize != nil {
284 maxHeaderListSize = *opts.MaxHeaderListSize
285 }
286 t := &http2Client{
287 ctx: ctx,
288 ctxDone: ctx.Done(), // Cache Done chan.
289 cancel: cancel,
290 userAgent: opts.UserAgent,
291 conn: conn,
292 remoteAddr: conn.RemoteAddr(),
293 localAddr: conn.LocalAddr(),
294 authInfo: authInfo,
295 readerDone: make(chan struct{}),
296 writerDone: make(chan struct{}),
297 goAway: make(chan struct{}),
298 framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
299 fc: &trInFlow{limit: uint32(icwz)},
300 scheme: scheme,
301 activeStreams: make(map[uint32]*Stream),
302 isSecure: isSecure,
303 perRPCCreds: perRPCCreds,
304 kp: kp,
305 statsHandler: opts.StatsHandler,
306 initialWindowSize: initialWindowSize,
307 onPrefaceReceipt: onPrefaceReceipt,
308 nextID: 1,
309 maxConcurrentStreams: defaultMaxStreamsClient,
310 streamQuota: defaultMaxStreamsClient,
311 streamsQuotaAvailable: make(chan struct{}, 1),
312 czData: new(channelzData),
313 onGoAway: onGoAway,
314 onClose: onClose,
315 keepaliveEnabled: keepaliveEnabled,
316 bufferPool: newBufferPool(),
317 }
318
319 if md, ok := addr.Metadata.(*metadata.MD); ok {
320 t.md = *md
321 } else if md := imetadata.Get(addr); md != nil {
322 t.md = md
323 }
324 t.controlBuf = newControlBuffer(t.ctxDone)
325 if opts.InitialWindowSize >= defaultWindowSize {
326 t.initialWindowSize = opts.InitialWindowSize
327 dynamicWindow = false
328 }
329 if dynamicWindow {
330 t.bdpEst = &bdpEstimator{
331 bdp: initialWindowSize,
332 updateFlowControl: t.updateFlowControl,
333 }
334 }
335 if t.statsHandler != nil {
336 t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
337 RemoteAddr: t.remoteAddr,
338 LocalAddr: t.localAddr,
339 })
340 connBegin := &stats.ConnBegin{
341 Client: true,
342 }
343 t.statsHandler.HandleConn(t.ctx, connBegin)
344 }
345 if channelz.IsOn() {
346 t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
347 }
348 if t.keepaliveEnabled {
349 t.kpDormancyCond = sync.NewCond(&t.mu)
350 go t.keepalive()
351 }
352 // Start the reader goroutine for incoming message. Each transport has
353 // a dedicated goroutine which reads HTTP2 frame from network. Then it
354 // dispatches the frame to the corresponding stream entity.
355 go t.reader()
356
357 // Send connection preface to server.
358 n, err := t.conn.Write(clientPreface)
359 if err != nil {
360 err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
361 t.Close(err)
362 return nil, err
363 }
364 if n != len(clientPreface) {
365 err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
366 t.Close(err)
367 return nil, err
368 }
369 var ss []http2.Setting
370
371 if t.initialWindowSize != defaultWindowSize {
372 ss = append(ss, http2.Setting{
373 ID: http2.SettingInitialWindowSize,
374 Val: uint32(t.initialWindowSize),
375 })
376 }
377 if opts.MaxHeaderListSize != nil {
378 ss = append(ss, http2.Setting{
379 ID: http2.SettingMaxHeaderListSize,
380 Val: *opts.MaxHeaderListSize,
381 })
382 }
383 err = t.framer.fr.WriteSettings(ss...)
384 if err != nil {
385 err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
386 t.Close(err)
387 return nil, err
388 }
389 // Adjust the connection flow control window if needed.
390 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
391 if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
392 err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
393 t.Close(err)
394 return nil, err
395 }
396 }
397
398 t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
399
400 if err := t.framer.writer.Flush(); err != nil {
401 return nil, err
402 }
403 go func() {
404 t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
405 err := t.loopy.run()
406 if err != nil {
407 if logger.V(logLevel) {
408 logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
409 }
410 }
411 // Do not close the transport. Let reader goroutine handle it since
412 // there might be data in the buffers.
413 t.conn.Close()
414 t.controlBuf.finish()
415 close(t.writerDone)
416 }()
417 return t, nil
418}
419
420func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
421 // TODO(zhaoq): Handle uint32 overflow of Stream.id.
422 s := &Stream{
423 ct: t,
424 done: make(chan struct{}),
425 method: callHdr.Method,
426 sendCompress: callHdr.SendCompress,
427 buf: newRecvBuffer(),
428 headerChan: make(chan struct{}),
429 contentSubtype: callHdr.ContentSubtype,
430 doneFunc: callHdr.DoneFunc,
431 }
432 s.wq = newWriteQuota(defaultWriteQuota, s.done)
433 s.requestRead = func(n int) {
434 t.adjustWindow(s, uint32(n))
435 }
436 // The client side stream context should have exactly the same life cycle with the user provided context.
437 // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
438 // So we use the original context here instead of creating a copy.
439 s.ctx = ctx
440 s.trReader = &transportReader{
441 reader: &recvBufferReader{
442 ctx: s.ctx,
443 ctxDone: s.ctx.Done(),
444 recv: s.buf,
445 closeStream: func(err error) {
446 t.CloseStream(s, err)
447 },
448 freeBuffer: t.bufferPool.put,
449 },
450 windowHandler: func(n int) {
451 t.updateWindow(s, uint32(n))
452 },
453 }
454 return s
455}
456
457func (t *http2Client) getPeer() *peer.Peer {
458 return &peer.Peer{
459 Addr: t.remoteAddr,
460 AuthInfo: t.authInfo,
461 }
462}
463
464func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
465 aud := t.createAudience(callHdr)
466 ri := credentials.RequestInfo{
467 Method: callHdr.Method,
468 AuthInfo: t.authInfo,
469 }
470 ctxWithRequestInfo := icredentials.NewRequestInfoContext(ctx, ri)
471 authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
472 if err != nil {
473 return nil, err
474 }
475 callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
476 if err != nil {
477 return nil, err
478 }
479 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
480 // first and create a slice of that exact size.
481 // Make the slice of certain predictable size to reduce allocations made by append.
482 hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
483 hfLen += len(authData) + len(callAuthData)
484 headerFields := make([]hpack.HeaderField, 0, hfLen)
485 headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
486 headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
487 headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
488 headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
489 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(callHdr.ContentSubtype)})
490 headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
491 headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
492 if callHdr.PreviousAttempts > 0 {
493 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
494 }
495
496 if callHdr.SendCompress != "" {
497 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
498 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
499 }
500 if dl, ok := ctx.Deadline(); ok {
501 // Send out timeout regardless its value. The server can detect timeout context by itself.
502 // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
503 timeout := time.Until(dl)
504 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
505 }
506 for k, v := range authData {
507 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
508 }
509 for k, v := range callAuthData {
510 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
511 }
512 if b := stats.OutgoingTags(ctx); b != nil {
513 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
514 }
515 if b := stats.OutgoingTrace(ctx); b != nil {
516 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
517 }
518
519 if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
520 var k string
521 for k, vv := range md {
522 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
523 if isReservedHeader(k) {
524 continue
525 }
526 for _, v := range vv {
527 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
528 }
529 }
530 for _, vv := range added {
531 for i, v := range vv {
532 if i%2 == 0 {
533 k = strings.ToLower(v)
534 continue
535 }
536 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
537 if isReservedHeader(k) {
538 continue
539 }
540 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
541 }
542 }
543 }
544 for k, vv := range t.md {
545 if isReservedHeader(k) {
546 continue
547 }
548 for _, v := range vv {
549 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
550 }
551 }
552 return headerFields, nil
553}
554
555func (t *http2Client) createAudience(callHdr *CallHdr) string {
556 // Create an audience string only if needed.
557 if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
558 return ""
559 }
560 // Construct URI required to get auth request metadata.
561 // Omit port if it is the default one.
562 host := strings.TrimSuffix(callHdr.Host, ":443")
563 pos := strings.LastIndex(callHdr.Method, "/")
564 if pos == -1 {
565 pos = len(callHdr.Method)
566 }
567 return "https://" + host + callHdr.Method[:pos]
568}
569
570func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
571 if len(t.perRPCCreds) == 0 {
572 return nil, nil
573 }
574 authData := map[string]string{}
575 for _, c := range t.perRPCCreds {
576 data, err := c.GetRequestMetadata(ctx, audience)
577 if err != nil {
578 if _, ok := status.FromError(err); ok {
579 return nil, err
580 }
581
582 return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
583 }
584 for k, v := range data {
585 // Capital header names are illegal in HTTP/2.
586 k = strings.ToLower(k)
587 authData[k] = v
588 }
589 }
590 return authData, nil
591}
592
593func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
594 var callAuthData map[string]string
595 // Check if credentials.PerRPCCredentials were provided via call options.
596 // Note: if these credentials are provided both via dial options and call
597 // options, then both sets of credentials will be applied.
598 if callCreds := callHdr.Creds; callCreds != nil {
599 if callCreds.RequireTransportSecurity() {
600 ri, _ := credentials.RequestInfoFromContext(ctx)
601 if !t.isSecure || credentials.CheckSecurityLevel(ri.AuthInfo, credentials.PrivacyAndIntegrity) != nil {
602 return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
603 }
604 }
605 data, err := callCreds.GetRequestMetadata(ctx, audience)
606 if err != nil {
607 return nil, status.Errorf(codes.Internal, "transport: %v", err)
608 }
609 callAuthData = make(map[string]string, len(data))
610 for k, v := range data {
611 // Capital header names are illegal in HTTP/2
612 k = strings.ToLower(k)
613 callAuthData[k] = v
614 }
615 }
616 return callAuthData, nil
617}
618
619// NewStreamError wraps an error and reports additional information. Typically
620// NewStream errors result in transparent retry, as they mean nothing went onto
621// the wire. However, there are two notable exceptions:
622//
623// 1. If the stream headers violate the max header list size allowed by the
624// server. In this case there is no reason to retry at all, as it is
625// assumed the RPC would continue to fail on subsequent attempts.
626// 2. If the credentials errored when requesting their headers. In this case,
627// it's possible a retry can fix the problem, but indefinitely transparently
628// retrying is not appropriate as it is likely the credentials, if they can
629// eventually succeed, would need I/O to do so.
630type NewStreamError struct {
631 Err error
632
633 DoNotRetry bool
634 DoNotTransparentRetry bool
635}
636
637func (e NewStreamError) Error() string {
638 return e.Err.Error()
639}
640
641// NewStream creates a stream and registers it into the transport as "active"
642// streams. All non-nil errors returned will be *NewStreamError.
643func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
644 ctx = peer.NewContext(ctx, t.getPeer())
645 headerFields, err := t.createHeaderFields(ctx, callHdr)
646 if err != nil {
647 return nil, &NewStreamError{Err: err, DoNotTransparentRetry: true}
648 }
649 s := t.newStream(ctx, callHdr)
650 cleanup := func(err error) {
651 if s.swapState(streamDone) == streamDone {
652 // If it was already done, return.
653 return
654 }
655 // The stream was unprocessed by the server.
656 atomic.StoreUint32(&s.unprocessed, 1)
657 s.write(recvMsg{err: err})
658 close(s.done)
659 // If headerChan isn't closed, then close it.
660 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
661 close(s.headerChan)
662 }
663 }
664 hdr := &headerFrame{
665 hf: headerFields,
666 endStream: false,
667 initStream: func(id uint32) error {
668 t.mu.Lock()
669 if state := t.state; state != reachable {
670 t.mu.Unlock()
671 // Do a quick cleanup.
672 err := error(errStreamDrain)
673 if state == closing {
674 err = ErrConnClosing
675 }
676 cleanup(err)
677 return err
678 }
679 t.activeStreams[id] = s
680 if channelz.IsOn() {
681 atomic.AddInt64(&t.czData.streamsStarted, 1)
682 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
683 }
684 // If the keepalive goroutine has gone dormant, wake it up.
685 if t.kpDormant {
686 t.kpDormancyCond.Signal()
687 }
688 t.mu.Unlock()
689 return nil
690 },
691 onOrphaned: cleanup,
692 wq: s.wq,
693 }
694 firstTry := true
695 var ch chan struct{}
696 checkForStreamQuota := func(it interface{}) bool {
697 if t.streamQuota <= 0 { // Can go negative if server decreases it.
698 if firstTry {
699 t.waitingStreams++
700 }
701 ch = t.streamsQuotaAvailable
702 return false
703 }
704 if !firstTry {
705 t.waitingStreams--
706 }
707 t.streamQuota--
708 h := it.(*headerFrame)
709 h.streamID = t.nextID
710 t.nextID += 2
711 s.id = h.streamID
712 s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
713 if t.streamQuota > 0 && t.waitingStreams > 0 {
714 select {
715 case t.streamsQuotaAvailable <- struct{}{}:
716 default:
717 }
718 }
719 return true
720 }
721 var hdrListSizeErr error
722 checkForHeaderListSize := func(it interface{}) bool {
723 if t.maxSendHeaderListSize == nil {
724 return true
725 }
726 hdrFrame := it.(*headerFrame)
727 var sz int64
728 for _, f := range hdrFrame.hf {
729 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
730 hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
731 return false
732 }
733 }
734 return true
735 }
736 for {
737 success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
738 if !checkForStreamQuota(it) {
739 return false
740 }
741 if !checkForHeaderListSize(it) {
742 return false
743 }
744 return true
745 }, hdr)
746 if err != nil {
747 return nil, &NewStreamError{Err: err}
748 }
749 if success {
750 break
751 }
752 if hdrListSizeErr != nil {
753 return nil, &NewStreamError{Err: hdrListSizeErr, DoNotRetry: true}
754 }
755 firstTry = false
756 select {
757 case <-ch:
758 case <-ctx.Done():
759 return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
760 case <-t.goAway:
761 return nil, &NewStreamError{Err: errStreamDrain}
762 case <-t.ctx.Done():
763 return nil, &NewStreamError{Err: ErrConnClosing}
764 }
765 }
766 if t.statsHandler != nil {
767 header, ok := metadata.FromOutgoingContext(ctx)
768 if ok {
769 header.Set("user-agent", t.userAgent)
770 } else {
771 header = metadata.Pairs("user-agent", t.userAgent)
772 }
773 // Note: The header fields are compressed with hpack after this call returns.
774 // No WireLength field is set here.
775 outHeader := &stats.OutHeader{
776 Client: true,
777 FullMethod: callHdr.Method,
778 RemoteAddr: t.remoteAddr,
779 LocalAddr: t.localAddr,
780 Compression: callHdr.SendCompress,
781 Header: header,
782 }
783 t.statsHandler.HandleRPC(s.ctx, outHeader)
784 }
785 return s, nil
786}
787
788// CloseStream clears the footprint of a stream when the stream is not needed any more.
789// This must not be executed in reader's goroutine.
790func (t *http2Client) CloseStream(s *Stream, err error) {
791 var (
792 rst bool
793 rstCode http2.ErrCode
794 )
795 if err != nil {
796 rst = true
797 rstCode = http2.ErrCodeCancel
798 }
799 t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
800}
801
802func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
803 // Set stream status to done.
804 if s.swapState(streamDone) == streamDone {
805 // If it was already done, return. If multiple closeStream calls
806 // happen simultaneously, wait for the first to finish.
807 <-s.done
808 return
809 }
810 // status and trailers can be updated here without any synchronization because the stream goroutine will
811 // only read it after it sees an io.EOF error from read or write and we'll write those errors
812 // only after updating this.
813 s.status = st
814 if len(mdata) > 0 {
815 s.trailer = mdata
816 }
817 if err != nil {
818 // This will unblock reads eventually.
819 s.write(recvMsg{err: err})
820 }
821 // If headerChan isn't closed, then close it.
822 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
823 s.noHeaders = true
824 close(s.headerChan)
825 }
826 cleanup := &cleanupStream{
827 streamID: s.id,
828 onWrite: func() {
829 t.mu.Lock()
830 if t.activeStreams != nil {
831 delete(t.activeStreams, s.id)
832 }
833 t.mu.Unlock()
834 if channelz.IsOn() {
835 if eosReceived {
836 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
837 } else {
838 atomic.AddInt64(&t.czData.streamsFailed, 1)
839 }
840 }
841 },
842 rst: rst,
843 rstCode: rstCode,
844 }
845 addBackStreamQuota := func(interface{}) bool {
846 t.streamQuota++
847 if t.streamQuota > 0 && t.waitingStreams > 0 {
848 select {
849 case t.streamsQuotaAvailable <- struct{}{}:
850 default:
851 }
852 }
853 return true
854 }
855 t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
856 // This will unblock write.
857 close(s.done)
858 if s.doneFunc != nil {
859 s.doneFunc()
860 }
861}
862
863// Close kicks off the shutdown process of the transport. This should be called
864// only once on a transport. Once it is called, the transport should not be
865// accessed any more.
866//
867// This method blocks until the addrConn that initiated this transport is
868// re-connected. This happens because t.onClose() begins reconnect logic at the
869// addrConn level and blocks until the addrConn is successfully connected.
870func (t *http2Client) Close(err error) {
871 t.mu.Lock()
872 // Make sure we only Close once.
873 if t.state == closing {
874 t.mu.Unlock()
875 return
876 }
877 // Call t.onClose before setting the state to closing to prevent the client
878 // from attempting to create new streams ASAP.
879 t.onClose()
880 t.state = closing
881 streams := t.activeStreams
882 t.activeStreams = nil
883 if t.kpDormant {
884 // If the keepalive goroutine is blocked on this condition variable, we
885 // should unblock it so that the goroutine eventually exits.
886 t.kpDormancyCond.Signal()
887 }
888 t.mu.Unlock()
889 t.controlBuf.finish()
890 t.cancel()
891 t.conn.Close()
892 if channelz.IsOn() {
893 channelz.RemoveEntry(t.channelzID)
894 }
895 // Append info about previous goaways if there were any, since this may be important
896 // for understanding the root cause for this connection to be closed.
897 _, goAwayDebugMessage := t.GetGoAwayReason()
898
899 var st *status.Status
900 if len(goAwayDebugMessage) > 0 {
901 st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
902 err = st.Err()
903 } else {
904 st = status.New(codes.Unavailable, err.Error())
905 }
906
907 // Notify all active streams.
908 for _, s := range streams {
909 t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
910 }
911 if t.statsHandler != nil {
912 connEnd := &stats.ConnEnd{
913 Client: true,
914 }
915 t.statsHandler.HandleConn(t.ctx, connEnd)
916 }
917}
918
919// GracefulClose sets the state to draining, which prevents new streams from
920// being created and causes the transport to be closed when the last active
921// stream is closed. If there are no active streams, the transport is closed
922// immediately. This does nothing if the transport is already draining or
923// closing.
924func (t *http2Client) GracefulClose() {
925 t.mu.Lock()
926 // Make sure we move to draining only from active.
927 if t.state == draining || t.state == closing {
928 t.mu.Unlock()
929 return
930 }
931 t.state = draining
932 active := len(t.activeStreams)
933 t.mu.Unlock()
934 if active == 0 {
935 t.Close(ErrConnClosing)
936 return
937 }
938 t.controlBuf.put(&incomingGoAway{})
939}
940
941// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
942// should proceed only if Write returns nil.
943func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
944 if opts.Last {
945 // If it's the last message, update stream state.
946 if !s.compareAndSwapState(streamActive, streamWriteDone) {
947 return errStreamDone
948 }
949 } else if s.getState() != streamActive {
950 return errStreamDone
951 }
952 df := &dataFrame{
953 streamID: s.id,
954 endStream: opts.Last,
955 h: hdr,
956 d: data,
957 }
958 if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
959 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
960 return err
961 }
962 }
963 return t.controlBuf.put(df)
964}
965
966func (t *http2Client) getStream(f http2.Frame) *Stream {
967 t.mu.Lock()
968 s := t.activeStreams[f.Header().StreamID]
969 t.mu.Unlock()
970 return s
971}
972
973// adjustWindow sends out extra window update over the initial window size
974// of stream if the application is requesting data larger in size than
975// the window.
976func (t *http2Client) adjustWindow(s *Stream, n uint32) {
977 if w := s.fc.maybeAdjust(n); w > 0 {
978 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
979 }
980}
981
982// updateWindow adjusts the inbound quota for the stream.
983// Window updates will be sent out when the cumulative quota
984// exceeds the corresponding threshold.
985func (t *http2Client) updateWindow(s *Stream, n uint32) {
986 if w := s.fc.onRead(n); w > 0 {
987 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
988 }
989}
990
991// updateFlowControl updates the incoming flow control windows
992// for the transport and the stream based on the current bdp
993// estimation.
994func (t *http2Client) updateFlowControl(n uint32) {
995 t.mu.Lock()
996 for _, s := range t.activeStreams {
997 s.fc.newLimit(n)
998 }
999 t.mu.Unlock()
1000 updateIWS := func(interface{}) bool {
1001 t.initialWindowSize = int32(n)
1002 return true
1003 }
1004 t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
1005 t.controlBuf.put(&outgoingSettings{
1006 ss: []http2.Setting{
1007 {
1008 ID: http2.SettingInitialWindowSize,
1009 Val: n,
1010 },
1011 },
1012 })
1013}
1014
1015func (t *http2Client) handleData(f *http2.DataFrame) {
1016 size := f.Header().Length
1017 var sendBDPPing bool
1018 if t.bdpEst != nil {
1019 sendBDPPing = t.bdpEst.add(size)
1020 }
1021 // Decouple connection's flow control from application's read.
1022 // An update on connection's flow control should not depend on
1023 // whether user application has read the data or not. Such a
1024 // restriction is already imposed on the stream's flow control,
1025 // and therefore the sender will be blocked anyways.
1026 // Decoupling the connection flow control will prevent other
1027 // active(fast) streams from starving in presence of slow or
1028 // inactive streams.
1029 //
1030 if w := t.fc.onData(size); w > 0 {
1031 t.controlBuf.put(&outgoingWindowUpdate{
1032 streamID: 0,
1033 increment: w,
1034 })
1035 }
1036 if sendBDPPing {
1037 // Avoid excessive ping detection (e.g. in an L7 proxy)
1038 // by sending a window update prior to the BDP ping.
1039
1040 if w := t.fc.reset(); w > 0 {
1041 t.controlBuf.put(&outgoingWindowUpdate{
1042 streamID: 0,
1043 increment: w,
1044 })
1045 }
1046
1047 t.controlBuf.put(bdpPing)
1048 }
1049 // Select the right stream to dispatch.
1050 s := t.getStream(f)
1051 if s == nil {
1052 return
1053 }
1054 if size > 0 {
1055 if err := s.fc.onData(size); err != nil {
1056 t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
1057 return
1058 }
1059 if f.Header().Flags.Has(http2.FlagDataPadded) {
1060 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
1061 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
1062 }
1063 }
1064 // TODO(bradfitz, zhaoq): A copy is required here because there is no
1065 // guarantee f.Data() is consumed before the arrival of next frame.
1066 // Can this copy be eliminated?
1067 if len(f.Data()) > 0 {
1068 buffer := t.bufferPool.get()
1069 buffer.Reset()
1070 buffer.Write(f.Data())
1071 s.write(recvMsg{buffer: buffer})
1072 }
1073 }
1074 // The server has closed the stream without sending trailers. Record that
1075 // the read direction is closed, and set the status appropriately.
1076 if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
1077 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
1078 }
1079}
1080
1081func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
1082 s := t.getStream(f)
1083 if s == nil {
1084 return
1085 }
1086 if f.ErrCode == http2.ErrCodeRefusedStream {
1087 // The stream was unprocessed by the server.
1088 atomic.StoreUint32(&s.unprocessed, 1)
1089 }
1090 statusCode, ok := http2ErrConvTab[f.ErrCode]
1091 if !ok {
1092 if logger.V(logLevel) {
1093 logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
1094 }
1095 statusCode = codes.Unknown
1096 }
1097 if statusCode == codes.Canceled {
1098 if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
1099 // Our deadline was already exceeded, and that was likely the cause
1100 // of this cancelation. Alter the status code accordingly.
1101 statusCode = codes.DeadlineExceeded
1102 }
1103 }
1104 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
1105}
1106
1107func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
1108 if f.IsAck() {
1109 return
1110 }
1111 var maxStreams *uint32
1112 var ss []http2.Setting
1113 var updateFuncs []func()
1114 f.ForeachSetting(func(s http2.Setting) error {
1115 switch s.ID {
1116 case http2.SettingMaxConcurrentStreams:
1117 maxStreams = new(uint32)
1118 *maxStreams = s.Val
1119 case http2.SettingMaxHeaderListSize:
1120 updateFuncs = append(updateFuncs, func() {
1121 t.maxSendHeaderListSize = new(uint32)
1122 *t.maxSendHeaderListSize = s.Val
1123 })
1124 default:
1125 ss = append(ss, s)
1126 }
1127 return nil
1128 })
1129 if isFirst && maxStreams == nil {
1130 maxStreams = new(uint32)
1131 *maxStreams = math.MaxUint32
1132 }
1133 sf := &incomingSettings{
1134 ss: ss,
1135 }
1136 if maxStreams != nil {
1137 updateStreamQuota := func() {
1138 delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
1139 t.maxConcurrentStreams = *maxStreams
1140 t.streamQuota += delta
1141 if delta > 0 && t.waitingStreams > 0 {
1142 close(t.streamsQuotaAvailable) // wake all of them up.
1143 t.streamsQuotaAvailable = make(chan struct{}, 1)
1144 }
1145 }
1146 updateFuncs = append(updateFuncs, updateStreamQuota)
1147 }
1148 t.controlBuf.executeAndPut(func(interface{}) bool {
1149 for _, f := range updateFuncs {
1150 f()
1151 }
1152 return true
1153 }, sf)
1154}
1155
1156func (t *http2Client) handlePing(f *http2.PingFrame) {
1157 if f.IsAck() {
1158 // Maybe it's a BDP ping.
1159 if t.bdpEst != nil {
1160 t.bdpEst.calculate(f.Data)
1161 }
1162 return
1163 }
1164 pingAck := &ping{ack: true}
1165 copy(pingAck.data[:], f.Data[:])
1166 t.controlBuf.put(pingAck)
1167}
1168
1169func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
1170 t.mu.Lock()
1171 if t.state == closing {
1172 t.mu.Unlock()
1173 return
1174 }
1175 if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
1176 if logger.V(logLevel) {
1177 logger.Infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
1178 }
1179 }
1180 id := f.LastStreamID
1181 if id > 0 && id%2 == 0 {
1182 t.mu.Unlock()
1183 t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
1184 return
1185 }
1186 // A client can receive multiple GoAways from the server (see
1187 // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
1188 // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
1189 // sent after an RTT delay with the ID of the last stream the server will
1190 // process.
1191 //
1192 // Therefore, when we get the first GoAway we don't necessarily close any
1193 // streams. While in case of second GoAway we close all streams created after
1194 // the GoAwayId. This way streams that were in-flight while the GoAway from
1195 // server was being sent don't get killed.
1196 select {
1197 case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
1198 // If there are multiple GoAways the first one should always have an ID greater than the following ones.
1199 if id > t.prevGoAwayID {
1200 t.mu.Unlock()
1201 t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
1202 return
1203 }
1204 default:
1205 t.setGoAwayReason(f)
1206 close(t.goAway)
1207 t.controlBuf.put(&incomingGoAway{})
1208 // Notify the clientconn about the GOAWAY before we set the state to
1209 // draining, to allow the client to stop attempting to create streams
1210 // before disallowing new streams on this connection.
1211 t.onGoAway(t.goAwayReason)
1212 t.state = draining
1213 }
1214 // All streams with IDs greater than the GoAwayId
1215 // and smaller than the previous GoAway ID should be killed.
1216 upperLimit := t.prevGoAwayID
1217 if upperLimit == 0 { // This is the first GoAway Frame.
1218 upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
1219 }
1220 for streamID, stream := range t.activeStreams {
1221 if streamID > id && streamID <= upperLimit {
1222 // The stream was unprocessed by the server.
1223 atomic.StoreUint32(&stream.unprocessed, 1)
1224 t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
1225 }
1226 }
1227 t.prevGoAwayID = id
1228 active := len(t.activeStreams)
1229 t.mu.Unlock()
1230 if active == 0 {
1231 t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
1232 }
1233}
1234
1235// setGoAwayReason sets the value of t.goAwayReason based
1236// on the GoAway frame received.
1237// It expects a lock on transport's mutext to be held by
1238// the caller.
1239func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
1240 t.goAwayReason = GoAwayNoReason
1241 switch f.ErrCode {
1242 case http2.ErrCodeEnhanceYourCalm:
1243 if string(f.DebugData()) == "too_many_pings" {
1244 t.goAwayReason = GoAwayTooManyPings
1245 }
1246 }
1247 if len(f.DebugData()) == 0 {
1248 t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
1249 } else {
1250 t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData()))
1251 }
1252}
1253
1254func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
1255 t.mu.Lock()
1256 defer t.mu.Unlock()
1257 return t.goAwayReason, t.goAwayDebugMessage
1258}
1259
1260func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
1261 t.controlBuf.put(&incomingWindowUpdate{
1262 streamID: f.Header().StreamID,
1263 increment: f.Increment,
1264 })
1265}
1266
1267// operateHeaders takes action on the decoded headers.
1268func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
1269 s := t.getStream(frame)
1270 if s == nil {
1271 return
1272 }
1273 endStream := frame.StreamEnded()
1274 atomic.StoreUint32(&s.bytesReceived, 1)
1275 initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
1276
1277 if !initialHeader && !endStream {
1278 // As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
1279 st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
1280 t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
1281 return
1282 }
1283
1284 // frame.Truncated is set to true when framer detects that the current header
1285 // list size hits MaxHeaderListSize limit.
1286 if frame.Truncated {
1287 se := status.New(codes.Internal, "peer header list size exceeded limit")
1288 t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream)
1289 return
1290 }
1291
1292 var (
1293 // If a gRPC Response-Headers has already been received, then it means
1294 // that the peer is speaking gRPC and we are in gRPC mode.
1295 isGRPC = !initialHeader
1296 mdata = make(map[string][]string)
1297 contentTypeErr = "malformed header: missing HTTP content-type"
1298 grpcMessage string
1299 statusGen *status.Status
1300 recvCompress string
1301 httpStatusCode *int
1302 httpStatusErr string
1303 rawStatusCode = codes.Unknown
1304 // headerError is set if an error is encountered while parsing the headers
1305 headerError string
1306 )
1307
1308 if initialHeader {
1309 httpStatusErr = "malformed header: missing HTTP status"
1310 }
1311
1312 for _, hf := range frame.Fields {
1313 switch hf.Name {
1314 case "content-type":
1315 if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType {
1316 contentTypeErr = fmt.Sprintf("transport: received unexpected content-type %q", hf.Value)
1317 break
1318 }
1319 contentTypeErr = ""
1320 mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
1321 isGRPC = true
1322 case "grpc-encoding":
1323 recvCompress = hf.Value
1324 case "grpc-status":
1325 code, err := strconv.ParseInt(hf.Value, 10, 32)
1326 if err != nil {
1327 se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err))
1328 t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1329 return
1330 }
1331 rawStatusCode = codes.Code(uint32(code))
1332 case "grpc-message":
1333 grpcMessage = decodeGrpcMessage(hf.Value)
1334 case "grpc-status-details-bin":
1335 var err error
1336 statusGen, err = decodeGRPCStatusDetails(hf.Value)
1337 if err != nil {
1338 headerError = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)
1339 }
1340 case ":status":
1341 if hf.Value == "200" {
1342 httpStatusErr = ""
1343 statusCode := 200
1344 httpStatusCode = &statusCode
1345 break
1346 }
1347
1348 c, err := strconv.ParseInt(hf.Value, 10, 32)
1349 if err != nil {
1350 se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
1351 t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1352 return
1353 }
1354 statusCode := int(c)
1355 httpStatusCode = &statusCode
1356
1357 httpStatusErr = fmt.Sprintf(
1358 "unexpected HTTP status code received from server: %d (%s)",
1359 statusCode,
1360 http.StatusText(statusCode),
1361 )
1362 default:
1363 if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
1364 break
1365 }
1366 v, err := decodeMetadataHeader(hf.Name, hf.Value)
1367 if err != nil {
1368 headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
1369 logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
1370 break
1371 }
1372 mdata[hf.Name] = append(mdata[hf.Name], v)
1373 }
1374 }
1375
1376 if !isGRPC || httpStatusErr != "" {
1377 var code = codes.Internal // when header does not include HTTP status, return INTERNAL
1378
1379 if httpStatusCode != nil {
1380 var ok bool
1381 code, ok = HTTPStatusConvTab[*httpStatusCode]
1382 if !ok {
1383 code = codes.Unknown
1384 }
1385 }
1386 var errs []string
1387 if httpStatusErr != "" {
1388 errs = append(errs, httpStatusErr)
1389 }
1390 if contentTypeErr != "" {
1391 errs = append(errs, contentTypeErr)
1392 }
1393 // Verify the HTTP response is a 200.
1394 se := status.New(code, strings.Join(errs, "; "))
1395 t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1396 return
1397 }
1398
1399 if headerError != "" {
1400 se := status.New(codes.Internal, headerError)
1401 t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1402 return
1403 }
1404
1405 isHeader := false
1406 defer func() {
1407 if t.statsHandler != nil {
1408 if isHeader {
1409 inHeader := &stats.InHeader{
1410 Client: true,
1411 WireLength: int(frame.Header().Length),
1412 Header: s.header.Copy(),
1413 Compression: s.recvCompress,
1414 }
1415 t.statsHandler.HandleRPC(s.ctx, inHeader)
1416 } else {
1417 inTrailer := &stats.InTrailer{
1418 Client: true,
1419 WireLength: int(frame.Header().Length),
1420 Trailer: s.trailer.Copy(),
1421 }
1422 t.statsHandler.HandleRPC(s.ctx, inTrailer)
1423 }
1424 }
1425 }()
1426
1427 // If headerChan hasn't been closed yet
1428 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
1429 s.headerValid = true
1430 if !endStream {
1431 // HEADERS frame block carries a Response-Headers.
1432 isHeader = true
1433 // These values can be set without any synchronization because
1434 // stream goroutine will read it only after seeing a closed
1435 // headerChan which we'll close after setting this.
1436 s.recvCompress = recvCompress
1437 if len(mdata) > 0 {
1438 s.header = mdata
1439 }
1440 } else {
1441 // HEADERS frame block carries a Trailers-Only.
1442 s.noHeaders = true
1443 }
1444 close(s.headerChan)
1445 }
1446
1447 if !endStream {
1448 return
1449 }
1450
1451 if statusGen == nil {
1452 statusGen = status.New(rawStatusCode, grpcMessage)
1453 }
1454
1455 // if client received END_STREAM from server while stream was still active, send RST_STREAM
1456 rst := s.getState() == streamActive
1457 t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
1458}
1459
1460// reader runs as a separate goroutine in charge of reading data from network
1461// connection.
1462//
1463// TODO(zhaoq): currently one reader per transport. Investigate whether this is
1464// optimal.
1465// TODO(zhaoq): Check the validity of the incoming frame sequence.
1466func (t *http2Client) reader() {
1467 defer close(t.readerDone)
1468 // Check the validity of server preface.
1469 frame, err := t.framer.fr.ReadFrame()
1470 if err != nil {
1471 err = connectionErrorf(true, err, "error reading server preface: %v", err)
1472 t.Close(err) // this kicks off resetTransport, so must be last before return
1473 return
1474 }
1475 t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
1476 if t.keepaliveEnabled {
1477 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
1478 }
1479 sf, ok := frame.(*http2.SettingsFrame)
1480 if !ok {
1481 // this kicks off resetTransport, so must be last before return
1482 t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame))
1483 return
1484 }
1485 t.onPrefaceReceipt()
1486 t.handleSettings(sf, true)
1487
1488 // loop to keep reading incoming messages on this transport.
1489 for {
1490 t.controlBuf.throttle()
1491 frame, err := t.framer.fr.ReadFrame()
1492 if t.keepaliveEnabled {
1493 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
1494 }
1495 if err != nil {
1496 // Abort an active stream if the http2.Framer returns a
1497 // http2.StreamError. This can happen only if the server's response
1498 // is malformed http2.
1499 if se, ok := err.(http2.StreamError); ok {
1500 t.mu.Lock()
1501 s := t.activeStreams[se.StreamID]
1502 t.mu.Unlock()
1503 if s != nil {
1504 // use error detail to provide better err message
1505 code := http2ErrConvTab[se.Code]
1506 errorDetail := t.framer.fr.ErrorDetail()
1507 var msg string
1508 if errorDetail != nil {
1509 msg = errorDetail.Error()
1510 } else {
1511 msg = "received invalid frame"
1512 }
1513 t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
1514 }
1515 continue
1516 } else {
1517 // Transport error.
1518 t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
1519 return
1520 }
1521 }
1522 switch frame := frame.(type) {
1523 case *http2.MetaHeadersFrame:
1524 t.operateHeaders(frame)
1525 case *http2.DataFrame:
1526 t.handleData(frame)
1527 case *http2.RSTStreamFrame:
1528 t.handleRSTStream(frame)
1529 case *http2.SettingsFrame:
1530 t.handleSettings(frame, false)
1531 case *http2.PingFrame:
1532 t.handlePing(frame)
1533 case *http2.GoAwayFrame:
1534 t.handleGoAway(frame)
1535 case *http2.WindowUpdateFrame:
1536 t.handleWindowUpdate(frame)
1537 default:
1538 if logger.V(logLevel) {
1539 logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
1540 }
1541 }
1542 }
1543}
1544
1545func minTime(a, b time.Duration) time.Duration {
1546 if a < b {
1547 return a
1548 }
1549 return b
1550}
1551
1552// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
1553func (t *http2Client) keepalive() {
1554 p := &ping{data: [8]byte{}}
1555 // True iff a ping has been sent, and no data has been received since then.
1556 outstandingPing := false
1557 // Amount of time remaining before which we should receive an ACK for the
1558 // last sent ping.
1559 timeoutLeft := time.Duration(0)
1560 // Records the last value of t.lastRead before we go block on the timer.
1561 // This is required to check for read activity since then.
1562 prevNano := time.Now().UnixNano()
1563 timer := time.NewTimer(t.kp.Time)
1564 for {
1565 select {
1566 case <-timer.C:
1567 lastRead := atomic.LoadInt64(&t.lastRead)
1568 if lastRead > prevNano {
1569 // There has been read activity since the last time we were here.
1570 outstandingPing = false
1571 // Next timer should fire at kp.Time seconds from lastRead time.
1572 timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
1573 prevNano = lastRead
1574 continue
1575 }
1576 if outstandingPing && timeoutLeft <= 0 {
1577 t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
1578 return
1579 }
1580 t.mu.Lock()
1581 if t.state == closing {
1582 // If the transport is closing, we should exit from the
1583 // keepalive goroutine here. If not, we could have a race
1584 // between the call to Signal() from Close() and the call to
1585 // Wait() here, whereby the keepalive goroutine ends up
1586 // blocking on the condition variable which will never be
1587 // signalled again.
1588 t.mu.Unlock()
1589 return
1590 }
1591 if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
1592 // If a ping was sent out previously (because there were active
1593 // streams at that point) which wasn't acked and its timeout
1594 // hadn't fired, but we got here and are about to go dormant,
1595 // we should make sure that we unconditionally send a ping once
1596 // we awaken.
1597 outstandingPing = false
1598 t.kpDormant = true
1599 t.kpDormancyCond.Wait()
1600 }
1601 t.kpDormant = false
1602 t.mu.Unlock()
1603
1604 // We get here either because we were dormant and a new stream was
1605 // created which unblocked the Wait() call, or because the
1606 // keepalive timer expired. In both cases, we need to send a ping.
1607 if !outstandingPing {
1608 if channelz.IsOn() {
1609 atomic.AddInt64(&t.czData.kpCount, 1)
1610 }
1611 t.controlBuf.put(p)
1612 timeoutLeft = t.kp.Timeout
1613 outstandingPing = true
1614 }
1615 // The amount of time to sleep here is the minimum of kp.Time and
1616 // timeoutLeft. This will ensure that we wait only for kp.Time
1617 // before sending out the next ping (for cases where the ping is
1618 // acked).
1619 sleepDuration := minTime(t.kp.Time, timeoutLeft)
1620 timeoutLeft -= sleepDuration
1621 timer.Reset(sleepDuration)
1622 case <-t.ctx.Done():
1623 if !timer.Stop() {
1624 <-timer.C
1625 }
1626 return
1627 }
1628 }
1629}
1630
1631func (t *http2Client) Error() <-chan struct{} {
1632 return t.ctx.Done()
1633}
1634
1635func (t *http2Client) GoAway() <-chan struct{} {
1636 return t.goAway
1637}
1638
1639func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
1640 s := channelz.SocketInternalMetric{
1641 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1642 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1643 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1644 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1645 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1646 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1647 LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1648 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1649 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1650 LocalFlowControlWindow: int64(t.fc.getSize()),
1651 SocketOptions: channelz.GetSocketOption(t.conn),
1652 LocalAddr: t.localAddr,
1653 RemoteAddr: t.remoteAddr,
1654 // RemoteName :
1655 }
1656 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1657 s.Security = au.GetSecurityValue()
1658 }
1659 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1660 return &s
1661}
1662
1663func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
1664
1665func (t *http2Client) IncrMsgSent() {
1666 atomic.AddInt64(&t.czData.msgSent, 1)
1667 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1668}
1669
1670func (t *http2Client) IncrMsgRecv() {
1671 atomic.AddInt64(&t.czData.msgRecv, 1)
1672 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1673}
1674
1675func (t *http2Client) getOutFlowWindow() int64 {
1676 resp := make(chan uint32, 1)
1677 timer := time.NewTimer(time.Second)
1678 defer timer.Stop()
1679 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1680 select {
1681 case sz := <-resp:
1682 return int64(sz)
1683 case <-t.ctxDone:
1684 return -1
1685 case <-timer.C:
1686 return -2
1687 }
1688}