blob: 294661a3f3374f0b02af8cef4554e1fc65d46094 [file] [log] [blame]
Don Newton98fd8812019-09-23 15:15:02 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "context"
23 "fmt"
24 "io"
25 "math"
26 "net"
27 "strconv"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "golang.org/x/net/http2"
34 "golang.org/x/net/http2/hpack"
35
36 "google.golang.org/grpc/codes"
37 "google.golang.org/grpc/credentials"
Don Newtone0d34a82019-11-14 10:58:06 -050038 "google.golang.org/grpc/internal"
Don Newton98fd8812019-09-23 15:15:02 -040039 "google.golang.org/grpc/internal/channelz"
40 "google.golang.org/grpc/internal/syscall"
41 "google.golang.org/grpc/keepalive"
42 "google.golang.org/grpc/metadata"
43 "google.golang.org/grpc/peer"
44 "google.golang.org/grpc/stats"
45 "google.golang.org/grpc/status"
46)
47
48// http2Client implements the ClientTransport interface with HTTP2.
49type http2Client struct {
Don Newtone0d34a82019-11-14 10:58:06 -050050 lastRead int64 // keep this field 64-bit aligned
Don Newton98fd8812019-09-23 15:15:02 -040051 ctx context.Context
52 cancel context.CancelFunc
53 ctxDone <-chan struct{} // Cache the ctx.Done() chan.
54 userAgent string
55 md interface{}
56 conn net.Conn // underlying communication channel
57 loopy *loopyWriter
58 remoteAddr net.Addr
59 localAddr net.Addr
60 authInfo credentials.AuthInfo // auth info about the connection
61
62 readerDone chan struct{} // sync point to enable testing.
63 writerDone chan struct{} // sync point to enable testing.
64 // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
65 // that the server sent GoAway on this transport.
66 goAway chan struct{}
Don Newton98fd8812019-09-23 15:15:02 -040067
68 framer *framer
69 // controlBuf delivers all the control related tasks (e.g., window
70 // updates, reset streams, and various settings) to the controller.
71 controlBuf *controlBuffer
72 fc *trInFlow
73 // The scheme used: https if TLS is on, http otherwise.
74 scheme string
75
76 isSecure bool
77
78 perRPCCreds []credentials.PerRPCCredentials
79
Don Newton98fd8812019-09-23 15:15:02 -040080 kp keepalive.ClientParameters
81 keepaliveEnabled bool
82
83 statsHandler stats.Handler
84
85 initialWindowSize int32
86
87 // configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
88 maxSendHeaderListSize *uint32
89
90 bdpEst *bdpEstimator
91 // onPrefaceReceipt is a callback that client transport calls upon
92 // receiving server preface to signal that a succefull HTTP2
93 // connection was established.
94 onPrefaceReceipt func()
95
96 maxConcurrentStreams uint32
97 streamQuota int64
98 streamsQuotaAvailable chan struct{}
99 waitingStreams uint32
100 nextID uint32
101
102 mu sync.Mutex // guard the following variables
103 state transportState
104 activeStreams map[uint32]*Stream
105 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
106 prevGoAwayID uint32
107 // goAwayReason records the http2.ErrCode and debug data received with the
108 // GoAway frame.
109 goAwayReason GoAwayReason
Don Newtone0d34a82019-11-14 10:58:06 -0500110 // A condition variable used to signal when the keepalive goroutine should
111 // go dormant. The condition for dormancy is based on the number of active
112 // streams and the `PermitWithoutStream` keepalive client parameter. And
113 // since the number of active streams is guarded by the above mutex, we use
114 // the same for this condition variable as well.
115 kpDormancyCond *sync.Cond
116 // A boolean to track whether the keepalive goroutine is dormant or not.
117 // This is checked before attempting to signal the above condition
118 // variable.
119 kpDormant bool
Don Newton98fd8812019-09-23 15:15:02 -0400120
121 // Fields below are for channelz metric collection.
122 channelzID int64 // channelz unique identification number
123 czData *channelzData
124
125 onGoAway func(GoAwayReason)
126 onClose func()
127
128 bufferPool *bufferPool
129}
130
131func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
132 if fn != nil {
133 return fn(ctx, addr)
134 }
135 return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
136}
137
138func isTemporary(err error) bool {
139 switch err := err.(type) {
140 case interface {
141 Temporary() bool
142 }:
143 return err.Temporary()
144 case interface {
145 Timeout() bool
146 }:
147 // Timeouts may be resolved upon retry, and are thus treated as
148 // temporary.
149 return err.Timeout()
150 }
151 return true
152}
153
154// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
155// and starts to receive messages on it. Non-nil error returns if construction
156// fails.
157func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
158 scheme := "http"
159 ctx, cancel := context.WithCancel(ctx)
160 defer func() {
161 if err != nil {
162 cancel()
163 }
164 }()
165
166 conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
167 if err != nil {
168 if opts.FailOnNonTempDialError {
169 return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
170 }
171 return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
172 }
173 // Any further errors will close the underlying connection
174 defer func(conn net.Conn) {
175 if err != nil {
176 conn.Close()
177 }
178 }(conn)
179 kp := opts.KeepaliveParams
180 // Validate keepalive parameters.
181 if kp.Time == 0 {
182 kp.Time = defaultClientKeepaliveTime
183 }
184 if kp.Timeout == 0 {
185 kp.Timeout = defaultClientKeepaliveTimeout
186 }
187 keepaliveEnabled := false
188 if kp.Time != infinity {
189 if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
190 return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
191 }
192 keepaliveEnabled = true
193 }
194 var (
195 isSecure bool
196 authInfo credentials.AuthInfo
197 )
198 transportCreds := opts.TransportCredentials
199 perRPCCreds := opts.PerRPCCredentials
200
201 if b := opts.CredsBundle; b != nil {
202 if t := b.TransportCredentials(); t != nil {
203 transportCreds = t
204 }
205 if t := b.PerRPCCredentials(); t != nil {
206 perRPCCreds = append(perRPCCreds, t)
207 }
208 }
209 if transportCreds != nil {
210 scheme = "https"
211 conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn)
212 if err != nil {
213 return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
214 }
215 isSecure = true
216 }
217 dynamicWindow := true
218 icwz := int32(initialWindowSize)
219 if opts.InitialConnWindowSize >= defaultWindowSize {
220 icwz = opts.InitialConnWindowSize
221 dynamicWindow = false
222 }
223 writeBufSize := opts.WriteBufferSize
224 readBufSize := opts.ReadBufferSize
225 maxHeaderListSize := defaultClientMaxHeaderListSize
226 if opts.MaxHeaderListSize != nil {
227 maxHeaderListSize = *opts.MaxHeaderListSize
228 }
229 t := &http2Client{
230 ctx: ctx,
231 ctxDone: ctx.Done(), // Cache Done chan.
232 cancel: cancel,
233 userAgent: opts.UserAgent,
234 md: addr.Metadata,
235 conn: conn,
236 remoteAddr: conn.RemoteAddr(),
237 localAddr: conn.LocalAddr(),
238 authInfo: authInfo,
239 readerDone: make(chan struct{}),
240 writerDone: make(chan struct{}),
241 goAway: make(chan struct{}),
Don Newton98fd8812019-09-23 15:15:02 -0400242 framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
243 fc: &trInFlow{limit: uint32(icwz)},
244 scheme: scheme,
245 activeStreams: make(map[uint32]*Stream),
246 isSecure: isSecure,
247 perRPCCreds: perRPCCreds,
248 kp: kp,
249 statsHandler: opts.StatsHandler,
250 initialWindowSize: initialWindowSize,
251 onPrefaceReceipt: onPrefaceReceipt,
252 nextID: 1,
253 maxConcurrentStreams: defaultMaxStreamsClient,
254 streamQuota: defaultMaxStreamsClient,
255 streamsQuotaAvailable: make(chan struct{}, 1),
256 czData: new(channelzData),
257 onGoAway: onGoAway,
258 onClose: onClose,
259 keepaliveEnabled: keepaliveEnabled,
260 bufferPool: newBufferPool(),
261 }
262 t.controlBuf = newControlBuffer(t.ctxDone)
263 if opts.InitialWindowSize >= defaultWindowSize {
264 t.initialWindowSize = opts.InitialWindowSize
265 dynamicWindow = false
266 }
267 if dynamicWindow {
268 t.bdpEst = &bdpEstimator{
269 bdp: initialWindowSize,
270 updateFlowControl: t.updateFlowControl,
271 }
272 }
Don Newton98fd8812019-09-23 15:15:02 -0400273 if t.statsHandler != nil {
274 t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
275 RemoteAddr: t.remoteAddr,
276 LocalAddr: t.localAddr,
277 })
278 connBegin := &stats.ConnBegin{
279 Client: true,
280 }
281 t.statsHandler.HandleConn(t.ctx, connBegin)
282 }
283 if channelz.IsOn() {
284 t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
285 }
286 if t.keepaliveEnabled {
Don Newtone0d34a82019-11-14 10:58:06 -0500287 t.kpDormancyCond = sync.NewCond(&t.mu)
Don Newton98fd8812019-09-23 15:15:02 -0400288 go t.keepalive()
289 }
290 // Start the reader goroutine for incoming message. Each transport has
291 // a dedicated goroutine which reads HTTP2 frame from network. Then it
292 // dispatches the frame to the corresponding stream entity.
293 go t.reader()
294
295 // Send connection preface to server.
296 n, err := t.conn.Write(clientPreface)
297 if err != nil {
298 t.Close()
299 return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
300 }
301 if n != len(clientPreface) {
302 t.Close()
303 return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
304 }
305 var ss []http2.Setting
306
307 if t.initialWindowSize != defaultWindowSize {
308 ss = append(ss, http2.Setting{
309 ID: http2.SettingInitialWindowSize,
310 Val: uint32(t.initialWindowSize),
311 })
312 }
313 if opts.MaxHeaderListSize != nil {
314 ss = append(ss, http2.Setting{
315 ID: http2.SettingMaxHeaderListSize,
316 Val: *opts.MaxHeaderListSize,
317 })
318 }
319 err = t.framer.fr.WriteSettings(ss...)
320 if err != nil {
321 t.Close()
322 return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
323 }
324 // Adjust the connection flow control window if needed.
325 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
326 if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
327 t.Close()
328 return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
329 }
330 }
331
332 if err := t.framer.writer.Flush(); err != nil {
333 return nil, err
334 }
335 go func() {
336 t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
337 err := t.loopy.run()
338 if err != nil {
339 errorf("transport: loopyWriter.run returning. Err: %v", err)
340 }
341 // If it's a connection error, let reader goroutine handle it
342 // since there might be data in the buffers.
343 if _, ok := err.(net.Error); !ok {
344 t.conn.Close()
345 }
346 close(t.writerDone)
347 }()
348 return t, nil
349}
350
351func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
352 // TODO(zhaoq): Handle uint32 overflow of Stream.id.
353 s := &Stream{
Don Newtone0d34a82019-11-14 10:58:06 -0500354 ct: t,
Don Newton98fd8812019-09-23 15:15:02 -0400355 done: make(chan struct{}),
356 method: callHdr.Method,
357 sendCompress: callHdr.SendCompress,
358 buf: newRecvBuffer(),
359 headerChan: make(chan struct{}),
360 contentSubtype: callHdr.ContentSubtype,
361 }
362 s.wq = newWriteQuota(defaultWriteQuota, s.done)
363 s.requestRead = func(n int) {
364 t.adjustWindow(s, uint32(n))
365 }
366 // The client side stream context should have exactly the same life cycle with the user provided context.
367 // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
368 // So we use the original context here instead of creating a copy.
369 s.ctx = ctx
370 s.trReader = &transportReader{
371 reader: &recvBufferReader{
372 ctx: s.ctx,
373 ctxDone: s.ctx.Done(),
374 recv: s.buf,
375 closeStream: func(err error) {
376 t.CloseStream(s, err)
377 },
378 freeBuffer: t.bufferPool.put,
379 },
380 windowHandler: func(n int) {
381 t.updateWindow(s, uint32(n))
382 },
383 }
384 return s
385}
386
387func (t *http2Client) getPeer() *peer.Peer {
Don Newtone0d34a82019-11-14 10:58:06 -0500388 return &peer.Peer{
389 Addr: t.remoteAddr,
390 AuthInfo: t.authInfo,
Don Newton98fd8812019-09-23 15:15:02 -0400391 }
Don Newton98fd8812019-09-23 15:15:02 -0400392}
393
394func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
395 aud := t.createAudience(callHdr)
Don Newtone0d34a82019-11-14 10:58:06 -0500396 ri := credentials.RequestInfo{
397 Method: callHdr.Method,
398 }
399 ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
400 authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
Don Newton98fd8812019-09-23 15:15:02 -0400401 if err != nil {
402 return nil, err
403 }
Don Newtone0d34a82019-11-14 10:58:06 -0500404 callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
Don Newton98fd8812019-09-23 15:15:02 -0400405 if err != nil {
406 return nil, err
407 }
408 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
409 // first and create a slice of that exact size.
410 // Make the slice of certain predictable size to reduce allocations made by append.
411 hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
412 hfLen += len(authData) + len(callAuthData)
413 headerFields := make([]hpack.HeaderField, 0, hfLen)
414 headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
415 headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
416 headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
417 headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
418 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
419 headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
420 headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
421 if callHdr.PreviousAttempts > 0 {
422 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
423 }
424
425 if callHdr.SendCompress != "" {
426 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
427 }
428 if dl, ok := ctx.Deadline(); ok {
429 // Send out timeout regardless its value. The server can detect timeout context by itself.
430 // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
431 timeout := time.Until(dl)
432 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
433 }
434 for k, v := range authData {
435 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
436 }
437 for k, v := range callAuthData {
438 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
439 }
440 if b := stats.OutgoingTags(ctx); b != nil {
441 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
442 }
443 if b := stats.OutgoingTrace(ctx); b != nil {
444 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
445 }
446
447 if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
448 var k string
449 for k, vv := range md {
450 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
451 if isReservedHeader(k) {
452 continue
453 }
454 for _, v := range vv {
455 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
456 }
457 }
458 for _, vv := range added {
459 for i, v := range vv {
460 if i%2 == 0 {
461 k = v
462 continue
463 }
464 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
465 if isReservedHeader(k) {
466 continue
467 }
468 headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
469 }
470 }
471 }
472 if md, ok := t.md.(*metadata.MD); ok {
473 for k, vv := range *md {
474 if isReservedHeader(k) {
475 continue
476 }
477 for _, v := range vv {
478 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
479 }
480 }
481 }
482 return headerFields, nil
483}
484
485func (t *http2Client) createAudience(callHdr *CallHdr) string {
486 // Create an audience string only if needed.
487 if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
488 return ""
489 }
490 // Construct URI required to get auth request metadata.
491 // Omit port if it is the default one.
492 host := strings.TrimSuffix(callHdr.Host, ":443")
493 pos := strings.LastIndex(callHdr.Method, "/")
494 if pos == -1 {
495 pos = len(callHdr.Method)
496 }
497 return "https://" + host + callHdr.Method[:pos]
498}
499
500func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
501 if len(t.perRPCCreds) == 0 {
502 return nil, nil
503 }
504 authData := map[string]string{}
505 for _, c := range t.perRPCCreds {
506 data, err := c.GetRequestMetadata(ctx, audience)
507 if err != nil {
508 if _, ok := status.FromError(err); ok {
509 return nil, err
510 }
511
512 return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
513 }
514 for k, v := range data {
515 // Capital header names are illegal in HTTP/2.
516 k = strings.ToLower(k)
517 authData[k] = v
518 }
519 }
520 return authData, nil
521}
522
523func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
524 var callAuthData map[string]string
525 // Check if credentials.PerRPCCredentials were provided via call options.
526 // Note: if these credentials are provided both via dial options and call
527 // options, then both sets of credentials will be applied.
528 if callCreds := callHdr.Creds; callCreds != nil {
529 if !t.isSecure && callCreds.RequireTransportSecurity() {
530 return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
531 }
532 data, err := callCreds.GetRequestMetadata(ctx, audience)
533 if err != nil {
534 return nil, status.Errorf(codes.Internal, "transport: %v", err)
535 }
536 callAuthData = make(map[string]string, len(data))
537 for k, v := range data {
538 // Capital header names are illegal in HTTP/2
539 k = strings.ToLower(k)
540 callAuthData[k] = v
541 }
542 }
543 return callAuthData, nil
544}
545
546// NewStream creates a stream and registers it into the transport as "active"
547// streams.
548func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
549 ctx = peer.NewContext(ctx, t.getPeer())
550 headerFields, err := t.createHeaderFields(ctx, callHdr)
551 if err != nil {
552 return nil, err
553 }
554 s := t.newStream(ctx, callHdr)
555 cleanup := func(err error) {
556 if s.swapState(streamDone) == streamDone {
557 // If it was already done, return.
558 return
559 }
560 // The stream was unprocessed by the server.
561 atomic.StoreUint32(&s.unprocessed, 1)
562 s.write(recvMsg{err: err})
563 close(s.done)
564 // If headerChan isn't closed, then close it.
565 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
566 close(s.headerChan)
567 }
568 }
569 hdr := &headerFrame{
570 hf: headerFields,
571 endStream: false,
Don Newtone0d34a82019-11-14 10:58:06 -0500572 initStream: func(id uint32) error {
Don Newton98fd8812019-09-23 15:15:02 -0400573 t.mu.Lock()
574 if state := t.state; state != reachable {
575 t.mu.Unlock()
576 // Do a quick cleanup.
577 err := error(errStreamDrain)
578 if state == closing {
579 err = ErrConnClosing
580 }
581 cleanup(err)
Don Newtone0d34a82019-11-14 10:58:06 -0500582 return err
Don Newton98fd8812019-09-23 15:15:02 -0400583 }
584 t.activeStreams[id] = s
585 if channelz.IsOn() {
586 atomic.AddInt64(&t.czData.streamsStarted, 1)
587 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
588 }
Don Newtone0d34a82019-11-14 10:58:06 -0500589 // If the keepalive goroutine has gone dormant, wake it up.
590 if t.kpDormant {
591 t.kpDormancyCond.Signal()
Don Newton98fd8812019-09-23 15:15:02 -0400592 }
593 t.mu.Unlock()
Don Newtone0d34a82019-11-14 10:58:06 -0500594 return nil
Don Newton98fd8812019-09-23 15:15:02 -0400595 },
596 onOrphaned: cleanup,
597 wq: s.wq,
598 }
599 firstTry := true
600 var ch chan struct{}
601 checkForStreamQuota := func(it interface{}) bool {
602 if t.streamQuota <= 0 { // Can go negative if server decreases it.
603 if firstTry {
604 t.waitingStreams++
605 }
606 ch = t.streamsQuotaAvailable
607 return false
608 }
609 if !firstTry {
610 t.waitingStreams--
611 }
612 t.streamQuota--
613 h := it.(*headerFrame)
614 h.streamID = t.nextID
615 t.nextID += 2
616 s.id = h.streamID
617 s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
618 if t.streamQuota > 0 && t.waitingStreams > 0 {
619 select {
620 case t.streamsQuotaAvailable <- struct{}{}:
621 default:
622 }
623 }
624 return true
625 }
626 var hdrListSizeErr error
627 checkForHeaderListSize := func(it interface{}) bool {
628 if t.maxSendHeaderListSize == nil {
629 return true
630 }
631 hdrFrame := it.(*headerFrame)
632 var sz int64
633 for _, f := range hdrFrame.hf {
634 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
635 hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
636 return false
637 }
638 }
639 return true
640 }
641 for {
642 success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
643 if !checkForStreamQuota(it) {
644 return false
645 }
646 if !checkForHeaderListSize(it) {
647 return false
648 }
649 return true
650 }, hdr)
651 if err != nil {
652 return nil, err
653 }
654 if success {
655 break
656 }
657 if hdrListSizeErr != nil {
658 return nil, hdrListSizeErr
659 }
660 firstTry = false
661 select {
662 case <-ch:
663 case <-s.ctx.Done():
664 return nil, ContextErr(s.ctx.Err())
665 case <-t.goAway:
666 return nil, errStreamDrain
667 case <-t.ctx.Done():
668 return nil, ErrConnClosing
669 }
670 }
671 if t.statsHandler != nil {
672 outHeader := &stats.OutHeader{
673 Client: true,
674 FullMethod: callHdr.Method,
675 RemoteAddr: t.remoteAddr,
676 LocalAddr: t.localAddr,
677 Compression: callHdr.SendCompress,
678 }
679 t.statsHandler.HandleRPC(s.ctx, outHeader)
680 }
681 return s, nil
682}
683
684// CloseStream clears the footprint of a stream when the stream is not needed any more.
685// This must not be executed in reader's goroutine.
686func (t *http2Client) CloseStream(s *Stream, err error) {
687 var (
688 rst bool
689 rstCode http2.ErrCode
690 )
691 if err != nil {
692 rst = true
693 rstCode = http2.ErrCodeCancel
694 }
695 t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
696}
697
698func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
699 // Set stream status to done.
700 if s.swapState(streamDone) == streamDone {
701 // If it was already done, return. If multiple closeStream calls
702 // happen simultaneously, wait for the first to finish.
703 <-s.done
704 return
705 }
706 // status and trailers can be updated here without any synchronization because the stream goroutine will
707 // only read it after it sees an io.EOF error from read or write and we'll write those errors
708 // only after updating this.
709 s.status = st
710 if len(mdata) > 0 {
711 s.trailer = mdata
712 }
713 if err != nil {
714 // This will unblock reads eventually.
715 s.write(recvMsg{err: err})
716 }
717 // If headerChan isn't closed, then close it.
718 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
719 s.noHeaders = true
720 close(s.headerChan)
721 }
722 cleanup := &cleanupStream{
723 streamID: s.id,
724 onWrite: func() {
725 t.mu.Lock()
726 if t.activeStreams != nil {
727 delete(t.activeStreams, s.id)
728 }
729 t.mu.Unlock()
730 if channelz.IsOn() {
731 if eosReceived {
732 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
733 } else {
734 atomic.AddInt64(&t.czData.streamsFailed, 1)
735 }
736 }
737 },
738 rst: rst,
739 rstCode: rstCode,
740 }
741 addBackStreamQuota := func(interface{}) bool {
742 t.streamQuota++
743 if t.streamQuota > 0 && t.waitingStreams > 0 {
744 select {
745 case t.streamsQuotaAvailable <- struct{}{}:
746 default:
747 }
748 }
749 return true
750 }
751 t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
752 // This will unblock write.
753 close(s.done)
754}
755
756// Close kicks off the shutdown process of the transport. This should be called
757// only once on a transport. Once it is called, the transport should not be
758// accessed any more.
759//
760// This method blocks until the addrConn that initiated this transport is
761// re-connected. This happens because t.onClose() begins reconnect logic at the
762// addrConn level and blocks until the addrConn is successfully connected.
763func (t *http2Client) Close() error {
764 t.mu.Lock()
765 // Make sure we only Close once.
766 if t.state == closing {
767 t.mu.Unlock()
768 return nil
769 }
770 // Call t.onClose before setting the state to closing to prevent the client
771 // from attempting to create new streams ASAP.
772 t.onClose()
773 t.state = closing
774 streams := t.activeStreams
775 t.activeStreams = nil
Don Newtone0d34a82019-11-14 10:58:06 -0500776 if t.kpDormant {
777 // If the keepalive goroutine is blocked on this condition variable, we
778 // should unblock it so that the goroutine eventually exits.
779 t.kpDormancyCond.Signal()
780 }
Don Newton98fd8812019-09-23 15:15:02 -0400781 t.mu.Unlock()
782 t.controlBuf.finish()
783 t.cancel()
784 err := t.conn.Close()
785 if channelz.IsOn() {
786 channelz.RemoveEntry(t.channelzID)
787 }
788 // Notify all active streams.
789 for _, s := range streams {
790 t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
791 }
792 if t.statsHandler != nil {
793 connEnd := &stats.ConnEnd{
794 Client: true,
795 }
796 t.statsHandler.HandleConn(t.ctx, connEnd)
797 }
798 return err
799}
800
801// GracefulClose sets the state to draining, which prevents new streams from
802// being created and causes the transport to be closed when the last active
803// stream is closed. If there are no active streams, the transport is closed
804// immediately. This does nothing if the transport is already draining or
805// closing.
806func (t *http2Client) GracefulClose() {
807 t.mu.Lock()
808 // Make sure we move to draining only from active.
809 if t.state == draining || t.state == closing {
810 t.mu.Unlock()
811 return
812 }
813 t.state = draining
814 active := len(t.activeStreams)
815 t.mu.Unlock()
816 if active == 0 {
817 t.Close()
818 return
819 }
820 t.controlBuf.put(&incomingGoAway{})
821}
822
823// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
824// should proceed only if Write returns nil.
825func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
826 if opts.Last {
827 // If it's the last message, update stream state.
828 if !s.compareAndSwapState(streamActive, streamWriteDone) {
829 return errStreamDone
830 }
831 } else if s.getState() != streamActive {
832 return errStreamDone
833 }
834 df := &dataFrame{
835 streamID: s.id,
836 endStream: opts.Last,
837 }
838 if hdr != nil || data != nil { // If it's not an empty data frame.
839 // Add some data to grpc message header so that we can equally
840 // distribute bytes across frames.
841 emptyLen := http2MaxFrameLen - len(hdr)
842 if emptyLen > len(data) {
843 emptyLen = len(data)
844 }
845 hdr = append(hdr, data[:emptyLen]...)
846 data = data[emptyLen:]
847 df.h, df.d = hdr, data
848 // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
849 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
850 return err
851 }
852 }
853 return t.controlBuf.put(df)
854}
855
Don Newtone0d34a82019-11-14 10:58:06 -0500856func (t *http2Client) getStream(f http2.Frame) *Stream {
Don Newton98fd8812019-09-23 15:15:02 -0400857 t.mu.Lock()
Don Newtone0d34a82019-11-14 10:58:06 -0500858 s := t.activeStreams[f.Header().StreamID]
859 t.mu.Unlock()
860 return s
Don Newton98fd8812019-09-23 15:15:02 -0400861}
862
863// adjustWindow sends out extra window update over the initial window size
864// of stream if the application is requesting data larger in size than
865// the window.
866func (t *http2Client) adjustWindow(s *Stream, n uint32) {
867 if w := s.fc.maybeAdjust(n); w > 0 {
868 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
869 }
870}
871
872// updateWindow adjusts the inbound quota for the stream.
873// Window updates will be sent out when the cumulative quota
874// exceeds the corresponding threshold.
875func (t *http2Client) updateWindow(s *Stream, n uint32) {
876 if w := s.fc.onRead(n); w > 0 {
877 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
878 }
879}
880
881// updateFlowControl updates the incoming flow control windows
882// for the transport and the stream based on the current bdp
883// estimation.
884func (t *http2Client) updateFlowControl(n uint32) {
885 t.mu.Lock()
886 for _, s := range t.activeStreams {
887 s.fc.newLimit(n)
888 }
889 t.mu.Unlock()
890 updateIWS := func(interface{}) bool {
891 t.initialWindowSize = int32(n)
892 return true
893 }
894 t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
895 t.controlBuf.put(&outgoingSettings{
896 ss: []http2.Setting{
897 {
898 ID: http2.SettingInitialWindowSize,
899 Val: n,
900 },
901 },
902 })
903}
904
905func (t *http2Client) handleData(f *http2.DataFrame) {
906 size := f.Header().Length
907 var sendBDPPing bool
908 if t.bdpEst != nil {
909 sendBDPPing = t.bdpEst.add(size)
910 }
911 // Decouple connection's flow control from application's read.
912 // An update on connection's flow control should not depend on
913 // whether user application has read the data or not. Such a
914 // restriction is already imposed on the stream's flow control,
915 // and therefore the sender will be blocked anyways.
916 // Decoupling the connection flow control will prevent other
917 // active(fast) streams from starving in presence of slow or
918 // inactive streams.
919 //
920 if w := t.fc.onData(size); w > 0 {
921 t.controlBuf.put(&outgoingWindowUpdate{
922 streamID: 0,
923 increment: w,
924 })
925 }
926 if sendBDPPing {
927 // Avoid excessive ping detection (e.g. in an L7 proxy)
928 // by sending a window update prior to the BDP ping.
929
930 if w := t.fc.reset(); w > 0 {
931 t.controlBuf.put(&outgoingWindowUpdate{
932 streamID: 0,
933 increment: w,
934 })
935 }
936
937 t.controlBuf.put(bdpPing)
938 }
939 // Select the right stream to dispatch.
Don Newtone0d34a82019-11-14 10:58:06 -0500940 s := t.getStream(f)
941 if s == nil {
Don Newton98fd8812019-09-23 15:15:02 -0400942 return
943 }
944 if size > 0 {
945 if err := s.fc.onData(size); err != nil {
946 t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
947 return
948 }
949 if f.Header().Flags.Has(http2.FlagDataPadded) {
950 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
951 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
952 }
953 }
954 // TODO(bradfitz, zhaoq): A copy is required here because there is no
955 // guarantee f.Data() is consumed before the arrival of next frame.
956 // Can this copy be eliminated?
957 if len(f.Data()) > 0 {
958 buffer := t.bufferPool.get()
959 buffer.Reset()
960 buffer.Write(f.Data())
961 s.write(recvMsg{buffer: buffer})
962 }
963 }
964 // The server has closed the stream without sending trailers. Record that
965 // the read direction is closed, and set the status appropriately.
966 if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
967 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
968 }
969}
970
971func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
Don Newtone0d34a82019-11-14 10:58:06 -0500972 s := t.getStream(f)
973 if s == nil {
Don Newton98fd8812019-09-23 15:15:02 -0400974 return
975 }
976 if f.ErrCode == http2.ErrCodeRefusedStream {
977 // The stream was unprocessed by the server.
978 atomic.StoreUint32(&s.unprocessed, 1)
979 }
980 statusCode, ok := http2ErrConvTab[f.ErrCode]
981 if !ok {
982 warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
983 statusCode = codes.Unknown
984 }
985 if statusCode == codes.Canceled {
986 if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
987 // Our deadline was already exceeded, and that was likely the cause
988 // of this cancelation. Alter the status code accordingly.
989 statusCode = codes.DeadlineExceeded
990 }
991 }
992 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
993}
994
995func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
996 if f.IsAck() {
997 return
998 }
999 var maxStreams *uint32
1000 var ss []http2.Setting
1001 var updateFuncs []func()
1002 f.ForeachSetting(func(s http2.Setting) error {
1003 switch s.ID {
1004 case http2.SettingMaxConcurrentStreams:
1005 maxStreams = new(uint32)
1006 *maxStreams = s.Val
1007 case http2.SettingMaxHeaderListSize:
1008 updateFuncs = append(updateFuncs, func() {
1009 t.maxSendHeaderListSize = new(uint32)
1010 *t.maxSendHeaderListSize = s.Val
1011 })
1012 default:
1013 ss = append(ss, s)
1014 }
1015 return nil
1016 })
1017 if isFirst && maxStreams == nil {
1018 maxStreams = new(uint32)
1019 *maxStreams = math.MaxUint32
1020 }
1021 sf := &incomingSettings{
1022 ss: ss,
1023 }
1024 if maxStreams != nil {
1025 updateStreamQuota := func() {
1026 delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
1027 t.maxConcurrentStreams = *maxStreams
1028 t.streamQuota += delta
1029 if delta > 0 && t.waitingStreams > 0 {
1030 close(t.streamsQuotaAvailable) // wake all of them up.
1031 t.streamsQuotaAvailable = make(chan struct{}, 1)
1032 }
1033 }
1034 updateFuncs = append(updateFuncs, updateStreamQuota)
1035 }
1036 t.controlBuf.executeAndPut(func(interface{}) bool {
1037 for _, f := range updateFuncs {
1038 f()
1039 }
1040 return true
1041 }, sf)
1042}
1043
1044func (t *http2Client) handlePing(f *http2.PingFrame) {
1045 if f.IsAck() {
1046 // Maybe it's a BDP ping.
1047 if t.bdpEst != nil {
1048 t.bdpEst.calculate(f.Data)
1049 }
1050 return
1051 }
1052 pingAck := &ping{ack: true}
1053 copy(pingAck.data[:], f.Data[:])
1054 t.controlBuf.put(pingAck)
1055}
1056
1057func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
1058 t.mu.Lock()
1059 if t.state == closing {
1060 t.mu.Unlock()
1061 return
1062 }
1063 if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
1064 infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
1065 }
1066 id := f.LastStreamID
1067 if id > 0 && id%2 != 1 {
1068 t.mu.Unlock()
1069 t.Close()
1070 return
1071 }
1072 // A client can receive multiple GoAways from the server (see
1073 // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
1074 // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
1075 // sent after an RTT delay with the ID of the last stream the server will
1076 // process.
1077 //
1078 // Therefore, when we get the first GoAway we don't necessarily close any
1079 // streams. While in case of second GoAway we close all streams created after
1080 // the GoAwayId. This way streams that were in-flight while the GoAway from
1081 // server was being sent don't get killed.
1082 select {
1083 case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
1084 // If there are multiple GoAways the first one should always have an ID greater than the following ones.
1085 if id > t.prevGoAwayID {
1086 t.mu.Unlock()
1087 t.Close()
1088 return
1089 }
1090 default:
1091 t.setGoAwayReason(f)
1092 close(t.goAway)
1093 t.controlBuf.put(&incomingGoAway{})
1094 // Notify the clientconn about the GOAWAY before we set the state to
1095 // draining, to allow the client to stop attempting to create streams
1096 // before disallowing new streams on this connection.
1097 t.onGoAway(t.goAwayReason)
1098 t.state = draining
1099 }
1100 // All streams with IDs greater than the GoAwayId
1101 // and smaller than the previous GoAway ID should be killed.
1102 upperLimit := t.prevGoAwayID
1103 if upperLimit == 0 { // This is the first GoAway Frame.
1104 upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
1105 }
1106 for streamID, stream := range t.activeStreams {
1107 if streamID > id && streamID <= upperLimit {
1108 // The stream was unprocessed by the server.
1109 atomic.StoreUint32(&stream.unprocessed, 1)
1110 t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
1111 }
1112 }
1113 t.prevGoAwayID = id
1114 active := len(t.activeStreams)
1115 t.mu.Unlock()
1116 if active == 0 {
1117 t.Close()
1118 }
1119}
1120
1121// setGoAwayReason sets the value of t.goAwayReason based
1122// on the GoAway frame received.
1123// It expects a lock on transport's mutext to be held by
1124// the caller.
1125func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
1126 t.goAwayReason = GoAwayNoReason
1127 switch f.ErrCode {
1128 case http2.ErrCodeEnhanceYourCalm:
1129 if string(f.DebugData()) == "too_many_pings" {
1130 t.goAwayReason = GoAwayTooManyPings
1131 }
1132 }
1133}
1134
1135func (t *http2Client) GetGoAwayReason() GoAwayReason {
1136 t.mu.Lock()
1137 defer t.mu.Unlock()
1138 return t.goAwayReason
1139}
1140
1141func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
1142 t.controlBuf.put(&incomingWindowUpdate{
1143 streamID: f.Header().StreamID,
1144 increment: f.Increment,
1145 })
1146}
1147
1148// operateHeaders takes action on the decoded headers.
1149func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
Don Newtone0d34a82019-11-14 10:58:06 -05001150 s := t.getStream(frame)
1151 if s == nil {
Don Newton98fd8812019-09-23 15:15:02 -04001152 return
1153 }
1154 endStream := frame.StreamEnded()
1155 atomic.StoreUint32(&s.bytesReceived, 1)
1156 initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
1157
1158 if !initialHeader && !endStream {
1159 // As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
1160 st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
1161 t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
1162 return
1163 }
1164
1165 state := &decodeState{}
1166 // Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
1167 state.data.isGRPC = !initialHeader
1168 if err := state.decodeHeader(frame); err != nil {
1169 t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
1170 return
1171 }
1172
1173 isHeader := false
1174 defer func() {
1175 if t.statsHandler != nil {
1176 if isHeader {
1177 inHeader := &stats.InHeader{
1178 Client: true,
1179 WireLength: int(frame.Header().Length),
1180 }
1181 t.statsHandler.HandleRPC(s.ctx, inHeader)
1182 } else {
1183 inTrailer := &stats.InTrailer{
1184 Client: true,
1185 WireLength: int(frame.Header().Length),
1186 }
1187 t.statsHandler.HandleRPC(s.ctx, inTrailer)
1188 }
1189 }
1190 }()
1191
1192 // If headerChan hasn't been closed yet
1193 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
Don Newtone0d34a82019-11-14 10:58:06 -05001194 s.headerValid = true
Don Newton98fd8812019-09-23 15:15:02 -04001195 if !endStream {
1196 // HEADERS frame block carries a Response-Headers.
1197 isHeader = true
1198 // These values can be set without any synchronization because
1199 // stream goroutine will read it only after seeing a closed
1200 // headerChan which we'll close after setting this.
1201 s.recvCompress = state.data.encoding
1202 if len(state.data.mdata) > 0 {
1203 s.header = state.data.mdata
1204 }
1205 } else {
1206 // HEADERS frame block carries a Trailers-Only.
1207 s.noHeaders = true
1208 }
1209 close(s.headerChan)
1210 }
1211
1212 if !endStream {
1213 return
1214 }
1215
1216 // if client received END_STREAM from server while stream was still active, send RST_STREAM
1217 rst := s.getState() == streamActive
1218 t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
1219}
1220
1221// reader runs as a separate goroutine in charge of reading data from network
1222// connection.
1223//
1224// TODO(zhaoq): currently one reader per transport. Investigate whether this is
1225// optimal.
1226// TODO(zhaoq): Check the validity of the incoming frame sequence.
1227func (t *http2Client) reader() {
1228 defer close(t.readerDone)
1229 // Check the validity of server preface.
1230 frame, err := t.framer.fr.ReadFrame()
1231 if err != nil {
1232 t.Close() // this kicks off resetTransport, so must be last before return
1233 return
1234 }
1235 t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
1236 if t.keepaliveEnabled {
Don Newtone0d34a82019-11-14 10:58:06 -05001237 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
Don Newton98fd8812019-09-23 15:15:02 -04001238 }
1239 sf, ok := frame.(*http2.SettingsFrame)
1240 if !ok {
1241 t.Close() // this kicks off resetTransport, so must be last before return
1242 return
1243 }
1244 t.onPrefaceReceipt()
1245 t.handleSettings(sf, true)
1246
1247 // loop to keep reading incoming messages on this transport.
1248 for {
1249 t.controlBuf.throttle()
1250 frame, err := t.framer.fr.ReadFrame()
1251 if t.keepaliveEnabled {
Don Newtone0d34a82019-11-14 10:58:06 -05001252 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
Don Newton98fd8812019-09-23 15:15:02 -04001253 }
1254 if err != nil {
1255 // Abort an active stream if the http2.Framer returns a
1256 // http2.StreamError. This can happen only if the server's response
1257 // is malformed http2.
1258 if se, ok := err.(http2.StreamError); ok {
1259 t.mu.Lock()
1260 s := t.activeStreams[se.StreamID]
1261 t.mu.Unlock()
1262 if s != nil {
1263 // use error detail to provide better err message
1264 code := http2ErrConvTab[se.Code]
1265 msg := t.framer.fr.ErrorDetail().Error()
1266 t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
1267 }
1268 continue
1269 } else {
1270 // Transport error.
1271 t.Close()
1272 return
1273 }
1274 }
1275 switch frame := frame.(type) {
1276 case *http2.MetaHeadersFrame:
1277 t.operateHeaders(frame)
1278 case *http2.DataFrame:
1279 t.handleData(frame)
1280 case *http2.RSTStreamFrame:
1281 t.handleRSTStream(frame)
1282 case *http2.SettingsFrame:
1283 t.handleSettings(frame, false)
1284 case *http2.PingFrame:
1285 t.handlePing(frame)
1286 case *http2.GoAwayFrame:
1287 t.handleGoAway(frame)
1288 case *http2.WindowUpdateFrame:
1289 t.handleWindowUpdate(frame)
1290 default:
1291 errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
1292 }
1293 }
1294}
1295
Don Newtone0d34a82019-11-14 10:58:06 -05001296func minTime(a, b time.Duration) time.Duration {
1297 if a < b {
1298 return a
1299 }
1300 return b
1301}
1302
Don Newton98fd8812019-09-23 15:15:02 -04001303// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
1304func (t *http2Client) keepalive() {
1305 p := &ping{data: [8]byte{}}
Don Newtone0d34a82019-11-14 10:58:06 -05001306 // True iff a ping has been sent, and no data has been received since then.
1307 outstandingPing := false
1308 // Amount of time remaining before which we should receive an ACK for the
1309 // last sent ping.
1310 timeoutLeft := time.Duration(0)
1311 // Records the last value of t.lastRead before we go block on the timer.
1312 // This is required to check for read activity since then.
1313 prevNano := time.Now().UnixNano()
Don Newton98fd8812019-09-23 15:15:02 -04001314 timer := time.NewTimer(t.kp.Time)
1315 for {
1316 select {
1317 case <-timer.C:
Don Newtone0d34a82019-11-14 10:58:06 -05001318 lastRead := atomic.LoadInt64(&t.lastRead)
1319 if lastRead > prevNano {
1320 // There has been read activity since the last time we were here.
1321 outstandingPing = false
1322 // Next timer should fire at kp.Time seconds from lastRead time.
1323 timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
1324 prevNano = lastRead
Don Newton98fd8812019-09-23 15:15:02 -04001325 continue
1326 }
Don Newtone0d34a82019-11-14 10:58:06 -05001327 if outstandingPing && timeoutLeft <= 0 {
1328 t.Close()
1329 return
1330 }
Don Newton98fd8812019-09-23 15:15:02 -04001331 t.mu.Lock()
Don Newtone0d34a82019-11-14 10:58:06 -05001332 if t.state == closing {
1333 // If the transport is closing, we should exit from the
1334 // keepalive goroutine here. If not, we could have a race
1335 // between the call to Signal() from Close() and the call to
1336 // Wait() here, whereby the keepalive goroutine ends up
1337 // blocking on the condition variable which will never be
1338 // signalled again.
1339 t.mu.Unlock()
1340 return
1341 }
Don Newton98fd8812019-09-23 15:15:02 -04001342 if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
Don Newtone0d34a82019-11-14 10:58:06 -05001343 // If a ping was sent out previously (because there were active
1344 // streams at that point) which wasn't acked and its timeout
1345 // hadn't fired, but we got here and are about to go dormant,
1346 // we should make sure that we unconditionally send a ping once
1347 // we awaken.
1348 outstandingPing = false
1349 t.kpDormant = true
1350 t.kpDormancyCond.Wait()
1351 }
1352 t.kpDormant = false
1353 t.mu.Unlock()
1354
1355 // We get here either because we were dormant and a new stream was
1356 // created which unblocked the Wait() call, or because the
1357 // keepalive timer expired. In both cases, we need to send a ping.
1358 if !outstandingPing {
Don Newton98fd8812019-09-23 15:15:02 -04001359 if channelz.IsOn() {
1360 atomic.AddInt64(&t.czData.kpCount, 1)
1361 }
Don Newton98fd8812019-09-23 15:15:02 -04001362 t.controlBuf.put(p)
Don Newtone0d34a82019-11-14 10:58:06 -05001363 timeoutLeft = t.kp.Timeout
1364 outstandingPing = true
Don Newton98fd8812019-09-23 15:15:02 -04001365 }
Don Newtone0d34a82019-11-14 10:58:06 -05001366 // The amount of time to sleep here is the minimum of kp.Time and
1367 // timeoutLeft. This will ensure that we wait only for kp.Time
1368 // before sending out the next ping (for cases where the ping is
1369 // acked).
1370 sleepDuration := minTime(t.kp.Time, timeoutLeft)
1371 timeoutLeft -= sleepDuration
1372 prevNano = lastRead
1373 timer.Reset(sleepDuration)
Don Newton98fd8812019-09-23 15:15:02 -04001374 case <-t.ctx.Done():
1375 if !timer.Stop() {
1376 <-timer.C
1377 }
1378 return
1379 }
1380 }
1381}
1382
1383func (t *http2Client) Error() <-chan struct{} {
1384 return t.ctx.Done()
1385}
1386
1387func (t *http2Client) GoAway() <-chan struct{} {
1388 return t.goAway
1389}
1390
1391func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
1392 s := channelz.SocketInternalMetric{
1393 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1394 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1395 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1396 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1397 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1398 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1399 LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1400 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1401 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1402 LocalFlowControlWindow: int64(t.fc.getSize()),
1403 SocketOptions: channelz.GetSocketOption(t.conn),
1404 LocalAddr: t.localAddr,
1405 RemoteAddr: t.remoteAddr,
1406 // RemoteName :
1407 }
1408 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1409 s.Security = au.GetSecurityValue()
1410 }
1411 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1412 return &s
1413}
1414
1415func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
1416
1417func (t *http2Client) IncrMsgSent() {
1418 atomic.AddInt64(&t.czData.msgSent, 1)
1419 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1420}
1421
1422func (t *http2Client) IncrMsgRecv() {
1423 atomic.AddInt64(&t.czData.msgRecv, 1)
1424 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1425}
1426
1427func (t *http2Client) getOutFlowWindow() int64 {
1428 resp := make(chan uint32, 1)
1429 timer := time.NewTimer(time.Second)
1430 defer timer.Stop()
1431 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1432 select {
1433 case sz := <-resp:
1434 return int64(sz)
1435 case <-t.ctxDone:
1436 return -1
1437 case <-timer.C:
1438 return -2
1439 }
1440}