1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "context"
23 "fmt"
24 "io"
25 "math"
26 "net"
27 "strconv"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "golang.org/x/net/http2"
34 "golang.org/x/net/http2/hpack"
35
36 "google.golang.org/grpc/codes"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/internal/channelz"
39 "google.golang.org/grpc/internal/syscall"
40 "google.golang.org/grpc/keepalive"
41 "google.golang.org/grpc/metadata"
42 "google.golang.org/grpc/peer"
43 "google.golang.org/grpc/stats"
44 "google.golang.org/grpc/status"
45)
46
47// http2Client implements the ClientTransport interface with HTTP2.
48type http2Client struct {
49 ctx context.Context
50 cancel context.CancelFunc
51 ctxDone <-chan struct{} // Cache the ctx.Done() chan.
52 userAgent string
53 md interface{}
54 conn net.Conn // underlying communication channel
55 loopy *loopyWriter
56 remoteAddr net.Addr
57 localAddr net.Addr
58 authInfo credentials.AuthInfo // auth info about the connection
59
60 readerDone chan struct{} // sync point to enable testing.
61 writerDone chan struct{} // sync point to enable testing.
62 // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
63 // that the server sent GoAway on this transport.
64 goAway chan struct{}
65	// awakenKeepalive is used to wake up keepalive after it has gone dormant.
66 awakenKeepalive chan struct{}
67
68 framer *framer
69 // controlBuf delivers all the control related tasks (e.g., window
70 // updates, reset streams, and various settings) to the controller.
71 controlBuf *controlBuffer
72 fc *trInFlow
73 // The scheme used: https if TLS is on, http otherwise.
74 scheme string
75
76 isSecure bool
77
78 perRPCCreds []credentials.PerRPCCredentials
79
80 // Boolean to keep track of reading activity on transport.
81 // 1 is true and 0 is false.
82 activity uint32 // Accessed atomically.
83 kp keepalive.ClientParameters
84 keepaliveEnabled bool
85
86 statsHandler stats.Handler
87
88 initialWindowSize int32
89
90 // configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
91 maxSendHeaderListSize *uint32
92
93 bdpEst *bdpEstimator
94	// onPrefaceReceipt is a callback that the client transport calls upon
95	// receiving the server preface to signal that a successful HTTP2
96 // connection was established.
97 onPrefaceReceipt func()
98
99 maxConcurrentStreams uint32
100 streamQuota int64
101 streamsQuotaAvailable chan struct{}
102 waitingStreams uint32
103 nextID uint32
104
105 mu sync.Mutex // guard the following variables
106 state transportState
107 activeStreams map[uint32]*Stream
108	// prevGoAwayID records the Last-Stream-ID in the previous GoAway frame.
109 prevGoAwayID uint32
110 // goAwayReason records the http2.ErrCode and debug data received with the
111 // GoAway frame.
112 goAwayReason GoAwayReason
113
114 // Fields below are for channelz metric collection.
115 channelzID int64 // channelz unique identification number
116 czData *channelzData
117
118 onGoAway func(GoAwayReason)
119 onClose func()
120
121	bufferPool *bufferPool
122}
123
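// dial opens the underlying connection: it uses the custom dialer fn when one
// was supplied in the connect options, and falls back to a plain TCP
// DialContext otherwise.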
124func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
125 if fn != nil {
126 return fn(ctx, addr)
127 }
128 return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
129}
130
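// isTemporary reports whether a dial error is worth retrying. Errors that
// expose Temporary() or Timeout() are consulted directly; anything else is
// assumed to be temporary so that the connection attempt can be retried.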
131func isTemporary(err error) bool {
132 switch err := err.(type) {
133 case interface {
134 Temporary() bool
135 }:
136 return err.Temporary()
137 case interface {
138 Timeout() bool
139 }:
140 // Timeouts may be resolved upon retry, and are thus treated as
141 // temporary.
142 return err.Timeout()
143 }
144 return true
145}
146
147// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
148// and starts to receive messages on it. A non-nil error is returned if construction
149// fails.
150func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
151 scheme := "http"
152 ctx, cancel := context.WithCancel(ctx)
153 defer func() {
154 if err != nil {
155 cancel()
156 }
157 }()
158
159 conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
160 if err != nil {
161 if opts.FailOnNonTempDialError {
162 return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
163 }
164 return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
165 }
166 // Any further errors will close the underlying connection
167 defer func(conn net.Conn) {
168 if err != nil {
169 conn.Close()
170 }
171 }(conn)
172 kp := opts.KeepaliveParams
173 // Validate keepalive parameters.
174 if kp.Time == 0 {
175 kp.Time = defaultClientKeepaliveTime
176 }
177 if kp.Timeout == 0 {
178 kp.Timeout = defaultClientKeepaliveTimeout
179 }
180 keepaliveEnabled := false
181 if kp.Time != infinity {
182 if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
183 return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
184 }
185 keepaliveEnabled = true
186 }
187 var (
188 isSecure bool
189 authInfo credentials.AuthInfo
190 )
191 transportCreds := opts.TransportCredentials
192 perRPCCreds := opts.PerRPCCredentials
193
194 if b := opts.CredsBundle; b != nil {
195 if t := b.TransportCredentials(); t != nil {
196 transportCreds = t
197 }
198 if t := b.PerRPCCredentials(); t != nil {
199 perRPCCreds = append(perRPCCreds, t)
200 }
201 }
202 if transportCreds != nil {
203 scheme = "https"
204 conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn)
205 if err != nil {
206 return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
207 }
208 isSecure = true
209 }
210 dynamicWindow := true
211 icwz := int32(initialWindowSize)
212 if opts.InitialConnWindowSize >= defaultWindowSize {
213 icwz = opts.InitialConnWindowSize
214 dynamicWindow = false
215 }
216 writeBufSize := opts.WriteBufferSize
217 readBufSize := opts.ReadBufferSize
218 maxHeaderListSize := defaultClientMaxHeaderListSize
219 if opts.MaxHeaderListSize != nil {
220 maxHeaderListSize = *opts.MaxHeaderListSize
221 }
222 t := &http2Client{
223 ctx: ctx,
224 ctxDone: ctx.Done(), // Cache Done chan.
225 cancel: cancel,
226 userAgent: opts.UserAgent,
227 md: addr.Metadata,
228 conn: conn,
229 remoteAddr: conn.RemoteAddr(),
230 localAddr: conn.LocalAddr(),
231 authInfo: authInfo,
232 readerDone: make(chan struct{}),
233 writerDone: make(chan struct{}),
234 goAway: make(chan struct{}),
235 awakenKeepalive: make(chan struct{}, 1),
236 framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
237 fc: &trInFlow{limit: uint32(icwz)},
238 scheme: scheme,
239 activeStreams: make(map[uint32]*Stream),
240 isSecure: isSecure,
241 perRPCCreds: perRPCCreds,
242 kp: kp,
243 statsHandler: opts.StatsHandler,
244 initialWindowSize: initialWindowSize,
245 onPrefaceReceipt: onPrefaceReceipt,
246 nextID: 1,
247 maxConcurrentStreams: defaultMaxStreamsClient,
248 streamQuota: defaultMaxStreamsClient,
249 streamsQuotaAvailable: make(chan struct{}, 1),
250 czData: new(channelzData),
251 onGoAway: onGoAway,
252 onClose: onClose,
253 keepaliveEnabled: keepaliveEnabled,
254		bufferPool:            newBufferPool(),
255	}
256 t.controlBuf = newControlBuffer(t.ctxDone)
257 if opts.InitialWindowSize >= defaultWindowSize {
258 t.initialWindowSize = opts.InitialWindowSize
259 dynamicWindow = false
260 }
261 if dynamicWindow {
262 t.bdpEst = &bdpEstimator{
263 bdp: initialWindowSize,
264 updateFlowControl: t.updateFlowControl,
265 }
266 }
267 // Make sure awakenKeepalive can't be written upon.
268 // keepalive routine will make it writable, if need be.
269 t.awakenKeepalive <- struct{}{}
270 if t.statsHandler != nil {
271 t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
272 RemoteAddr: t.remoteAddr,
273 LocalAddr: t.localAddr,
274 })
275 connBegin := &stats.ConnBegin{
276 Client: true,
277 }
278 t.statsHandler.HandleConn(t.ctx, connBegin)
279 }
280 if channelz.IsOn() {
281 t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
282 }
283 if t.keepaliveEnabled {
284 go t.keepalive()
285 }
286	// Start the reader goroutine for incoming messages. Each transport has
287	// a dedicated goroutine which reads HTTP2 frames from the network. Then it
288 // dispatches the frame to the corresponding stream entity.
289 go t.reader()
290
291 // Send connection preface to server.
292 n, err := t.conn.Write(clientPreface)
293 if err != nil {
294 t.Close()
295 return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
296 }
297 if n != len(clientPreface) {
298 t.Close()
299 return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
300 }
301 var ss []http2.Setting
302
303 if t.initialWindowSize != defaultWindowSize {
304 ss = append(ss, http2.Setting{
305 ID: http2.SettingInitialWindowSize,
306 Val: uint32(t.initialWindowSize),
307 })
308 }
309 if opts.MaxHeaderListSize != nil {
310 ss = append(ss, http2.Setting{
311 ID: http2.SettingMaxHeaderListSize,
312 Val: *opts.MaxHeaderListSize,
313 })
314 }
315 err = t.framer.fr.WriteSettings(ss...)
316 if err != nil {
317 t.Close()
318 return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
319 }
320 // Adjust the connection flow control window if needed.
321 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
322 if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
323 t.Close()
324 return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
325 }
326 }
327
328 if err := t.framer.writer.Flush(); err != nil {
329 return nil, err
330 }
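	// Start loopy, the writer goroutine: it drains controlBuf (settings, window
	// updates, headers, and data frames) and writes them out on the connection.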
331 go func() {
332 t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
333 err := t.loopy.run()
334 if err != nil {
335 errorf("transport: loopyWriter.run returning. Err: %v", err)
336 }
337 // If it's a connection error, let reader goroutine handle it
338 // since there might be data in the buffers.
339 if _, ok := err.(net.Error); !ok {
340 t.conn.Close()
341 }
342 close(t.writerDone)
343 }()
344 return t, nil
345}
346
347func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
348 // TODO(zhaoq): Handle uint32 overflow of Stream.id.
349 s := &Stream{
350 done: make(chan struct{}),
351 method: callHdr.Method,
352 sendCompress: callHdr.SendCompress,
353 buf: newRecvBuffer(),
354 headerChan: make(chan struct{}),
355 contentSubtype: callHdr.ContentSubtype,
356 }
357 s.wq = newWriteQuota(defaultWriteQuota, s.done)
358 s.requestRead = func(n int) {
359 t.adjustWindow(s, uint32(n))
360 }
361	// The client-side stream context should have exactly the same life cycle as the user-provided context.
362 // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
363 // So we use the original context here instead of creating a copy.
364 s.ctx = ctx
365 s.trReader = &transportReader{
366 reader: &recvBufferReader{
367 ctx: s.ctx,
368 ctxDone: s.ctx.Done(),
369 recv: s.buf,
370 closeStream: func(err error) {
371 t.CloseStream(s, err)
372 },
373			freeBuffer: t.bufferPool.put,
374		},
375 windowHandler: func(n int) {
376 t.updateWindow(s, uint32(n))
377 },
378 }
379 return s
380}
381
382func (t *http2Client) getPeer() *peer.Peer {
383 pr := &peer.Peer{
384 Addr: t.remoteAddr,
385 }
386 // Attach Auth info if there is any.
387 if t.authInfo != nil {
388 pr.AuthInfo = t.authInfo
389 }
390 return pr
391}
392
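// createHeaderFields assembles the HPACK header fields for a new RPC: the
// HTTP/2 pseudo-headers first, then the fixed gRPC headers (content-type,
// user-agent, te, timeout, encoding), per-RPC auth metadata, stats tags/trace
// data, and finally user metadata from the outgoing context and any metadata
// attached to the target address.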
393func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
394 aud := t.createAudience(callHdr)
395 authData, err := t.getTrAuthData(ctx, aud)
396 if err != nil {
397 return nil, err
398 }
399 callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
400 if err != nil {
401 return nil, err
402 }
403	// TODO(mmukhi): Benchmark if the performance gets better if we count the metadata and other header fields
404 // first and create a slice of that exact size.
405 // Make the slice of certain predictable size to reduce allocations made by append.
406 hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
407 hfLen += len(authData) + len(callAuthData)
408 headerFields := make([]hpack.HeaderField, 0, hfLen)
409 headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
410 headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
411 headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
412 headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
413 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
414 headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
415 headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
416 if callHdr.PreviousAttempts > 0 {
417 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
418 }
419
420 if callHdr.SendCompress != "" {
421 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
422 }
423 if dl, ok := ctx.Deadline(); ok {
424		// Send out the timeout regardless of its value. The server can detect a timeout context by itself.
425 // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
426 timeout := time.Until(dl)
427 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
428 }
429 for k, v := range authData {
430 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
431 }
432 for k, v := range callAuthData {
433 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
434 }
435 if b := stats.OutgoingTags(ctx); b != nil {
436 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
437 }
438 if b := stats.OutgoingTrace(ctx); b != nil {
439 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
440 }
441
442 if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
443 var k string
444		for k, vv := range md {
445			// HTTP doesn't allow setting pseudo-headers after non-pseudo-headers have been set.
446 if isReservedHeader(k) {
447 continue
448 }
449 for _, v := range vv {
450 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
451 }
452 }
453		for _, vv := range added {
454 for i, v := range vv {
455 if i%2 == 0 {
456 k = v
457 continue
458 }
459			// HTTP doesn't allow setting pseudo-headers after non-pseudo-headers have been set.
460 if isReservedHeader(k) {
461 continue
462 }
463 headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
464 }
465 }
466	}
467 if md, ok := t.md.(*metadata.MD); ok {
468 for k, vv := range *md {
469 if isReservedHeader(k) {
470 continue
471 }
472 for _, v := range vv {
473 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
474 }
475 }
476 }
477 return headerFields, nil
478}
479
480func (t *http2Client) createAudience(callHdr *CallHdr) string {
481 // Create an audience string only if needed.
482 if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
483 return ""
484 }
485 // Construct URI required to get auth request metadata.
486 // Omit port if it is the default one.
487 host := strings.TrimSuffix(callHdr.Host, ":443")
488 pos := strings.LastIndex(callHdr.Method, "/")
489 if pos == -1 {
490 pos = len(callHdr.Method)
491 }
492 return "https://" + host + callHdr.Method[:pos]
493}
494
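// getTrAuthData collects request metadata from the transport-level
// PerRPCCredentials configured on the connection, lower-casing every key
// because uppercase header names are illegal in HTTP/2.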
495func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
496 authData := map[string]string{}
497 for _, c := range t.perRPCCreds {
498 data, err := c.GetRequestMetadata(ctx, audience)
499 if err != nil {
500 if _, ok := status.FromError(err); ok {
501 return nil, err
502 }
503
504 return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
505 }
506 for k, v := range data {
507 // Capital header names are illegal in HTTP/2.
508 k = strings.ToLower(k)
509 authData[k] = v
510 }
511 }
512 return authData, nil
513}
514
515func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
516 callAuthData := map[string]string{}
517 // Check if credentials.PerRPCCredentials were provided via call options.
518 // Note: if these credentials are provided both via dial options and call
519 // options, then both sets of credentials will be applied.
520 if callCreds := callHdr.Creds; callCreds != nil {
521 if !t.isSecure && callCreds.RequireTransportSecurity() {
522 return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
523 }
524 data, err := callCreds.GetRequestMetadata(ctx, audience)
525 if err != nil {
526 return nil, status.Errorf(codes.Internal, "transport: %v", err)
527 }
528 for k, v := range data {
529 // Capital header names are illegal in HTTP/2
530 k = strings.ToLower(k)
531 callAuthData[k] = v
532 }
533 }
534 return callAuthData, nil
535}
536
537// NewStream creates a stream and registers it into the transport as "active"
538// streams.
539func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
540 ctx = peer.NewContext(ctx, t.getPeer())
541 headerFields, err := t.createHeaderFields(ctx, callHdr)
542 if err != nil {
543 return nil, err
544 }
545 s := t.newStream(ctx, callHdr)
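	// cleanup aborts a stream that never made it onto the wire: it marks the
	// stream done and unprocessed by the server, delivers err to any pending
	// reads, and closes headerChan if nobody has closed it yet.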
546 cleanup := func(err error) {
547 if s.swapState(streamDone) == streamDone {
548 // If it was already done, return.
549 return
550 }
551 // The stream was unprocessed by the server.
552 atomic.StoreUint32(&s.unprocessed, 1)
553 s.write(recvMsg{err: err})
554 close(s.done)
555 // If headerChan isn't closed, then close it.
556		if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
557			close(s.headerChan)
558 }
559
560 }
561 hdr := &headerFrame{
562 hf: headerFields,
563 endStream: false,
564 initStream: func(id uint32) (bool, error) {
565 t.mu.Lock()
566 if state := t.state; state != reachable {
567 t.mu.Unlock()
568 // Do a quick cleanup.
569 err := error(errStreamDrain)
570 if state == closing {
571 err = ErrConnClosing
572 }
573 cleanup(err)
574 return false, err
575 }
576 t.activeStreams[id] = s
577 if channelz.IsOn() {
578 atomic.AddInt64(&t.czData.streamsStarted, 1)
579 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
580 }
581 var sendPing bool
582 // If the number of active streams change from 0 to 1, then check if keepalive
583 // has gone dormant. If so, wake it up.
584 if len(t.activeStreams) == 1 && t.keepaliveEnabled {
585 select {
586 case t.awakenKeepalive <- struct{}{}:
587 sendPing = true
588 // Fill the awakenKeepalive channel again as this channel must be
589 // kept non-writable except at the point that the keepalive()
590				// goroutine is waiting either to be awakened or shut down.
591 t.awakenKeepalive <- struct{}{}
592 default:
593 }
594 }
595 t.mu.Unlock()
596 return sendPing, nil
597 },
598 onOrphaned: cleanup,
599 wq: s.wq,
600 }
601 firstTry := true
602 var ch chan struct{}
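	// checkForStreamQuota runs via controlBuf.executeAndPut. It reserves one
	// unit of stream quota, assigns the next odd stream ID to the header frame
	// (client-initiated streams use odd IDs), and, when quota is exhausted,
	// records that a caller is waiting and returns false so the loop below
	// blocks until quota is returned.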
603 checkForStreamQuota := func(it interface{}) bool {
604 if t.streamQuota <= 0 { // Can go negative if server decreases it.
605 if firstTry {
606 t.waitingStreams++
607 }
608 ch = t.streamsQuotaAvailable
609 return false
610 }
611 if !firstTry {
612 t.waitingStreams--
613 }
614 t.streamQuota--
615 h := it.(*headerFrame)
616 h.streamID = t.nextID
617 t.nextID += 2
618 s.id = h.streamID
619 s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
620 if t.streamQuota > 0 && t.waitingStreams > 0 {
621 select {
622 case t.streamsQuotaAvailable <- struct{}{}:
623 default:
624 }
625 }
626 return true
627 }
628 var hdrListSizeErr error
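	// checkForHeaderListSize rejects the RPC up front if the encoded header
	// fields would exceed the SETTINGS_MAX_HEADER_LIST_SIZE value advertised by
	// the server (when the server has sent one).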
629 checkForHeaderListSize := func(it interface{}) bool {
630 if t.maxSendHeaderListSize == nil {
631 return true
632 }
633 hdrFrame := it.(*headerFrame)
634 var sz int64
635 for _, f := range hdrFrame.hf {
636 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
637 hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
638 return false
639 }
640 }
641 return true
642 }
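	// Register the HEADERS frame with the controlBuf, retrying whenever stream
	// quota runs out. The select below wakes up on new quota, cancellation of
	// the RPC context, a GOAWAY from the server, or the transport closing.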
643 for {
644 success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
645 if !checkForStreamQuota(it) {
646 return false
647 }
648 if !checkForHeaderListSize(it) {
649 return false
650 }
651 return true
652 }, hdr)
653 if err != nil {
654 return nil, err
655 }
656 if success {
657 break
658 }
659 if hdrListSizeErr != nil {
660 return nil, hdrListSizeErr
661 }
662 firstTry = false
663 select {
664 case <-ch:
665 case <-s.ctx.Done():
666 return nil, ContextErr(s.ctx.Err())
667 case <-t.goAway:
668 return nil, errStreamDrain
669 case <-t.ctx.Done():
670 return nil, ErrConnClosing
671 }
672 }
673 if t.statsHandler != nil {
674 outHeader := &stats.OutHeader{
675 Client: true,
676 FullMethod: callHdr.Method,
677 RemoteAddr: t.remoteAddr,
678 LocalAddr: t.localAddr,
679 Compression: callHdr.SendCompress,
680 }
681 t.statsHandler.HandleRPC(s.ctx, outHeader)
682 }
683 return s, nil
684}
685
686// CloseStream clears the footprint of a stream when the stream is not needed any more.
687// This must not be executed in the reader's goroutine.
688func (t *http2Client) CloseStream(s *Stream, err error) {
689 var (
690 rst bool
691 rstCode http2.ErrCode
692 )
693 if err != nil {
694 rst = true
695 rstCode = http2.ErrCodeCancel
696 }
697 t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
698}
699
700func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
701 // Set stream status to done.
702 if s.swapState(streamDone) == streamDone {
703 // If it was already done, return. If multiple closeStream calls
704 // happen simultaneously, wait for the first to finish.
705 <-s.done
706 return
707 }
708 // status and trailers can be updated here without any synchronization because the stream goroutine will
709 // only read it after it sees an io.EOF error from read or write and we'll write those errors
710 // only after updating this.
711 s.status = st
712 if len(mdata) > 0 {
713 s.trailer = mdata
714 }
715 if err != nil {
716 // This will unblock reads eventually.
717 s.write(recvMsg{err: err})
718 }
719 // If headerChan isn't closed, then close it.
720	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
721		s.noHeaders = true
722 close(s.headerChan)
723 }
724 cleanup := &cleanupStream{
725 streamID: s.id,
726 onWrite: func() {
727 t.mu.Lock()
728 if t.activeStreams != nil {
729 delete(t.activeStreams, s.id)
730 }
731 t.mu.Unlock()
732 if channelz.IsOn() {
733 if eosReceived {
734 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
735 } else {
736 atomic.AddInt64(&t.czData.streamsFailed, 1)
737 }
738 }
739 },
740 rst: rst,
741 rstCode: rstCode,
742 }
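	// Return the stream's quota on the controlBuf path so quota accounting
	// stays serialized with NewStream, and signal streamsQuotaAvailable
	// (non-blocking) so a waiter in NewStream can retry.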
743 addBackStreamQuota := func(interface{}) bool {
744 t.streamQuota++
745 if t.streamQuota > 0 && t.waitingStreams > 0 {
746 select {
747 case t.streamsQuotaAvailable <- struct{}{}:
748 default:
749 }
750 }
751 return true
752 }
753 t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
754 // This will unblock write.
755 close(s.done)
756}
757
758// Close kicks off the shutdown process of the transport. This should be called
759// only once on a transport. Once it is called, the transport should not be
760// accessed any more.
761//
762// This method blocks until the addrConn that initiated this transport is
763// re-connected. This happens because t.onClose() begins reconnect logic at the
764// addrConn level and blocks until the addrConn is successfully connected.
765func (t *http2Client) Close() error {
766 t.mu.Lock()
767 // Make sure we only Close once.
768 if t.state == closing {
769 t.mu.Unlock()
770 return nil
771 }
772 t.state = closing
773 streams := t.activeStreams
774 t.activeStreams = nil
775 t.mu.Unlock()
776 t.controlBuf.finish()
777 t.cancel()
778 err := t.conn.Close()
779 if channelz.IsOn() {
780 channelz.RemoveEntry(t.channelzID)
781 }
782 // Notify all active streams.
783 for _, s := range streams {
784 t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
785 }
786 if t.statsHandler != nil {
787 connEnd := &stats.ConnEnd{
788 Client: true,
789 }
790 t.statsHandler.HandleConn(t.ctx, connEnd)
791 }
792 t.onClose()
793 return err
794}
795
796// GracefulClose sets the state to draining, which prevents new streams from
797// being created and causes the transport to be closed when the last active
798// stream is closed. If there are no active streams, the transport is closed
799// immediately. This does nothing if the transport is already draining or
800// closing.
801func (t *http2Client) GracefulClose() {
802	t.mu.Lock()
803 // Make sure we move to draining only from active.
804 if t.state == draining || t.state == closing {
805 t.mu.Unlock()
806		return
807	}
808 t.state = draining
809 active := len(t.activeStreams)
810 t.mu.Unlock()
811 if active == 0 {
812		t.Close()
813		return
814	}
815 t.controlBuf.put(&incomingGoAway{})
816}
817
818// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
819// should proceed only if Write returns nil.
820func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
821 if opts.Last {
822 // If it's the last message, update stream state.
823 if !s.compareAndSwapState(streamActive, streamWriteDone) {
824 return errStreamDone
825 }
826 } else if s.getState() != streamActive {
827 return errStreamDone
828 }
829 df := &dataFrame{
830 streamID: s.id,
831 endStream: opts.Last,
832 }
833 if hdr != nil || data != nil { // If it's not an empty data frame.
834 // Add some data to grpc message header so that we can equally
835 // distribute bytes across frames.
836 emptyLen := http2MaxFrameLen - len(hdr)
837 if emptyLen > len(data) {
838 emptyLen = len(data)
839 }
840 hdr = append(hdr, data[:emptyLen]...)
841 data = data[emptyLen:]
842 df.h, df.d = hdr, data
843 // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
844 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
845 return err
846 }
847 }
848 return t.controlBuf.put(df)
849}
850
851func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
852 t.mu.Lock()
853 defer t.mu.Unlock()
854 s, ok := t.activeStreams[f.Header().StreamID]
855 return s, ok
856}
857
858// adjustWindow sends out an extra window update over the initial window size
859// of the stream if the application is requesting data larger in size than
860// the window.
861func (t *http2Client) adjustWindow(s *Stream, n uint32) {
862 if w := s.fc.maybeAdjust(n); w > 0 {
863 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
864 }
865}
866
867// updateWindow adjusts the inbound quota for the stream.
868// Window updates will be sent out when the cumulative quota
869// exceeds the corresponding threshold.
870func (t *http2Client) updateWindow(s *Stream, n uint32) {
871 if w := s.fc.onRead(n); w > 0 {
872 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
873 }
874}
875
876// updateFlowControl updates the incoming flow control windows
877// for the transport and the stream based on the current bdp
878// estimation.
879func (t *http2Client) updateFlowControl(n uint32) {
880 t.mu.Lock()
881 for _, s := range t.activeStreams {
882 s.fc.newLimit(n)
883 }
884 t.mu.Unlock()
885 updateIWS := func(interface{}) bool {
886 t.initialWindowSize = int32(n)
887 return true
888 }
889 t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
890 t.controlBuf.put(&outgoingSettings{
891 ss: []http2.Setting{
892 {
893 ID: http2.SettingInitialWindowSize,
894 Val: n,
895 },
896 },
897 })
898}
899
900func (t *http2Client) handleData(f *http2.DataFrame) {
901 size := f.Header().Length
902 var sendBDPPing bool
903 if t.bdpEst != nil {
904 sendBDPPing = t.bdpEst.add(size)
905 }
906 // Decouple connection's flow control from application's read.
907 // An update on connection's flow control should not depend on
908 // whether user application has read the data or not. Such a
909 // restriction is already imposed on the stream's flow control,
910 // and therefore the sender will be blocked anyways.
911 // Decoupling the connection flow control will prevent other
912 // active(fast) streams from starving in presence of slow or
913 // inactive streams.
914 //
915 if w := t.fc.onData(size); w > 0 {
916 t.controlBuf.put(&outgoingWindowUpdate{
917 streamID: 0,
918 increment: w,
919 })
920 }
921 if sendBDPPing {
922 // Avoid excessive ping detection (e.g. in an L7 proxy)
923 // by sending a window update prior to the BDP ping.
924
925 if w := t.fc.reset(); w > 0 {
926 t.controlBuf.put(&outgoingWindowUpdate{
927 streamID: 0,
928 increment: w,
929 })
930 }
931
932 t.controlBuf.put(bdpPing)
933 }
934 // Select the right stream to dispatch.
935 s, ok := t.getStream(f)
936 if !ok {
937 return
938 }
939 if size > 0 {
940 if err := s.fc.onData(size); err != nil {
941 t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
942 return
943 }
944 if f.Header().Flags.Has(http2.FlagDataPadded) {
945 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
946 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
947 }
948 }
949 // TODO(bradfitz, zhaoq): A copy is required here because there is no
950 // guarantee f.Data() is consumed before the arrival of next frame.
951 // Can this copy be eliminated?
952 if len(f.Data()) > 0 {
953			buffer := t.bufferPool.get()
954 buffer.Reset()
955 buffer.Write(f.Data())
956 s.write(recvMsg{buffer: buffer})
957		}
958 }
959 // The server has closed the stream without sending trailers. Record that
960 // the read direction is closed, and set the status appropriately.
961 if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
962 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
963 }
964}
965
966func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
967 s, ok := t.getStream(f)
968 if !ok {
969 return
970 }
971 if f.ErrCode == http2.ErrCodeRefusedStream {
972 // The stream was unprocessed by the server.
973 atomic.StoreUint32(&s.unprocessed, 1)
974 }
975 statusCode, ok := http2ErrConvTab[f.ErrCode]
976 if !ok {
977 warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
978 statusCode = codes.Unknown
979 }
980 if statusCode == codes.Canceled {
981 // Our deadline was already exceeded, and that was likely the cause of
982 // this cancelation. Alter the status code accordingly.
983 if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) {
984 statusCode = codes.DeadlineExceeded
985 }
986 }
987 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
988}
989
990func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
991 if f.IsAck() {
992 return
993 }
994 var maxStreams *uint32
995 var ss []http2.Setting
996 var updateFuncs []func()
997 f.ForeachSetting(func(s http2.Setting) error {
998 switch s.ID {
999 case http2.SettingMaxConcurrentStreams:
1000 maxStreams = new(uint32)
1001 *maxStreams = s.Val
1002 case http2.SettingMaxHeaderListSize:
1003 updateFuncs = append(updateFuncs, func() {
1004 t.maxSendHeaderListSize = new(uint32)
1005 *t.maxSendHeaderListSize = s.Val
1006 })
1007 default:
1008 ss = append(ss, s)
1009 }
1010 return nil
1011 })
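	// If the server's first SETTINGS frame is silent about
	// MAX_CONCURRENT_STREAMS, treat the limit as effectively unbounded,
	// matching HTTP/2's default of no limit.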
1012 if isFirst && maxStreams == nil {
1013 maxStreams = new(uint32)
1014 *maxStreams = math.MaxUint32
1015 }
1016 sf := &incomingSettings{
1017 ss: ss,
1018 }
1019 if maxStreams != nil {
1020 updateStreamQuota := func() {
1021 delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
1022 t.maxConcurrentStreams = *maxStreams
1023 t.streamQuota += delta
1024 if delta > 0 && t.waitingStreams > 0 {
1025 close(t.streamsQuotaAvailable) // wake all of them up.
1026 t.streamsQuotaAvailable = make(chan struct{}, 1)
1027 }
1028 }
1029 updateFuncs = append(updateFuncs, updateStreamQuota)
1030 }
1031 t.controlBuf.executeAndPut(func(interface{}) bool {
1032 for _, f := range updateFuncs {
1033 f()
1034 }
1035 return true
1036 }, sf)
1037}
1038
1039func (t *http2Client) handlePing(f *http2.PingFrame) {
1040 if f.IsAck() {
1041 // Maybe it's a BDP ping.
1042 if t.bdpEst != nil {
1043 t.bdpEst.calculate(f.Data)
1044 }
1045 return
1046 }
1047 pingAck := &ping{ack: true}
1048 copy(pingAck.data[:], f.Data[:])
1049 t.controlBuf.put(pingAck)
1050}
1051
1052func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
1053 t.mu.Lock()
1054 if t.state == closing {
1055 t.mu.Unlock()
1056 return
1057 }
1058 if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
1059 infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
1060 }
1061 id := f.LastStreamID
1062 if id > 0 && id%2 != 1 {
1063 t.mu.Unlock()
1064 t.Close()
1065 return
1066 }
1067 // A client can receive multiple GoAways from the server (see
1068 // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
1069 // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
1070 // sent after an RTT delay with the ID of the last stream the server will
1071 // process.
1072 //
1073 // Therefore, when we get the first GoAway we don't necessarily close any
1074 // streams. While in case of second GoAway we close all streams created after
1075 // the GoAwayId. This way streams that were in-flight while the GoAway from
1076 // server was being sent don't get killed.
1077 select {
1078	case <-t.goAway: // t.goAway has been closed (i.e., multiple GoAways).
1079 // If there are multiple GoAways the first one should always have an ID greater than the following ones.
1080 if id > t.prevGoAwayID {
1081 t.mu.Unlock()
1082 t.Close()
1083 return
1084 }
1085 default:
1086 t.setGoAwayReason(f)
1087 close(t.goAway)
1088 t.state = draining
1089 t.controlBuf.put(&incomingGoAway{})
1090
1091 // This has to be a new goroutine because we're still using the current goroutine to read in the transport.
1092 t.onGoAway(t.goAwayReason)
1093 }
1094 // All streams with IDs greater than the GoAwayId
1095 // and smaller than the previous GoAway ID should be killed.
1096 upperLimit := t.prevGoAwayID
1097 if upperLimit == 0 { // This is the first GoAway Frame.
1098 upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
1099 }
1100 for streamID, stream := range t.activeStreams {
1101 if streamID > id && streamID <= upperLimit {
1102 // The stream was unprocessed by the server.
1103 atomic.StoreUint32(&stream.unprocessed, 1)
1104 t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
1105 }
1106 }
1107 t.prevGoAwayID = id
1108 active := len(t.activeStreams)
1109 t.mu.Unlock()
1110 if active == 0 {
1111 t.Close()
1112 }
1113}
1114
1115// setGoAwayReason sets the value of t.goAwayReason based
1116// on the GoAway frame received.
1117// It expects a lock on the transport's mutex to be held by
1118// the caller.
1119func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
1120 t.goAwayReason = GoAwayNoReason
1121 switch f.ErrCode {
1122 case http2.ErrCodeEnhanceYourCalm:
1123 if string(f.DebugData()) == "too_many_pings" {
1124 t.goAwayReason = GoAwayTooManyPings
1125 }
1126 }
1127}
1128
1129func (t *http2Client) GetGoAwayReason() GoAwayReason {
1130 t.mu.Lock()
1131 defer t.mu.Unlock()
1132 return t.goAwayReason
1133}
1134
1135func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
1136 t.controlBuf.put(&incomingWindowUpdate{
1137 streamID: f.Header().StreamID,
1138 increment: f.Increment,
1139 })
1140}
1141
1142// operateHeaders takes action on the decoded headers.
1143func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
1144 s, ok := t.getStream(frame)
1145 if !ok {
1146 return
1147 }
1148	endStream := frame.StreamEnded()
1149	atomic.StoreUint32(&s.bytesReceived, 1)
1150	initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
1151
1152 if !initialHeader && !endStream {
1153		// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, a second HEADERS frame must have the EOS bit set.
1154 st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
1155 t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
1156		return
1157 }
1158
1159	state := &decodeState{}
1160 // Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
1161 state.data.isGRPC = !initialHeader
1162 if err := state.decodeHeader(frame); err != nil {
1163 t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
1164 return
1165 }
1166
1167 isHeader := false
1168	defer func() {
1169 if t.statsHandler != nil {
1170 if isHeader {
1171 inHeader := &stats.InHeader{
1172 Client: true,
1173 WireLength: int(frame.Header().Length),
1174 }
1175 t.statsHandler.HandleRPC(s.ctx, inHeader)
1176 } else {
1177 inTrailer := &stats.InTrailer{
1178 Client: true,
1179 WireLength: int(frame.Header().Length),
1180 }
1181 t.statsHandler.HandleRPC(s.ctx, inTrailer)
1182 }
1183 }
1184 }()
1185
1186 // If headerChan hasn't been closed yet
1187 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
1188		if !endStream {
1189			// HEADERS frame block carries a Response-Headers.
1190			isHeader = true
1191 // These values can be set without any synchronization because
1192 // stream goroutine will read it only after seeing a closed
1193 // headerChan which we'll close after setting this.
1194			s.recvCompress = state.data.encoding
1195 if len(state.data.mdata) > 0 {
1196 s.header = state.data.mdata
1197			}
1198 } else {
1199			// HEADERS frame block carries a Trailers-Only.
1200			s.noHeaders = true
1201 }
1202 close(s.headerChan)
1203 }
1204
1205	if !endStream {
1206 return
1207 }
1208
1209	// If the client receives END_STREAM from the server while the stream is still active, send a RST_STREAM.
1210 rst := s.getState() == streamActive
1211	t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
1212}
1213
1214// reader runs as a separate goroutine in charge of reading data from network
1215// connection.
1216//
1217// TODO(zhaoq): currently one reader per transport. Investigate whether this is
1218// optimal.
1219// TODO(zhaoq): Check the validity of the incoming frame sequence.
1220func (t *http2Client) reader() {
1221 defer close(t.readerDone)
1222 // Check the validity of server preface.
1223 frame, err := t.framer.fr.ReadFrame()
1224 if err != nil {
1225 t.Close() // this kicks off resetTransport, so must be last before return
1226 return
1227 }
1228 t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
1229 if t.keepaliveEnabled {
1230 atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1231 }
1232 sf, ok := frame.(*http2.SettingsFrame)
1233 if !ok {
1234 t.Close() // this kicks off resetTransport, so must be last before return
1235 return
1236 }
1237 t.onPrefaceReceipt()
1238 t.handleSettings(sf, true)
1239
1240 // loop to keep reading incoming messages on this transport.
1241 for {
1242 frame, err := t.framer.fr.ReadFrame()
1243 if t.keepaliveEnabled {
1244 atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1245 }
1246 if err != nil {
1247 // Abort an active stream if the http2.Framer returns a
1248 // http2.StreamError. This can happen only if the server's response
1249 // is malformed http2.
1250 if se, ok := err.(http2.StreamError); ok {
1251 t.mu.Lock()
1252 s := t.activeStreams[se.StreamID]
1253 t.mu.Unlock()
1254 if s != nil {
1255 // use error detail to provide better err message
1256 code := http2ErrConvTab[se.Code]
1257 msg := t.framer.fr.ErrorDetail().Error()
1258 t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
1259 }
1260 continue
1261 } else {
1262 // Transport error.
1263 t.Close()
1264 return
1265 }
1266 }
1267 switch frame := frame.(type) {
1268 case *http2.MetaHeadersFrame:
1269 t.operateHeaders(frame)
1270 case *http2.DataFrame:
1271 t.handleData(frame)
1272 case *http2.RSTStreamFrame:
1273 t.handleRSTStream(frame)
1274 case *http2.SettingsFrame:
1275 t.handleSettings(frame, false)
1276 case *http2.PingFrame:
1277 t.handlePing(frame)
1278 case *http2.GoAwayFrame:
1279 t.handleGoAway(frame)
1280 case *http2.WindowUpdateFrame:
1281 t.handleWindowUpdate(frame)
1282 default:
1283 errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
1284 }
1285 }
1286}
1287
1288// keepalive, running in a separate goroutine, makes sure the connection is alive by sending pings.
1289func (t *http2Client) keepalive() {
1290 p := &ping{data: [8]byte{}}
1291 timer := time.NewTimer(t.kp.Time)
1292 for {
1293 select {
1294 case <-timer.C:
1295 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1296 timer.Reset(t.kp.Time)
1297 continue
1298 }
1299 // Check if keepalive should go dormant.
1300 t.mu.Lock()
1301 if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
1302 // Make awakenKeepalive writable.
1303 <-t.awakenKeepalive
1304 t.mu.Unlock()
1305 select {
1306 case <-t.awakenKeepalive:
1307 // If the control gets here a ping has been sent
1308 // need to reset the timer with keepalive.Timeout.
1309 case <-t.ctx.Done():
1310 return
1311 }
1312 } else {
1313 t.mu.Unlock()
1314 if channelz.IsOn() {
1315 atomic.AddInt64(&t.czData.kpCount, 1)
1316 }
1317 // Send ping.
1318 t.controlBuf.put(p)
1319 }
1320
1321 // By the time control gets here a ping has been sent one way or the other.
1322 timer.Reset(t.kp.Timeout)
1323 select {
1324 case <-timer.C:
1325 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1326 timer.Reset(t.kp.Time)
1327 continue
1328 }
1329 t.Close()
1330 return
1331 case <-t.ctx.Done():
1332 if !timer.Stop() {
1333 <-timer.C
1334 }
1335 return
1336 }
1337 case <-t.ctx.Done():
1338 if !timer.Stop() {
1339 <-timer.C
1340 }
1341 return
1342 }
1343 }
1344}
1345
1346func (t *http2Client) Error() <-chan struct{} {
1347 return t.ctx.Done()
1348}
1349
1350func (t *http2Client) GoAway() <-chan struct{} {
1351 return t.goAway
1352}
1353
1354func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
1355 s := channelz.SocketInternalMetric{
1356 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1357 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1358 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1359 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1360 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1361 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1362 LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1363 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1364 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1365 LocalFlowControlWindow: int64(t.fc.getSize()),
1366 SocketOptions: channelz.GetSocketOption(t.conn),
1367 LocalAddr: t.localAddr,
1368 RemoteAddr: t.remoteAddr,
1369 // RemoteName :
1370 }
1371 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1372 s.Security = au.GetSecurityValue()
1373 }
1374 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1375 return &s
1376}
1377
1378func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
1379
1380func (t *http2Client) IncrMsgSent() {
1381 atomic.AddInt64(&t.czData.msgSent, 1)
1382 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1383}
1384
1385func (t *http2Client) IncrMsgRecv() {
1386 atomic.AddInt64(&t.czData.msgRecv, 1)
1387 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1388}
1389
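// getOutFlowWindow asks loopy (via controlBuf) for the current outgoing
// flow-control window. It returns -1 if the transport is already done and -2
// if no reply arrives within a second.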
1390func (t *http2Client) getOutFlowWindow() int64 {
1391 resp := make(chan uint32, 1)
1392 timer := time.NewTimer(time.Second)
1393 defer timer.Stop()
1394 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1395 select {
1396 case sz := <-resp:
1397 return int64(sz)
1398 case <-t.ctxDone:
1399 return -1
1400 case <-timer.C:
1401 return -2
1402 }
1403}