blob: 19c13e041d3b7b08fcf45ad2695dfa02b8bcf64b [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 "net"
29 "net/http"
30 "strconv"
31 "sync"
32 "sync/atomic"
33 "time"
34
35 "github.com/golang/protobuf/proto"
36 "golang.org/x/net/http2"
37 "golang.org/x/net/http2/hpack"
38 "google.golang.org/grpc/internal/grpcutil"
39
40 "google.golang.org/grpc/codes"
41 "google.golang.org/grpc/credentials"
42 "google.golang.org/grpc/internal/channelz"
43 "google.golang.org/grpc/internal/grpcrand"
44 "google.golang.org/grpc/keepalive"
45 "google.golang.org/grpc/metadata"
46 "google.golang.org/grpc/peer"
47 "google.golang.org/grpc/stats"
48 "google.golang.org/grpc/status"
49 "google.golang.org/grpc/tap"
50)
51
52var (
53 // ErrIllegalHeaderWrite indicates that setting header is illegal because of
54 // the stream's state.
55 ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
56 // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
57 // than the limit set by peer.
58 ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
59)
60
61// serverConnectionCounter counts the number of connections a server has seen
62// (equal to the number of http2Servers created). Must be accessed atomically.
63var serverConnectionCounter uint64
64
65// http2Server implements the ServerTransport interface with HTTP2.
66type http2Server struct {
67 lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
68 ctx context.Context
69 done chan struct{}
70 conn net.Conn
71 loopy *loopyWriter
72 readerDone chan struct{} // sync point to enable testing.
73 writerDone chan struct{} // sync point to enable testing.
74 remoteAddr net.Addr
75 localAddr net.Addr
76 maxStreamID uint32 // max stream ID ever seen
77 authInfo credentials.AuthInfo // auth info about the connection
78 inTapHandle tap.ServerInHandle
79 framer *framer
80 // The max number of concurrent streams.
81 maxStreams uint32
82 // controlBuf delivers all the control related tasks (e.g., window
83 // updates, reset streams, and various settings) to the controller.
84 controlBuf *controlBuffer
85 fc *trInFlow
86 stats stats.Handler
87 // Keepalive and max-age parameters for the server.
88 kp keepalive.ServerParameters
89 // Keepalive enforcement policy.
90 kep keepalive.EnforcementPolicy
91 // The time instance last ping was received.
92 lastPingAt time.Time
93 // Number of times the client has violated keepalive ping policy so far.
94 pingStrikes uint8
95 // Flag to signify that number of ping strikes should be reset to 0.
96 // This is set whenever data or header frames are sent.
97 // 1 means yes.
98 resetPingStrikes uint32 // Accessed atomically.
99 initialWindowSize int32
100 bdpEst *bdpEstimator
101 maxSendHeaderListSize *uint32
102
103 mu sync.Mutex // guard the following
104
105 // drainChan is initialized when Drain() is called the first time.
106 // After which the server writes out the first GoAway(with ID 2^31-1) frame.
107 // Then an independent goroutine will be launched to later send the second GoAway.
108 // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
109 // Thus call to Drain() will be a no-op if drainChan is already initialized since draining is
110 // already underway.
111 drainChan chan struct{}
112 state transportState
113 activeStreams map[uint32]*Stream
114 // idle is the time instant when the connection went idle.
115 // This is either the beginning of the connection or when the number of
116 // RPCs go down to 0.
117 // When the connection is busy, this value is set to 0.
118 idle time.Time
119
120 // Fields below are for channelz metric collection.
121 channelzID int64 // channelz unique identification number
122 czData *channelzData
123 bufferPool *bufferPool
124
125 connectionID uint64
126}
127
128// NewServerTransport creates a http2 transport with conn and configuration
129// options from config.
130//
131// It returns a non-nil transport and a nil error on success. On failure, it
132// returns a non-nil transport and a nil-error. For a special case where the
133// underlying conn gets closed before the client preface could be read, it
134// returns a nil transport and a nil error.
135func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
136 var authInfo credentials.AuthInfo
137 rawConn := conn
138 if config.Credentials != nil {
139 var err error
140 conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
141 if err != nil {
142 // ErrConnDispatched means that the connection was dispatched away
143 // from gRPC; those connections should be left open. io.EOF means
144 // the connection was closed before handshaking completed, which can
145 // happen naturally from probers. Return these errors directly.
146 if err == credentials.ErrConnDispatched || err == io.EOF {
147 return nil, err
148 }
149 return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
150 }
151 }
152 writeBufSize := config.WriteBufferSize
153 readBufSize := config.ReadBufferSize
154 maxHeaderListSize := defaultServerMaxHeaderListSize
155 if config.MaxHeaderListSize != nil {
156 maxHeaderListSize = *config.MaxHeaderListSize
157 }
158 framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
159 // Send initial settings as connection preface to client.
160 isettings := []http2.Setting{{
161 ID: http2.SettingMaxFrameSize,
162 Val: http2MaxFrameLen,
163 }}
164 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
165 // permitted in the HTTP2 spec.
166 maxStreams := config.MaxStreams
167 if maxStreams == 0 {
168 maxStreams = math.MaxUint32
169 } else {
170 isettings = append(isettings, http2.Setting{
171 ID: http2.SettingMaxConcurrentStreams,
172 Val: maxStreams,
173 })
174 }
175 dynamicWindow := true
176 iwz := int32(initialWindowSize)
177 if config.InitialWindowSize >= defaultWindowSize {
178 iwz = config.InitialWindowSize
179 dynamicWindow = false
180 }
181 icwz := int32(initialWindowSize)
182 if config.InitialConnWindowSize >= defaultWindowSize {
183 icwz = config.InitialConnWindowSize
184 dynamicWindow = false
185 }
186 if iwz != defaultWindowSize {
187 isettings = append(isettings, http2.Setting{
188 ID: http2.SettingInitialWindowSize,
189 Val: uint32(iwz)})
190 }
191 if config.MaxHeaderListSize != nil {
192 isettings = append(isettings, http2.Setting{
193 ID: http2.SettingMaxHeaderListSize,
194 Val: *config.MaxHeaderListSize,
195 })
196 }
197 if config.HeaderTableSize != nil {
198 isettings = append(isettings, http2.Setting{
199 ID: http2.SettingHeaderTableSize,
200 Val: *config.HeaderTableSize,
201 })
202 }
203 if err := framer.fr.WriteSettings(isettings...); err != nil {
204 return nil, connectionErrorf(false, err, "transport: %v", err)
205 }
206 // Adjust the connection flow control window if needed.
207 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
208 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
209 return nil, connectionErrorf(false, err, "transport: %v", err)
210 }
211 }
212 kp := config.KeepaliveParams
213 if kp.MaxConnectionIdle == 0 {
214 kp.MaxConnectionIdle = defaultMaxConnectionIdle
215 }
216 if kp.MaxConnectionAge == 0 {
217 kp.MaxConnectionAge = defaultMaxConnectionAge
218 }
219 // Add a jitter to MaxConnectionAge.
220 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
221 if kp.MaxConnectionAgeGrace == 0 {
222 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
223 }
224 if kp.Time == 0 {
225 kp.Time = defaultServerKeepaliveTime
226 }
227 if kp.Timeout == 0 {
228 kp.Timeout = defaultServerKeepaliveTimeout
229 }
230 kep := config.KeepalivePolicy
231 if kep.MinTime == 0 {
232 kep.MinTime = defaultKeepalivePolicyMinTime
233 }
234
235 done := make(chan struct{})
236 t := &http2Server{
237 ctx: setConnection(context.Background(), rawConn),
238 done: done,
239 conn: conn,
240 remoteAddr: conn.RemoteAddr(),
241 localAddr: conn.LocalAddr(),
242 authInfo: authInfo,
243 framer: framer,
244 readerDone: make(chan struct{}),
245 writerDone: make(chan struct{}),
246 maxStreams: maxStreams,
247 inTapHandle: config.InTapHandle,
248 fc: &trInFlow{limit: uint32(icwz)},
249 state: reachable,
250 activeStreams: make(map[uint32]*Stream),
251 stats: config.StatsHandler,
252 kp: kp,
253 idle: time.Now(),
254 kep: kep,
255 initialWindowSize: iwz,
256 czData: new(channelzData),
257 bufferPool: newBufferPool(),
258 }
259 t.controlBuf = newControlBuffer(t.done)
260 if dynamicWindow {
261 t.bdpEst = &bdpEstimator{
262 bdp: initialWindowSize,
263 updateFlowControl: t.updateFlowControl,
264 }
265 }
266 if t.stats != nil {
267 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
268 RemoteAddr: t.remoteAddr,
269 LocalAddr: t.localAddr,
270 })
271 connBegin := &stats.ConnBegin{}
272 t.stats.HandleConn(t.ctx, connBegin)
273 }
274 if channelz.IsOn() {
275 t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
276 }
277
278 t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
279
280 t.framer.writer.Flush()
281
282 defer func() {
283 if err != nil {
284 t.Close()
285 }
286 }()
287
288 // Check the validity of client preface.
289 preface := make([]byte, len(clientPreface))
290 if _, err := io.ReadFull(t.conn, preface); err != nil {
291 // In deployments where a gRPC server runs behind a cloud load balancer
292 // which performs regular TCP level health checks, the connection is
293 // closed immediately by the latter. Skipping the error here will help
294 // reduce log clutter.
295 if err == io.EOF {
296 return nil, nil
297 }
298 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
299 }
300 if !bytes.Equal(preface, clientPreface) {
301 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
302 }
303
304 frame, err := t.framer.fr.ReadFrame()
305 if err == io.EOF || err == io.ErrUnexpectedEOF {
306 return nil, err
307 }
308 if err != nil {
309 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
310 }
311 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
312 sf, ok := frame.(*http2.SettingsFrame)
313 if !ok {
314 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
315 }
316 t.handleSettings(sf)
317
318 go func() {
319 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
320 t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
321 if err := t.loopy.run(); err != nil {
322 if logger.V(logLevel) {
323 logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
324 }
325 }
326 t.conn.Close()
327 t.controlBuf.finish()
328 close(t.writerDone)
329 }()
330 go t.keepalive()
331 return t, nil
332}
333
334// operateHeader takes action on the decoded headers.
335func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
336 streamID := frame.Header().StreamID
337
338 // frame.Truncated is set to true when framer detects that the current header
339 // list size hits MaxHeaderListSize limit.
340 if frame.Truncated {
341 t.controlBuf.put(&cleanupStream{
342 streamID: streamID,
343 rst: true,
344 rstCode: http2.ErrCodeFrameSize,
345 onWrite: func() {},
346 })
347 return false
348 }
349
350 buf := newRecvBuffer()
351 s := &Stream{
352 id: streamID,
353 st: t,
354 buf: buf,
355 fc: &inFlow{limit: uint32(t.initialWindowSize)},
356 }
357
358 var (
359 // If a gRPC Response-Headers has already been received, then it means
360 // that the peer is speaking gRPC and we are in gRPC mode.
361 isGRPC = false
362 mdata = make(map[string][]string)
363 httpMethod string
364 // headerError is set if an error is encountered while parsing the headers
365 headerError bool
366
367 timeoutSet bool
368 timeout time.Duration
369 )
370
371 for _, hf := range frame.Fields {
372 switch hf.Name {
373 case "content-type":
374 contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
375 if !validContentType {
376 break
377 }
378 mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
379 s.contentSubtype = contentSubtype
380 isGRPC = true
381 case "grpc-encoding":
382 s.recvCompress = hf.Value
383 case ":method":
384 httpMethod = hf.Value
385 case ":path":
386 s.method = hf.Value
387 case "grpc-timeout":
388 timeoutSet = true
389 var err error
390 if timeout, err = decodeTimeout(hf.Value); err != nil {
391 headerError = true
392 }
393 default:
394 if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
395 break
396 }
397 v, err := decodeMetadataHeader(hf.Name, hf.Value)
398 if err != nil {
399 headerError = true
400 logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
401 break
402 }
403 mdata[hf.Name] = append(mdata[hf.Name], v)
404 }
405 }
406
407 if !isGRPC || headerError {
408 t.controlBuf.put(&cleanupStream{
409 streamID: streamID,
410 rst: true,
411 rstCode: http2.ErrCodeProtocol,
412 onWrite: func() {},
413 })
414 return false
415 }
416
417 if frame.StreamEnded() {
418 // s is just created by the caller. No lock needed.
419 s.state = streamReadDone
420 }
421 if timeoutSet {
422 s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
423 } else {
424 s.ctx, s.cancel = context.WithCancel(t.ctx)
425 }
426 pr := &peer.Peer{
427 Addr: t.remoteAddr,
428 }
429 // Attach Auth info if there is any.
430 if t.authInfo != nil {
431 pr.AuthInfo = t.authInfo
432 }
433 s.ctx = peer.NewContext(s.ctx, pr)
434 // Attach the received metadata to the context.
435 if len(mdata) > 0 {
436 s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
437 if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
438 s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
439 }
440 if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
441 s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
442 }
443 }
444 t.mu.Lock()
445 if t.state != reachable {
446 t.mu.Unlock()
447 s.cancel()
448 return false
449 }
450 if uint32(len(t.activeStreams)) >= t.maxStreams {
451 t.mu.Unlock()
452 t.controlBuf.put(&cleanupStream{
453 streamID: streamID,
454 rst: true,
455 rstCode: http2.ErrCodeRefusedStream,
456 onWrite: func() {},
457 })
458 s.cancel()
459 return false
460 }
461 if streamID%2 != 1 || streamID <= t.maxStreamID {
462 t.mu.Unlock()
463 // illegal gRPC stream id.
464 if logger.V(logLevel) {
465 logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
466 }
467 s.cancel()
468 return true
469 }
470 t.maxStreamID = streamID
471 if httpMethod != http.MethodPost {
472 t.mu.Unlock()
473 if logger.V(logLevel) {
474 logger.Infof("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
475 }
476 t.controlBuf.put(&cleanupStream{
477 streamID: streamID,
478 rst: true,
479 rstCode: http2.ErrCodeProtocol,
480 onWrite: func() {},
481 })
482 s.cancel()
483 return false
484 }
485 if t.inTapHandle != nil {
486 var err error
487 if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
488 t.mu.Unlock()
489 if logger.V(logLevel) {
490 logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
491 }
492 stat, ok := status.FromError(err)
493 if !ok {
494 stat = status.New(codes.PermissionDenied, err.Error())
495 }
496 t.controlBuf.put(&earlyAbortStream{
497 streamID: s.id,
498 contentSubtype: s.contentSubtype,
499 status: stat,
500 })
501 return false
502 }
503 }
504 t.activeStreams[streamID] = s
505 if len(t.activeStreams) == 1 {
506 t.idle = time.Time{}
507 }
508 t.mu.Unlock()
509 if channelz.IsOn() {
510 atomic.AddInt64(&t.czData.streamsStarted, 1)
511 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
512 }
513 s.requestRead = func(n int) {
514 t.adjustWindow(s, uint32(n))
515 }
516 s.ctx = traceCtx(s.ctx, s.method)
517 if t.stats != nil {
518 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
519 inHeader := &stats.InHeader{
520 FullMethod: s.method,
521 RemoteAddr: t.remoteAddr,
522 LocalAddr: t.localAddr,
523 Compression: s.recvCompress,
524 WireLength: int(frame.Header().Length),
525 Header: metadata.MD(mdata).Copy(),
526 }
527 t.stats.HandleRPC(s.ctx, inHeader)
528 }
529 s.ctxDone = s.ctx.Done()
530 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
531 s.trReader = &transportReader{
532 reader: &recvBufferReader{
533 ctx: s.ctx,
534 ctxDone: s.ctxDone,
535 recv: s.buf,
536 freeBuffer: t.bufferPool.put,
537 },
538 windowHandler: func(n int) {
539 t.updateWindow(s, uint32(n))
540 },
541 }
542 // Register the stream with loopy.
543 t.controlBuf.put(&registerStream{
544 streamID: s.id,
545 wq: s.wq,
546 })
547 handle(s)
548 return false
549}
550
551// HandleStreams receives incoming streams using the given handler. This is
552// typically run in a separate goroutine.
553// traceCtx attaches trace to ctx and returns the new context.
554func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
555 defer close(t.readerDone)
556 for {
557 t.controlBuf.throttle()
558 frame, err := t.framer.fr.ReadFrame()
559 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
560 if err != nil {
561 if se, ok := err.(http2.StreamError); ok {
562 if logger.V(logLevel) {
563 logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
564 }
565 t.mu.Lock()
566 s := t.activeStreams[se.StreamID]
567 t.mu.Unlock()
568 if s != nil {
569 t.closeStream(s, true, se.Code, false)
570 } else {
571 t.controlBuf.put(&cleanupStream{
572 streamID: se.StreamID,
573 rst: true,
574 rstCode: se.Code,
575 onWrite: func() {},
576 })
577 }
578 continue
579 }
580 if err == io.EOF || err == io.ErrUnexpectedEOF {
581 t.Close()
582 return
583 }
584 if logger.V(logLevel) {
585 logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
586 }
587 t.Close()
588 return
589 }
590 switch frame := frame.(type) {
591 case *http2.MetaHeadersFrame:
592 if t.operateHeaders(frame, handle, traceCtx) {
593 t.Close()
594 break
595 }
596 case *http2.DataFrame:
597 t.handleData(frame)
598 case *http2.RSTStreamFrame:
599 t.handleRSTStream(frame)
600 case *http2.SettingsFrame:
601 t.handleSettings(frame)
602 case *http2.PingFrame:
603 t.handlePing(frame)
604 case *http2.WindowUpdateFrame:
605 t.handleWindowUpdate(frame)
606 case *http2.GoAwayFrame:
607 // TODO: Handle GoAway from the client appropriately.
608 default:
609 if logger.V(logLevel) {
610 logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
611 }
612 }
613 }
614}
615
616func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
617 t.mu.Lock()
618 defer t.mu.Unlock()
619 if t.activeStreams == nil {
620 // The transport is closing.
621 return nil, false
622 }
623 s, ok := t.activeStreams[f.Header().StreamID]
624 if !ok {
625 // The stream is already done.
626 return nil, false
627 }
628 return s, true
629}
630
631// adjustWindow sends out extra window update over the initial window size
632// of stream if the application is requesting data larger in size than
633// the window.
634func (t *http2Server) adjustWindow(s *Stream, n uint32) {
635 if w := s.fc.maybeAdjust(n); w > 0 {
636 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
637 }
638
639}
640
641// updateWindow adjusts the inbound quota for the stream and the transport.
642// Window updates will deliver to the controller for sending when
643// the cumulative quota exceeds the corresponding threshold.
644func (t *http2Server) updateWindow(s *Stream, n uint32) {
645 if w := s.fc.onRead(n); w > 0 {
646 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
647 increment: w,
648 })
649 }
650}
651
652// updateFlowControl updates the incoming flow control windows
653// for the transport and the stream based on the current bdp
654// estimation.
655func (t *http2Server) updateFlowControl(n uint32) {
656 t.mu.Lock()
657 for _, s := range t.activeStreams {
658 s.fc.newLimit(n)
659 }
660 t.initialWindowSize = int32(n)
661 t.mu.Unlock()
662 t.controlBuf.put(&outgoingWindowUpdate{
663 streamID: 0,
664 increment: t.fc.newLimit(n),
665 })
666 t.controlBuf.put(&outgoingSettings{
667 ss: []http2.Setting{
668 {
669 ID: http2.SettingInitialWindowSize,
670 Val: n,
671 },
672 },
673 })
674
675}
676
677func (t *http2Server) handleData(f *http2.DataFrame) {
678 size := f.Header().Length
679 var sendBDPPing bool
680 if t.bdpEst != nil {
681 sendBDPPing = t.bdpEst.add(size)
682 }
683 // Decouple connection's flow control from application's read.
684 // An update on connection's flow control should not depend on
685 // whether user application has read the data or not. Such a
686 // restriction is already imposed on the stream's flow control,
687 // and therefore the sender will be blocked anyways.
688 // Decoupling the connection flow control will prevent other
689 // active(fast) streams from starving in presence of slow or
690 // inactive streams.
691 if w := t.fc.onData(size); w > 0 {
692 t.controlBuf.put(&outgoingWindowUpdate{
693 streamID: 0,
694 increment: w,
695 })
696 }
697 if sendBDPPing {
698 // Avoid excessive ping detection (e.g. in an L7 proxy)
699 // by sending a window update prior to the BDP ping.
700 if w := t.fc.reset(); w > 0 {
701 t.controlBuf.put(&outgoingWindowUpdate{
702 streamID: 0,
703 increment: w,
704 })
705 }
706 t.controlBuf.put(bdpPing)
707 }
708 // Select the right stream to dispatch.
709 s, ok := t.getStream(f)
710 if !ok {
711 return
712 }
713 if s.getState() == streamReadDone {
714 t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
715 return
716 }
717 if size > 0 {
718 if err := s.fc.onData(size); err != nil {
719 t.closeStream(s, true, http2.ErrCodeFlowControl, false)
720 return
721 }
722 if f.Header().Flags.Has(http2.FlagDataPadded) {
723 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
724 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
725 }
726 }
727 // TODO(bradfitz, zhaoq): A copy is required here because there is no
728 // guarantee f.Data() is consumed before the arrival of next frame.
729 // Can this copy be eliminated?
730 if len(f.Data()) > 0 {
731 buffer := t.bufferPool.get()
732 buffer.Reset()
733 buffer.Write(f.Data())
734 s.write(recvMsg{buffer: buffer})
735 }
736 }
737 if f.Header().Flags.Has(http2.FlagDataEndStream) {
738 // Received the end of stream from the client.
739 s.compareAndSwapState(streamActive, streamReadDone)
740 s.write(recvMsg{err: io.EOF})
741 }
742}
743
744func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
745 // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
746 if s, ok := t.getStream(f); ok {
747 t.closeStream(s, false, 0, false)
748 return
749 }
750 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
751 t.controlBuf.put(&cleanupStream{
752 streamID: f.Header().StreamID,
753 rst: false,
754 rstCode: 0,
755 onWrite: func() {},
756 })
757}
758
759func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
760 if f.IsAck() {
761 return
762 }
763 var ss []http2.Setting
764 var updateFuncs []func()
765 f.ForeachSetting(func(s http2.Setting) error {
766 switch s.ID {
767 case http2.SettingMaxHeaderListSize:
768 updateFuncs = append(updateFuncs, func() {
769 t.maxSendHeaderListSize = new(uint32)
770 *t.maxSendHeaderListSize = s.Val
771 })
772 default:
773 ss = append(ss, s)
774 }
775 return nil
776 })
777 t.controlBuf.executeAndPut(func(interface{}) bool {
778 for _, f := range updateFuncs {
779 f()
780 }
781 return true
782 }, &incomingSettings{
783 ss: ss,
784 })
785}
786
787const (
788 maxPingStrikes = 2
789 defaultPingTimeout = 2 * time.Hour
790)
791
792func (t *http2Server) handlePing(f *http2.PingFrame) {
793 if f.IsAck() {
794 if f.Data == goAwayPing.data && t.drainChan != nil {
795 close(t.drainChan)
796 return
797 }
798 // Maybe it's a BDP ping.
799 if t.bdpEst != nil {
800 t.bdpEst.calculate(f.Data)
801 }
802 return
803 }
804 pingAck := &ping{ack: true}
805 copy(pingAck.data[:], f.Data[:])
806 t.controlBuf.put(pingAck)
807
808 now := time.Now()
809 defer func() {
810 t.lastPingAt = now
811 }()
812 // A reset ping strikes means that we don't need to check for policy
813 // violation for this ping and the pingStrikes counter should be set
814 // to 0.
815 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
816 t.pingStrikes = 0
817 return
818 }
819 t.mu.Lock()
820 ns := len(t.activeStreams)
821 t.mu.Unlock()
822 if ns < 1 && !t.kep.PermitWithoutStream {
823 // Keepalive shouldn't be active thus, this new ping should
824 // have come after at least defaultPingTimeout.
825 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
826 t.pingStrikes++
827 }
828 } else {
829 // Check if keepalive policy is respected.
830 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
831 t.pingStrikes++
832 }
833 }
834
835 if t.pingStrikes > maxPingStrikes {
836 // Send goaway and close the connection.
837 if logger.V(logLevel) {
838 logger.Errorf("transport: Got too many pings from the client, closing the connection.")
839 }
840 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
841 }
842}
843
844func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
845 t.controlBuf.put(&incomingWindowUpdate{
846 streamID: f.Header().StreamID,
847 increment: f.Increment,
848 })
849}
850
851func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
852 for k, vv := range md {
853 if isReservedHeader(k) {
854 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
855 continue
856 }
857 for _, v := range vv {
858 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
859 }
860 }
861 return headerFields
862}
863
864func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
865 if t.maxSendHeaderListSize == nil {
866 return true
867 }
868 hdrFrame := it.(*headerFrame)
869 var sz int64
870 for _, f := range hdrFrame.hf {
871 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
872 if logger.V(logLevel) {
873 logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
874 }
875 return false
876 }
877 }
878 return true
879}
880
881// WriteHeader sends the header metadata md back to the client.
882func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
883 if s.updateHeaderSent() || s.getState() == streamDone {
884 return ErrIllegalHeaderWrite
885 }
886 s.hdrMu.Lock()
887 if md.Len() > 0 {
888 if s.header.Len() > 0 {
889 s.header = metadata.Join(s.header, md)
890 } else {
891 s.header = md
892 }
893 }
894 if err := t.writeHeaderLocked(s); err != nil {
895 s.hdrMu.Unlock()
896 return err
897 }
898 s.hdrMu.Unlock()
899 return nil
900}
901
902func (t *http2Server) setResetPingStrikes() {
903 atomic.StoreUint32(&t.resetPingStrikes, 1)
904}
905
906func (t *http2Server) writeHeaderLocked(s *Stream) error {
907 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
908 // first and create a slice of that exact size.
909 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
910 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
911 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
912 if s.sendCompress != "" {
913 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
914 }
915 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
916 success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
917 streamID: s.id,
918 hf: headerFields,
919 endStream: false,
920 onWrite: t.setResetPingStrikes,
921 })
922 if !success {
923 if err != nil {
924 return err
925 }
926 t.closeStream(s, true, http2.ErrCodeInternal, false)
927 return ErrHeaderListSizeLimitViolation
928 }
929 if t.stats != nil {
930 // Note: Headers are compressed with hpack after this call returns.
931 // No WireLength field is set here.
932 outHeader := &stats.OutHeader{
933 Header: s.header.Copy(),
934 Compression: s.sendCompress,
935 }
936 t.stats.HandleRPC(s.Context(), outHeader)
937 }
938 return nil
939}
940
941// WriteStatus sends stream status to the client and terminates the stream.
942// There is no further I/O operations being able to perform on this stream.
943// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
944// OK is adopted.
945func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
946 if s.getState() == streamDone {
947 return nil
948 }
949 s.hdrMu.Lock()
950 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
951 // first and create a slice of that exact size.
952 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
953 if !s.updateHeaderSent() { // No headers have been sent.
954 if len(s.header) > 0 { // Send a separate header frame.
955 if err := t.writeHeaderLocked(s); err != nil {
956 s.hdrMu.Unlock()
957 return err
958 }
959 } else { // Send a trailer only response.
960 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
961 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
962 }
963 }
964 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
965 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
966
967 if p := st.Proto(); p != nil && len(p.Details) > 0 {
968 stBytes, err := proto.Marshal(p)
969 if err != nil {
970 // TODO: return error instead, when callers are able to handle it.
971 logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
972 } else {
973 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
974 }
975 }
976
977 // Attach the trailer metadata.
978 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
979 trailingHeader := &headerFrame{
980 streamID: s.id,
981 hf: headerFields,
982 endStream: true,
983 onWrite: t.setResetPingStrikes,
984 }
985 s.hdrMu.Unlock()
986 success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
987 if !success {
988 if err != nil {
989 return err
990 }
991 t.closeStream(s, true, http2.ErrCodeInternal, false)
992 return ErrHeaderListSizeLimitViolation
993 }
994 // Send a RST_STREAM after the trailers if the client has not already half-closed.
995 rst := s.getState() == streamActive
996 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
997 if t.stats != nil {
998 // Note: The trailer fields are compressed with hpack after this call returns.
999 // No WireLength field is set here.
1000 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
1001 Trailer: s.trailer.Copy(),
1002 })
1003 }
1004 return nil
1005}
1006
1007// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
1008// is returns if it fails (e.g., framing error, transport error).
1009func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
1010 if !s.isHeaderSent() { // Headers haven't been written yet.
1011 if err := t.WriteHeader(s, nil); err != nil {
1012 if _, ok := err.(ConnectionError); ok {
1013 return err
1014 }
1015 // TODO(mmukhi, dfawley): Make sure this is the right code to return.
1016 return status.Errorf(codes.Internal, "transport: %v", err)
1017 }
1018 } else {
1019 // Writing headers checks for this condition.
1020 if s.getState() == streamDone {
1021 // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
1022 s.cancel()
1023 select {
1024 case <-t.done:
1025 return ErrConnClosing
1026 default:
1027 }
1028 return ContextErr(s.ctx.Err())
1029 }
1030 }
1031 df := &dataFrame{
1032 streamID: s.id,
1033 h: hdr,
1034 d: data,
1035 onEachWrite: t.setResetPingStrikes,
1036 }
1037 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
1038 select {
1039 case <-t.done:
1040 return ErrConnClosing
1041 default:
1042 }
1043 return ContextErr(s.ctx.Err())
1044 }
1045 return t.controlBuf.put(df)
1046}
1047
1048// keepalive running in a separate goroutine does the following:
1049// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
1050// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
1051// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
1052// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
1053// after an additional duration of keepalive.Timeout.
1054func (t *http2Server) keepalive() {
1055 p := &ping{}
1056 // True iff a ping has been sent, and no data has been received since then.
1057 outstandingPing := false
1058 // Amount of time remaining before which we should receive an ACK for the
1059 // last sent ping.
1060 kpTimeoutLeft := time.Duration(0)
1061 // Records the last value of t.lastRead before we go block on the timer.
1062 // This is required to check for read activity since then.
1063 prevNano := time.Now().UnixNano()
1064 // Initialize the different timers to their default values.
1065 idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
1066 ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
1067 kpTimer := time.NewTimer(t.kp.Time)
1068 defer func() {
1069 // We need to drain the underlying channel in these timers after a call
1070 // to Stop(), only if we are interested in resetting them. Clearly we
1071 // are not interested in resetting them here.
1072 idleTimer.Stop()
1073 ageTimer.Stop()
1074 kpTimer.Stop()
1075 }()
1076
1077 for {
1078 select {
1079 case <-idleTimer.C:
1080 t.mu.Lock()
1081 idle := t.idle
1082 if idle.IsZero() { // The connection is non-idle.
1083 t.mu.Unlock()
1084 idleTimer.Reset(t.kp.MaxConnectionIdle)
1085 continue
1086 }
1087 val := t.kp.MaxConnectionIdle - time.Since(idle)
1088 t.mu.Unlock()
1089 if val <= 0 {
1090 // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
1091 // Gracefully close the connection.
1092 t.Drain()
1093 return
1094 }
1095 idleTimer.Reset(val)
1096 case <-ageTimer.C:
1097 t.Drain()
1098 ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
1099 select {
1100 case <-ageTimer.C:
1101 // Close the connection after grace period.
1102 if logger.V(logLevel) {
1103 logger.Infof("transport: closing server transport due to maximum connection age.")
1104 }
1105 t.Close()
1106 case <-t.done:
1107 }
1108 return
1109 case <-kpTimer.C:
1110 lastRead := atomic.LoadInt64(&t.lastRead)
1111 if lastRead > prevNano {
1112 // There has been read activity since the last time we were
1113 // here. Setup the timer to fire at kp.Time seconds from
1114 // lastRead time and continue.
1115 outstandingPing = false
1116 kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
1117 prevNano = lastRead
1118 continue
1119 }
1120 if outstandingPing && kpTimeoutLeft <= 0 {
1121 if logger.V(logLevel) {
1122 logger.Infof("transport: closing server transport due to idleness.")
1123 }
1124 t.Close()
1125 return
1126 }
1127 if !outstandingPing {
1128 if channelz.IsOn() {
1129 atomic.AddInt64(&t.czData.kpCount, 1)
1130 }
1131 t.controlBuf.put(p)
1132 kpTimeoutLeft = t.kp.Timeout
1133 outstandingPing = true
1134 }
1135 // The amount of time to sleep here is the minimum of kp.Time and
1136 // timeoutLeft. This will ensure that we wait only for kp.Time
1137 // before sending out the next ping (for cases where the ping is
1138 // acked).
1139 sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
1140 kpTimeoutLeft -= sleepDuration
1141 kpTimer.Reset(sleepDuration)
1142 case <-t.done:
1143 return
1144 }
1145 }
1146}
1147
1148// Close starts shutting down the http2Server transport.
1149// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1150// could cause some resource issue. Revisit this later.
1151func (t *http2Server) Close() {
1152 t.mu.Lock()
1153 if t.state == closing {
1154 t.mu.Unlock()
1155 return
1156 }
1157 t.state = closing
1158 streams := t.activeStreams
1159 t.activeStreams = nil
1160 t.mu.Unlock()
1161 t.controlBuf.finish()
1162 close(t.done)
1163 if err := t.conn.Close(); err != nil && logger.V(logLevel) {
1164 logger.Infof("transport: error closing conn during Close: %v", err)
1165 }
1166 if channelz.IsOn() {
1167 channelz.RemoveEntry(t.channelzID)
1168 }
1169 // Cancel all active streams.
1170 for _, s := range streams {
1171 s.cancel()
1172 }
1173 if t.stats != nil {
1174 connEnd := &stats.ConnEnd{}
1175 t.stats.HandleConn(t.ctx, connEnd)
1176 }
1177}
1178
1179// deleteStream deletes the stream s from transport's active streams.
1180func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1181 // In case stream sending and receiving are invoked in separate
1182 // goroutines (e.g., bi-directional streaming), cancel needs to be
1183 // called to interrupt the potential blocking on other goroutines.
1184 s.cancel()
1185
1186 t.mu.Lock()
1187 if _, ok := t.activeStreams[s.id]; ok {
1188 delete(t.activeStreams, s.id)
1189 if len(t.activeStreams) == 0 {
1190 t.idle = time.Now()
1191 }
1192 }
1193 t.mu.Unlock()
1194
1195 if channelz.IsOn() {
1196 if eosReceived {
1197 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1198 } else {
1199 atomic.AddInt64(&t.czData.streamsFailed, 1)
1200 }
1201 }
1202}
1203
1204// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1205func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1206 oldState := s.swapState(streamDone)
1207 if oldState == streamDone {
1208 // If the stream was already done, return.
1209 return
1210 }
1211
1212 hdr.cleanup = &cleanupStream{
1213 streamID: s.id,
1214 rst: rst,
1215 rstCode: rstCode,
1216 onWrite: func() {
1217 t.deleteStream(s, eosReceived)
1218 },
1219 }
1220 t.controlBuf.put(hdr)
1221}
1222
1223// closeStream clears the footprint of a stream when the stream is not needed any more.
1224func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1225 s.swapState(streamDone)
1226 t.deleteStream(s, eosReceived)
1227
1228 t.controlBuf.put(&cleanupStream{
1229 streamID: s.id,
1230 rst: rst,
1231 rstCode: rstCode,
1232 onWrite: func() {},
1233 })
1234}
1235
1236func (t *http2Server) RemoteAddr() net.Addr {
1237 return t.remoteAddr
1238}
1239
1240func (t *http2Server) Drain() {
1241 t.mu.Lock()
1242 defer t.mu.Unlock()
1243 if t.drainChan != nil {
1244 return
1245 }
1246 t.drainChan = make(chan struct{})
1247 t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
1248}
1249
1250var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1251
1252// Handles outgoing GoAway and returns true if loopy needs to put itself
1253// in draining mode.
1254func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1255 t.mu.Lock()
1256 if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1257 t.mu.Unlock()
1258 // The transport is closing.
1259 return false, ErrConnClosing
1260 }
1261 sid := t.maxStreamID
1262 if !g.headsUp {
1263 // Stop accepting more streams now.
1264 t.state = draining
1265 if len(t.activeStreams) == 0 {
1266 g.closeConn = true
1267 }
1268 t.mu.Unlock()
1269 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1270 return false, err
1271 }
1272 if g.closeConn {
1273 // Abruptly close the connection following the GoAway (via
1274 // loopywriter). But flush out what's inside the buffer first.
1275 t.framer.writer.Flush()
1276 return false, fmt.Errorf("transport: Connection closing")
1277 }
1278 return true, nil
1279 }
1280 t.mu.Unlock()
1281 // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1282 // Follow that with a ping and wait for the ack to come back or a timer
1283 // to expire. During this time accept new streams since they might have
1284 // originated before the GoAway reaches the client.
1285 // After getting the ack or timer expiration send out another GoAway this
1286 // time with an ID of the max stream server intends to process.
1287 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1288 return false, err
1289 }
1290 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1291 return false, err
1292 }
1293 go func() {
1294 timer := time.NewTimer(time.Minute)
1295 defer timer.Stop()
1296 select {
1297 case <-t.drainChan:
1298 case <-timer.C:
1299 case <-t.done:
1300 return
1301 }
1302 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1303 }()
1304 return false, nil
1305}
1306
1307func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1308 s := channelz.SocketInternalMetric{
1309 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1310 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1311 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1312 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1313 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1314 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1315 LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1316 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1317 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1318 LocalFlowControlWindow: int64(t.fc.getSize()),
1319 SocketOptions: channelz.GetSocketOption(t.conn),
1320 LocalAddr: t.localAddr,
1321 RemoteAddr: t.remoteAddr,
1322 // RemoteName :
1323 }
1324 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1325 s.Security = au.GetSecurityValue()
1326 }
1327 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1328 return &s
1329}
1330
1331func (t *http2Server) IncrMsgSent() {
1332 atomic.AddInt64(&t.czData.msgSent, 1)
1333 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1334}
1335
1336func (t *http2Server) IncrMsgRecv() {
1337 atomic.AddInt64(&t.czData.msgRecv, 1)
1338 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1339}
1340
1341func (t *http2Server) getOutFlowWindow() int64 {
1342 resp := make(chan uint32, 1)
1343 timer := time.NewTimer(time.Second)
1344 defer timer.Stop()
1345 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1346 select {
1347 case sz := <-resp:
1348 return int64(sz)
1349 case <-t.done:
1350 return -1
1351 case <-timer.C:
1352 return -2
1353 }
1354}
1355
1356func getJitter(v time.Duration) time.Duration {
1357 if v == infinity {
1358 return 0
1359 }
1360 // Generate a jitter between +/- 10% of the value.
1361 r := int64(v / 10)
1362 j := grpcrand.Int63n(2*r) - r
1363 return time.Duration(j)
1364}
1365
1366type connectionKey struct{}
1367
1368// GetConnection gets the connection from the context.
1369func GetConnection(ctx context.Context) net.Conn {
1370 conn, _ := ctx.Value(connectionKey{}).(net.Conn)
1371 return conn
1372}
1373
1374// SetConnection adds the connection to the context to be able to get
1375// information about the destination ip and port for an incoming RPC. This also
1376// allows any unary or streaming interceptors to see the connection.
1377func setConnection(ctx context.Context, conn net.Conn) context.Context {
1378 return context.WithValue(ctx, connectionKey{}, conn)
1379}