/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"context"
	"fmt"
	"io"
	"math"
	"net"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/syscall"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)

// http2Client implements the ClientTransport interface with HTTP/2.
type http2Client struct {
	ctx        context.Context
	cancel     context.CancelFunc
	ctxDone    <-chan struct{} // Cache the ctx.Done() chan.
	userAgent  string
	md         interface{}
	conn       net.Conn // underlying communication channel
	loopy      *loopyWriter
	remoteAddr net.Addr
	localAddr  net.Addr
	authInfo   credentials.AuthInfo // auth info about the connection

	readerDone chan struct{} // sync point to enable testing.
	writerDone chan struct{} // sync point to enable testing.
	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
	// that the server sent GoAway on this transport.
	goAway chan struct{}
	// awakenKeepalive is used to wake up keepalive after it has gone dormant.
	awakenKeepalive chan struct{}

	framer *framer
	// controlBuf delivers all the control-related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	// The scheme used: https if TLS is on, http otherwise.
	scheme string

	isSecure bool

	perRPCCreds []credentials.PerRPCCredentials

	// Boolean to keep track of reading activity on the transport.
	// 1 is true and 0 is false.
	activity         uint32 // Accessed atomically.
	kp               keepalive.ClientParameters
	keepaliveEnabled bool

	statsHandler stats.Handler

	initialWindowSize int32

	// configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
	maxSendHeaderListSize *uint32

	bdpEst *bdpEstimator
	// onSuccess is a callback that the client transport calls upon
	// receiving the server preface to signal that a successful HTTP/2
	// connection was established.
	onSuccess func()

	maxConcurrentStreams  uint32
	streamQuota           int64
	streamsQuotaAvailable chan struct{}
	waitingStreams        uint32
	nextID                uint32

	mu            sync.Mutex // guard the following variables
	state         transportState
	activeStreams map[uint32]*Stream
	// prevGoAwayID records the Last-Stream-ID in the previous GoAway frame.
	prevGoAwayID uint32
	// goAwayReason records the http2.ErrCode and debug data received with the
	// GoAway frame.
	goAwayReason GoAwayReason

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData

	onGoAway func(GoAwayReason)
	onClose  func()
}

func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
	if fn != nil {
		return fn(ctx, addr)
	}
	return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
}
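
// exampleCustomDialer is an illustrative sketch added for this edit; it is
// not part of the upstream file. It shows the shape of the function a caller
// can plug in via ConnectOptions.Dialer, which dial prefers over the default
// TCP DialContext. The proxy address is hypothetical.
func exampleCustomDialer(ctx context.Context, addr string) (net.Conn, error) {
	// Ignore the target addr and route every connection through a fixed
	// proxy, purely for demonstration.
	d := &net.Dialer{Timeout: 5 * time.Second}
	return d.DialContext(ctx, "tcp", "proxy.internal.example:8080")
}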

func isTemporary(err error) bool {
	switch err := err.(type) {
	case interface {
		Temporary() bool
	}:
		return err.Temporary()
	case interface {
		Timeout() bool
	}:
		// Timeouts may be resolved upon retry, and are thus treated as
		// temporary.
		return err.Timeout()
	}
	return true
}
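
// exampleTempErr is a hypothetical error type added here for illustration
// only: because it implements Temporary() bool, isTemporary reports the flag
// it carries instead of defaulting to true.
type exampleTempErr struct{ temp bool }

func (e exampleTempErr) Error() string   { return "example dial failure" }
func (e exampleTempErr) Temporary() bool { return e.temp }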

// newHTTP2Client constructs a connected ClientTransport to addr based on
// HTTP/2 and starts to receive messages on it. It returns a non-nil error if
// construction fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
	scheme := "http"
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
	if err != nil {
		if opts.FailOnNonTempDialError {
			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
		}
		return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
	}
	// Any further errors will close the underlying connection
	defer func(conn net.Conn) {
		if err != nil {
			conn.Close()
		}
	}(conn)
	kp := opts.KeepaliveParams
	// Validate keepalive parameters.
	if kp.Time == 0 {
		kp.Time = defaultClientKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultClientKeepaliveTimeout
	}
	keepaliveEnabled := false
	if kp.Time != infinity {
		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
		keepaliveEnabled = true
	}
	var (
		isSecure bool
		authInfo credentials.AuthInfo
	)
	transportCreds := opts.TransportCredentials
	perRPCCreds := opts.PerRPCCredentials

	if b := opts.CredsBundle; b != nil {
		if t := b.TransportCredentials(); t != nil {
			transportCreds = t
		}
		if t := b.PerRPCCredentials(); t != nil {
			perRPCCreds = append(perRPCCreds, t)
		}
	}
	if transportCreds != nil {
		scheme = "https"
		conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn)
		if err != nil {
			return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
		}
		isSecure = true
	}
	dynamicWindow := true
	icwz := int32(initialWindowSize)
	if opts.InitialConnWindowSize >= defaultWindowSize {
		icwz = opts.InitialConnWindowSize
		dynamicWindow = false
	}
	writeBufSize := opts.WriteBufferSize
	readBufSize := opts.ReadBufferSize
	maxHeaderListSize := defaultClientMaxHeaderListSize
	if opts.MaxHeaderListSize != nil {
		maxHeaderListSize = *opts.MaxHeaderListSize
	}
	t := &http2Client{
		ctx:                   ctx,
		ctxDone:               ctx.Done(), // Cache Done chan.
		cancel:                cancel,
		userAgent:             opts.UserAgent,
		md:                    addr.Metadata,
		conn:                  conn,
		remoteAddr:            conn.RemoteAddr(),
		localAddr:             conn.LocalAddr(),
		authInfo:              authInfo,
		readerDone:            make(chan struct{}),
		writerDone:            make(chan struct{}),
		goAway:                make(chan struct{}),
		awakenKeepalive:       make(chan struct{}, 1),
		framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
		fc:                    &trInFlow{limit: uint32(icwz)},
		scheme:                scheme,
		activeStreams:         make(map[uint32]*Stream),
		isSecure:              isSecure,
		perRPCCreds:           perRPCCreds,
		kp:                    kp,
		statsHandler:          opts.StatsHandler,
		initialWindowSize:     initialWindowSize,
		onSuccess:             onSuccess,
		nextID:                1,
		maxConcurrentStreams:  defaultMaxStreamsClient,
		streamQuota:           defaultMaxStreamsClient,
		streamsQuotaAvailable: make(chan struct{}, 1),
		czData:                new(channelzData),
		onGoAway:              onGoAway,
		onClose:               onClose,
		keepaliveEnabled:      keepaliveEnabled,
	}
	t.controlBuf = newControlBuffer(t.ctxDone)
	if opts.InitialWindowSize >= defaultWindowSize {
		t.initialWindowSize = opts.InitialWindowSize
		dynamicWindow = false
	}
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	// Make sure awakenKeepalive can't be written upon.
	// keepalive routine will make it writable, if need be.
	t.awakenKeepalive <- struct{}{}
	if t.statsHandler != nil {
		t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{
			Client: true,
		}
		t.statsHandler.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
	}
	if t.keepaliveEnabled {
		go t.keepalive()
	}
	// Start the reader goroutine for incoming messages. Each transport has a
	// dedicated goroutine which reads HTTP/2 frames from the network and
	// dispatches them to the corresponding stream entity.
	go t.reader()

	// Send connection preface to server.
	n, err := t.conn.Write(clientPreface)
	if err != nil {
		t.Close()
		return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
	}
	if n != len(clientPreface) {
		t.Close()
		return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
	}
	var ss []http2.Setting

	if t.initialWindowSize != defaultWindowSize {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(t.initialWindowSize),
		})
	}
	if opts.MaxHeaderListSize != nil {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *opts.MaxHeaderListSize,
		})
	}
	err = t.framer.fr.WriteSettings(ss...)
	if err != nil {
		t.Close()
		return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
			t.Close()
			return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
		}
	}

	t.framer.writer.Flush()
	go func() {
		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
		err := t.loopy.run()
		if err != nil {
			errorf("transport: loopyWriter.run returning. Err: %v", err)
		}
		// If it's a connection error, let reader goroutine handle it
		// since there might be data in the buffers.
		if _, ok := err.(net.Error); !ok {
			t.conn.Close()
		}
		close(t.writerDone)
	}()
	return t, nil
}

func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
	// TODO(zhaoq): Handle uint32 overflow of Stream.id.
	s := &Stream{
		done:           make(chan struct{}),
		method:         callHdr.Method,
		sendCompress:   callHdr.SendCompress,
		buf:            newRecvBuffer(),
		headerChan:     make(chan struct{}),
		contentSubtype: callHdr.ContentSubtype,
	}
	s.wq = newWriteQuota(defaultWriteQuota, s.done)
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	// The client-side stream context should have exactly the same lifecycle as the user-provided context.
	// That means s.ctx should be read-only, and s.ctx is done iff ctx is done.
	// So we use the original context here instead of creating a copy.
	s.ctx = ctx
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:     s.ctx,
			ctxDone: s.ctx.Done(),
			recv:    s.buf,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	return s
}

func (t *http2Client) getPeer() *peer.Peer {
	pr := &peer.Peer{
		Addr: t.remoteAddr,
	}
	// Attach Auth info if there is any.
	if t.authInfo != nil {
		pr.AuthInfo = t.authInfo
	}
	return pr
}

func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
	aud := t.createAudience(callHdr)
	authData, err := t.getTrAuthData(ctx, aud)
	if err != nil {
		return nil, err
	}
	callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
	if err != nil {
		return nil, err
	}
	// TODO(mmukhi): Benchmark whether performance improves if we count the
	// metadata and other header fields first and create a slice of that exact
	// size. For now, make the slice a predictable size to reduce allocations
	// made by append.
	hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
	hfLen += len(authData) + len(callAuthData)
	headerFields := make([]hpack.HeaderField, 0, hfLen)
	headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
	headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
	headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
	headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
	if callHdr.PreviousAttempts > 0 {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
	}

	if callHdr.SendCompress != "" {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
	}
	if dl, ok := ctx.Deadline(); ok {
		// Send the timeout regardless of its value. The server can detect a
		// timed-out context by itself.
		// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
		timeout := dl.Sub(time.Now())
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
	}
	for k, v := range authData {
		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
	}
	for k, v := range callAuthData {
		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
	}
	if b := stats.OutgoingTags(ctx); b != nil {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
	}
	if b := stats.OutgoingTrace(ctx); b != nil {
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
	}

	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
		var k string
		for _, vv := range added {
			for i, v := range vv {
				if i%2 == 0 {
					k = v
					continue
				}
				// HTTP doesn't allow you to set pseudo-headers after non-pseudo-headers have been set.
				if isReservedHeader(k) {
					continue
				}
				headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
			}
		}
		for k, vv := range md {
			// HTTP doesn't allow you to set pseudo-headers after non-pseudo-headers have been set.
			if isReservedHeader(k) {
				continue
			}
			for _, v := range vv {
				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
			}
		}
	}
	if md, ok := t.md.(*metadata.MD); ok {
		for k, vv := range *md {
			if isReservedHeader(k) {
				continue
			}
			for _, v := range vv {
				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
			}
		}
	}
	return headerFields, nil
}

func (t *http2Client) createAudience(callHdr *CallHdr) string {
	// Create an audience string only if needed.
	if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
		return ""
	}
	// Construct URI required to get auth request metadata.
	// Omit port if it is the default one.
	host := strings.TrimSuffix(callHdr.Host, ":443")
	pos := strings.LastIndex(callHdr.Method, "/")
	if pos == -1 {
		pos = len(callHdr.Method)
	}
	return "https://" + host + callHdr.Method[:pos]
}
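
// exampleAudience is a hedged illustration added for this edit (not part of
// the upstream file): assuming t has per-RPC credentials configured, it shows
// how createAudience trims the default ":443" port and drops the final method
// element, leaving only the service path. Host and method are hypothetical.
func exampleAudience(t *http2Client) string {
	hdr := &CallHdr{Host: "foo.googleapis.com:443", Method: "/pkg.Service/Call"}
	// Expected result: "https://foo.googleapis.com/pkg.Service".
	return t.createAudience(hdr)
}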

func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
	authData := map[string]string{}
	for _, c := range t.perRPCCreds {
		data, err := c.GetRequestMetadata(ctx, audience)
		if err != nil {
			if _, ok := status.FromError(err); ok {
				return nil, err
			}

			return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
		}
		for k, v := range data {
			// Capital header names are illegal in HTTP/2.
			k = strings.ToLower(k)
			authData[k] = v
		}
	}
	return authData, nil
}

func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
	callAuthData := map[string]string{}
	// Check if credentials.PerRPCCredentials were provided via call options.
	// Note: if these credentials are provided both via dial options and call
	// options, then both sets of credentials will be applied.
	if callCreds := callHdr.Creds; callCreds != nil {
		if !t.isSecure && callCreds.RequireTransportSecurity() {
			return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
		}
		data, err := callCreds.GetRequestMetadata(ctx, audience)
		if err != nil {
			return nil, status.Errorf(codes.Internal, "transport: %v", err)
		}
		for k, v := range data {
			// Capital header names are illegal in HTTP/2.
			k = strings.ToLower(k)
			callAuthData[k] = v
		}
	}
	return callAuthData, nil
}
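
// exampleTokenCreds is a minimal, hypothetical credentials.PerRPCCredentials
// implementation added for illustration only. It shows the shape consumed by
// getTrAuthData and getCallAuthData; any capitalized header names it returned
// would be lower-cased by those functions before being sent, per HTTP/2.
type exampleTokenCreds struct{ token string }

func (c exampleTokenCreds) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{"authorization": "Bearer " + c.token}, nil
}

func (c exampleTokenCreds) RequireTransportSecurity() bool { return true }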

// NewStream creates a stream and registers it in the transport as an
// "active" stream.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
	ctx = peer.NewContext(ctx, t.getPeer())
	headerFields, err := t.createHeaderFields(ctx, callHdr)
	if err != nil {
		return nil, err
	}
	s := t.newStream(ctx, callHdr)
	cleanup := func(err error) {
		if s.swapState(streamDone) == streamDone {
			// If it was already done, return.
			return
		}
		// The stream was unprocessed by the server.
		atomic.StoreUint32(&s.unprocessed, 1)
		s.write(recvMsg{err: err})
		close(s.done)
		// If headerChan isn't closed, then close it.
		if atomic.SwapUint32(&s.headerDone, 1) == 0 {
			close(s.headerChan)
		}

	}
	hdr := &headerFrame{
		hf:        headerFields,
		endStream: false,
		initStream: func(id uint32) (bool, error) {
			t.mu.Lock()
			if state := t.state; state != reachable {
				t.mu.Unlock()
				// Do a quick cleanup.
				err := error(errStreamDrain)
				if state == closing {
					err = ErrConnClosing
				}
				cleanup(err)
				return false, err
			}
			t.activeStreams[id] = s
			if channelz.IsOn() {
				atomic.AddInt64(&t.czData.streamsStarted, 1)
				atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
			}
			var sendPing bool
			// If the number of active streams changes from 0 to 1, then check
			// if keepalive has gone dormant. If so, wake it up.
			if len(t.activeStreams) == 1 && t.keepaliveEnabled {
				select {
				case t.awakenKeepalive <- struct{}{}:
					sendPing = true
					// Fill the awakenKeepalive channel again as this channel must
					// be kept non-writable except at the point that the keepalive()
					// goroutine is waiting either to be awakened or shut down.
					t.awakenKeepalive <- struct{}{}
				default:
				}
			}
			t.mu.Unlock()
			return sendPing, nil
		},
		onOrphaned: cleanup,
		wq:         s.wq,
	}
	firstTry := true
	var ch chan struct{}
	checkForStreamQuota := func(it interface{}) bool {
		if t.streamQuota <= 0 { // Can go negative if server decreases it.
			if firstTry {
				t.waitingStreams++
			}
			ch = t.streamsQuotaAvailable
			return false
		}
		if !firstTry {
			t.waitingStreams--
		}
		t.streamQuota--
		h := it.(*headerFrame)
		h.streamID = t.nextID
		t.nextID += 2
		s.id = h.streamID
		s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	var hdrListSizeErr error
	checkForHeaderListSize := func(it interface{}) bool {
		if t.maxSendHeaderListSize == nil {
			return true
		}
		hdrFrame := it.(*headerFrame)
		var sz int64
		for _, f := range hdrFrame.hf {
			if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
				hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
				return false
			}
		}
		return true
	}
	for {
		success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
			if !checkForStreamQuota(it) {
				return false
			}
			if !checkForHeaderListSize(it) {
				return false
			}
			return true
		}, hdr)
		if err != nil {
			return nil, err
		}
		if success {
			break
		}
		if hdrListSizeErr != nil {
			return nil, hdrListSizeErr
		}
		firstTry = false
		select {
		case <-ch:
		case <-s.ctx.Done():
			return nil, ContextErr(s.ctx.Err())
		case <-t.goAway:
			return nil, errStreamDrain
		case <-t.ctx.Done():
			return nil, ErrConnClosing
		}
	}
	if t.statsHandler != nil {
		outHeader := &stats.OutHeader{
			Client:      true,
			FullMethod:  callHdr.Method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: callHdr.SendCompress,
		}
		t.statsHandler.HandleRPC(s.ctx, outHeader)
	}
	return s, nil
}
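
// exampleNewStream is an illustrative sketch added for this edit (not part of
// the upstream file): a caller opens a stream by filling in a CallHdr with the
// target authority and the full method name. The host and method shown are
// hypothetical.
func exampleNewStream(ctx context.Context, t *http2Client) (*Stream, error) {
	return t.NewStream(ctx, &CallHdr{
		Host:   "foo.googleapis.com",
		Method: "/pkg.Service/Call",
	})
}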

// CloseStream clears the footprint of a stream when the stream is not needed
// any more. This must not be executed in the reader's goroutine.
func (t *http2Client) CloseStream(s *Stream, err error) {
	var (
		rst     bool
		rstCode http2.ErrCode
	)
	if err != nil {
		rst = true
		rstCode = http2.ErrCodeCancel
	}
	t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
}

func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
	// Set stream status to done.
	if s.swapState(streamDone) == streamDone {
		// If it was already done, return. If multiple closeStream calls
		// happen simultaneously, wait for the first to finish.
		<-s.done
		return
	}
	// status and trailers can be updated here without any synchronization because the stream goroutine will
	// only read them after it sees an io.EOF error from read or write, and we'll write those errors
	// only after updating this.
	s.status = st
	if len(mdata) > 0 {
		s.trailer = mdata
	}
	if err != nil {
		// This will unblock reads eventually.
		s.write(recvMsg{err: err})
	}
	// If headerChan isn't closed, then close it.
	if atomic.SwapUint32(&s.headerDone, 1) == 0 {
		s.noHeaders = true
		close(s.headerChan)
	}
	cleanup := &cleanupStream{
		streamID: s.id,
		onWrite: func() {
			t.mu.Lock()
			if t.activeStreams != nil {
				delete(t.activeStreams, s.id)
			}
			t.mu.Unlock()
			if channelz.IsOn() {
				if eosReceived {
					atomic.AddInt64(&t.czData.streamsSucceeded, 1)
				} else {
					atomic.AddInt64(&t.czData.streamsFailed, 1)
				}
			}
		},
		rst:     rst,
		rstCode: rstCode,
	}
	addBackStreamQuota := func(interface{}) bool {
		t.streamQuota++
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
	// This will unblock write.
	close(s.done)
}

// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
//
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close() error {
	t.mu.Lock()
	// Make sure we only Close once.
	if t.state == closing {
		t.mu.Unlock()
		return nil
	}
	t.state = closing
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	t.controlBuf.finish()
	t.cancel()
	err := t.conn.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(t.channelzID)
	}
	// Notify all active streams.
	for _, s := range streams {
		t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
	}
	if t.statsHandler != nil {
		connEnd := &stats.ConnEnd{
			Client: true,
		}
		t.statsHandler.HandleConn(t.ctx, connEnd)
	}
	go t.onClose()
	return err
}

// GracefulClose sets the state to draining, which prevents new streams from
// being created and causes the transport to be closed when the last active
// stream is closed. If there are no active streams, the transport is closed
// immediately. This does nothing if the transport is already draining or
// closing.
func (t *http2Client) GracefulClose() error {
	t.mu.Lock()
	// Make sure we move to draining only from active.
	if t.state == draining || t.state == closing {
		t.mu.Unlock()
		return nil
	}
	t.state = draining
	active := len(t.activeStreams)
	t.mu.Unlock()
	if active == 0 {
		return t.Close()
	}
	t.controlBuf.put(&incomingGoAway{})
	return nil
}

// Write formats the data into HTTP/2 data frame(s) and sends it out. The
// caller should proceed only if Write returns nil.
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	if opts.Last {
		// If it's the last message, update stream state.
		if !s.compareAndSwapState(streamActive, streamWriteDone) {
			return errStreamDone
		}
	} else if s.getState() != streamActive {
		return errStreamDone
	}
	df := &dataFrame{
		streamID:  s.id,
		endStream: opts.Last,
	}
	if hdr != nil || data != nil { // If it's not an empty data frame.
		// Fold some of the payload into the gRPC message header so that we
		// can distribute bytes evenly across frames.
		emptyLen := http2MaxFrameLen - len(hdr)
		if emptyLen > len(data) {
			emptyLen = len(data)
		}
		hdr = append(hdr, data[:emptyLen]...)
		data = data[emptyLen:]
		df.h, df.d = hdr, data
		// TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
		if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
			return err
		}
	}
	return t.controlBuf.put(df)
}
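
// exampleFrameSplit is a hedged sketch added for this edit (not part of the
// upstream file) that isolates the hdr/data re-balancing performed in Write:
// up to http2MaxFrameLen-len(hdr) bytes of the payload are folded into the
// header slice so the first DATA frame is filled evenly.
func exampleFrameSplit(hdr, data []byte) (h, d []byte) {
	emptyLen := http2MaxFrameLen - len(hdr)
	if emptyLen > len(data) {
		emptyLen = len(data)
	}
	return append(hdr, data[:emptyLen]...), data[emptyLen:]
}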

func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	s, ok := t.activeStreams[f.Header().StreamID]
	return s, ok
}

// adjustWindow sends out an extra window update over the initial window size
// of the stream if the application is requesting more data than the window
// can hold.
func (t *http2Client) adjustWindow(s *Stream, n uint32) {
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}

// updateWindow adjusts the inbound quota for the stream.
// Window updates will be sent out when the cumulative quota
// exceeds the corresponding threshold.
func (t *http2Client) updateWindow(s *Stream, n uint32) {
	if w := s.fc.onRead(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}

// updateFlowControl updates the incoming flow control windows
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Client) updateFlowControl(n uint32) {
	t.mu.Lock()
	for _, s := range t.activeStreams {
		s.fc.newLimit(n)
	}
	t.mu.Unlock()
	updateIWS := func(interface{}) bool {
		t.initialWindowSize = int32(n)
		return true
	}
	t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
	t.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: n,
			},
		},
	})
}

func (t *http2Client) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple the connection's flow control from the application's read.
	// An update on the connection's flow control should not depend on
	// whether the user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyway.
	// Decoupling the connection flow control will prevent other
	// active (fast) streams from starving in the presence of slow
	// or inactive streams.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy)
		// by sending a window update prior to the BDP ping.

		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}

		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of the next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			data := make([]byte, len(f.Data()))
			copy(data, f.Data())
			s.write(recvMsg{data: data})
		}
	}
	// The server has closed the stream without sending trailers. Record that
	// the read direction is closed, and set the status appropriately.
	if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
		t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
	}
}

func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if f.ErrCode == http2.ErrCodeRefusedStream {
		// The stream was unprocessed by the server.
		atomic.StoreUint32(&s.unprocessed, 1)
	}
	statusCode, ok := http2ErrConvTab[f.ErrCode]
	if !ok {
		warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
		statusCode = codes.Unknown
	}
	if statusCode == codes.Canceled {
		// Our deadline was already exceeded, and that was likely the cause of
		// this cancelation. Alter the status code accordingly. (The original
		// condition lacked the negation, contradicting this comment; upstream
		// later fixed it the same way.)
		if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
			statusCode = codes.DeadlineExceeded
		}
	}
	t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
}

func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
	if f.IsAck() {
		return
	}
	var maxStreams *uint32
	var ss []http2.Setting
	var updateFuncs []func()
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxConcurrentStreams:
			maxStreams = new(uint32)
			*maxStreams = s.Val
		case http2.SettingMaxHeaderListSize:
			updateFuncs = append(updateFuncs, func() {
				t.maxSendHeaderListSize = new(uint32)
				*t.maxSendHeaderListSize = s.Val
			})
		default:
			ss = append(ss, s)
		}
		return nil
	})
	if isFirst && maxStreams == nil {
		maxStreams = new(uint32)
		*maxStreams = math.MaxUint32
	}
	sf := &incomingSettings{
		ss: ss,
	}
	if maxStreams != nil {
		updateStreamQuota := func() {
			delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
			t.maxConcurrentStreams = *maxStreams
			t.streamQuota += delta
			if delta > 0 && t.waitingStreams > 0 {
				close(t.streamsQuotaAvailable) // wake all of them up.
				t.streamsQuotaAvailable = make(chan struct{}, 1)
			}
		}
		updateFuncs = append(updateFuncs, updateStreamQuota)
	}
	t.controlBuf.executeAndPut(func(interface{}) bool {
		for _, f := range updateFuncs {
			f()
		}
		return true
	}, sf)
}
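
// exampleQuotaDelta is a hedged arithmetic sketch added for this edit (the
// numbers in the comment are hypothetical): if a SETTINGS frame lowers
// MAX_CONCURRENT_STREAMS from 100 to 60 while 80 streams are active, the
// quota of 20 drops by 40 to -20. checkForStreamQuota tolerates a negative
// quota and simply blocks new streams until enough active ones complete.
func exampleQuotaDelta(maxStreams, prevMax uint32, quota int64) int64 {
	delta := int64(maxStreams) - int64(prevMax)
	return quota + delta // e.g. 20 + (60 - 100) = -20
}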

func (t *http2Client) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)
}

func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
		infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
	}
	id := f.LastStreamID
	if id > 0 && id%2 != 1 {
		t.mu.Unlock()
		t.Close()
		return
	}
	// A client can receive multiple GoAways from the server (see
	// https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
	// GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
	// sent after an RTT delay with the ID of the last stream the server will
	// process.
	//
	// Therefore, when we get the first GoAway we don't necessarily close any
	// streams. In the case of the second GoAway we close all streams created
	// after the GoAway ID. This way streams that were in flight while the
	// GoAway from the server was being sent don't get killed.
	select {
	case <-t.goAway: // t.goAway has been closed (i.e., multiple GoAways).
		// If there are multiple GoAways the first one should always have an ID greater than the following ones.
		if id > t.prevGoAwayID {
			t.mu.Unlock()
			t.Close()
			return
		}
	default:
		t.setGoAwayReason(f)
		close(t.goAway)
		t.state = draining
		t.controlBuf.put(&incomingGoAway{})

		// This has to be a new goroutine because we're still using the current goroutine to read in the transport.
		t.onGoAway(t.goAwayReason)
	}
	// All streams with IDs greater than the GoAway ID
	// and smaller than the previous GoAway ID should be killed.
	upperLimit := t.prevGoAwayID
	if upperLimit == 0 { // This is the first GoAway frame.
		upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
	}
	for streamID, stream := range t.activeStreams {
		if streamID > id && streamID <= upperLimit {
			// The stream was unprocessed by the server.
			atomic.StoreUint32(&stream.unprocessed, 1)
			t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
		}
	}
	t.prevGoAwayID = id
	active := len(t.activeStreams)
	t.mu.Unlock()
	if active == 0 {
		t.Close()
	}
}

// setGoAwayReason sets the value of t.goAwayReason based
// on the GoAway frame received.
// It expects a lock on the transport's mutex to be held by
// the caller.
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
	t.goAwayReason = GoAwayNoReason
	switch f.ErrCode {
	case http2.ErrCodeEnhanceYourCalm:
		if string(f.DebugData()) == "too_many_pings" {
			t.goAwayReason = GoAwayTooManyPings
		}
	}
}

func (t *http2Client) GetGoAwayReason() GoAwayReason {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.goAwayReason
}

func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	t.controlBuf.put(&incomingWindowUpdate{
		streamID:  f.Header().StreamID,
		increment: f.Increment,
	})
}

// operateHeaders takes action on the decoded headers.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
	s, ok := t.getStream(frame)
	if !ok {
		return
	}
	atomic.StoreUint32(&s.bytesReceived, 1)
	var state decodeState
	if err := state.decodeHeader(frame); err != nil {
		t.closeStream(s, err, true, http2.ErrCodeProtocol, status.New(codes.Internal, err.Error()), nil, false)
		// Something is wrong. Stop reading even when there is remaining data.
		return
	}

	endStream := frame.StreamEnded()
	var isHeader bool
	defer func() {
		if t.statsHandler != nil {
			if isHeader {
				inHeader := &stats.InHeader{
					Client:     true,
					WireLength: int(frame.Header().Length),
				}
				t.statsHandler.HandleRPC(s.ctx, inHeader)
			} else {
				inTrailer := &stats.InTrailer{
					Client:     true,
					WireLength: int(frame.Header().Length),
				}
				t.statsHandler.HandleRPC(s.ctx, inTrailer)
			}
		}
	}()
	// If headers haven't been received yet.
	if atomic.SwapUint32(&s.headerDone, 1) == 0 {
		if !endStream {
			// Headers frame is not actually a trailers-only frame.
			isHeader = true
			// These values can be set without any synchronization because
			// the stream goroutine will read them only after seeing a closed
			// headerChan, which we'll close after setting this.
			s.recvCompress = state.encoding
			if len(state.mdata) > 0 {
				s.header = state.mdata
			}
		} else {
			s.noHeaders = true
		}
		close(s.headerChan)
	}
	if !endStream {
		return
	}
	// If the client received END_STREAM from the server while the stream was
	// still active, send RST_STREAM.
	rst := s.getState() == streamActive
	t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.mdata, true)
}

// reader runs as a separate goroutine in charge of reading data from the
// network connection.
//
// TODO(zhaoq): currently one reader per transport. Investigate whether this is
// optimal.
// TODO(zhaoq): Check the validity of the incoming frame sequence.
func (t *http2Client) reader() {
	defer close(t.readerDone)
	// Check the validity of server preface.
	frame, err := t.framer.fr.ReadFrame()
	if err != nil {
		t.Close() // this kicks off resetTransport, so must be last before return
		return
	}
	t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
	if t.keepaliveEnabled {
		atomic.CompareAndSwapUint32(&t.activity, 0, 1)
	}
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		t.Close() // this kicks off resetTransport, so must be last before return
		return
	}
	t.onSuccess()
	t.handleSettings(sf, true)

	// loop to keep reading incoming messages on this transport.
	for {
		frame, err := t.framer.fr.ReadFrame()
		if t.keepaliveEnabled {
			atomic.CompareAndSwapUint32(&t.activity, 0, 1)
		}
		if err != nil {
			// Abort an active stream if the http2.Framer returns a
			// http2.StreamError. This can happen only if the server's response
			// is malformed http2.
			if se, ok := err.(http2.StreamError); ok {
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					// Use the error detail to provide a better error message.
					code := http2ErrConvTab[se.Code]
					msg := t.framer.fr.ErrorDetail().Error()
					t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
				}
				continue
			} else {
				// Transport error.
				t.Close()
				return
			}
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			t.operateHeaders(frame)
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame, false)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.GoAwayFrame:
			t.handleGoAway(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		default:
			errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
		}
	}
}

// keepalive runs in a separate goroutine and makes sure the connection is
// alive by sending pings.
func (t *http2Client) keepalive() {
	p := &ping{data: [8]byte{}}
	timer := time.NewTimer(t.kp.Time)
	for {
		select {
		case <-timer.C:
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				timer.Reset(t.kp.Time)
				continue
			}
			// Check if keepalive should go dormant.
			t.mu.Lock()
			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
				// Make awakenKeepalive writable.
				<-t.awakenKeepalive
				t.mu.Unlock()
				select {
				case <-t.awakenKeepalive:
					// If control gets here, a ping has been sent, and the
					// timer needs to be reset with keepalive.Timeout.
				case <-t.ctx.Done():
					return
				}
			} else {
				t.mu.Unlock()
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				// Send ping.
				t.controlBuf.put(p)
			}

			// By the time control gets here, a ping has been sent one way or the other.
			timer.Reset(t.kp.Timeout)
			select {
			case <-timer.C:
				if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
					timer.Reset(t.kp.Time)
					continue
				}
				t.Close()
				return
			case <-t.ctx.Done():
				if !timer.Stop() {
					<-timer.C
				}
				return
			}
		case <-t.ctx.Done():
			if !timer.Stop() {
				<-timer.C
			}
			return
		}
	}
}
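
// exampleKeepaliveParams is a hedged sketch added for this edit (the values
// are hypothetical, not defaults): Time is how long the connection may stay
// idle before keepalive() sends a ping, Timeout is how long it waits for
// activity after the ping before closing the transport, and
// PermitWithoutStream controls whether pings are sent while no streams are
// active.
var exampleKeepaliveParams = keepalive.ClientParameters{
	Time:                30 * time.Second,
	Timeout:             10 * time.Second,
	PermitWithoutStream: false,
}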

func (t *http2Client) Error() <-chan struct{} {
	return t.ctx.Done()
}

func (t *http2Client) GoAway() <-chan struct{} {
	return t.goAway
}

func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
	s := channelz.SocketInternalMetric{
		StreamsStarted:                  atomic.LoadInt64(&t.czData.streamsStarted),
		StreamsSucceeded:                atomic.LoadInt64(&t.czData.streamsSucceeded),
		StreamsFailed:                   atomic.LoadInt64(&t.czData.streamsFailed),
		MessagesSent:                    atomic.LoadInt64(&t.czData.msgSent),
		MessagesReceived:                atomic.LoadInt64(&t.czData.msgRecv),
		KeepAlivesSent:                  atomic.LoadInt64(&t.czData.kpCount),
		LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
		LastMessageSentTimestamp:        time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
		LastMessageReceivedTimestamp:    time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
		LocalFlowControlWindow:          int64(t.fc.getSize()),
		SocketOptions:                   channelz.GetSocketOption(t.conn),
		LocalAddr:                       t.localAddr,
		RemoteAddr:                      t.remoteAddr,
		// RemoteName :
	}
	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
		s.Security = au.GetSecurityValue()
	}
	s.RemoteFlowControlWindow = t.getOutFlowWindow()
	return &s
}

func (t *http2Client) IncrMsgSent() {
	atomic.AddInt64(&t.czData.msgSent, 1)
	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}

func (t *http2Client) IncrMsgRecv() {
	atomic.AddInt64(&t.czData.msgRecv, 1)
	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}

func (t *http2Client) getOutFlowWindow() int64 {
	resp := make(chan uint32, 1)
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
	select {
	case sz := <-resp:
		return int64(sz)
	case <-t.ctxDone:
		return -1
	case <-timer.C:
		return -2
	}
}