blob: 33686a11ab953b4fdadc7d4f1c4e27a62a271197 [file] [log] [blame]
Zack Williamse940c7a2019-08-21 14:25:39 -07001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 "net"
29 "strconv"
30 "sync"
31 "sync/atomic"
32 "time"
33
34 "github.com/golang/protobuf/proto"
35 "golang.org/x/net/http2"
36 "golang.org/x/net/http2/hpack"
37
divyadesai19009132020-03-04 12:58:08 +000038 spb "google.golang.org/genproto/googleapis/rpc/status"
Zack Williamse940c7a2019-08-21 14:25:39 -070039 "google.golang.org/grpc/codes"
40 "google.golang.org/grpc/credentials"
41 "google.golang.org/grpc/grpclog"
divyadesai19009132020-03-04 12:58:08 +000042 "google.golang.org/grpc/internal"
Zack Williamse940c7a2019-08-21 14:25:39 -070043 "google.golang.org/grpc/internal/channelz"
44 "google.golang.org/grpc/internal/grpcrand"
45 "google.golang.org/grpc/keepalive"
46 "google.golang.org/grpc/metadata"
47 "google.golang.org/grpc/peer"
48 "google.golang.org/grpc/stats"
49 "google.golang.org/grpc/status"
50 "google.golang.org/grpc/tap"
51)
52
53var (
54 // ErrIllegalHeaderWrite indicates that setting header is illegal because of
55 // the stream's state.
56 ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
57 // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
58 // than the limit set by peer.
59 ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
divyadesai19009132020-03-04 12:58:08 +000060 // statusRawProto is a function to get to the raw status proto wrapped in a
61 // status.Status without a proto.Clone().
62 statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
Zack Williamse940c7a2019-08-21 14:25:39 -070063)
64
65// http2Server implements the ServerTransport interface with HTTP2.
66type http2Server struct {
67 ctx context.Context
divyadesai19009132020-03-04 12:58:08 +000068 done chan struct{}
Zack Williamse940c7a2019-08-21 14:25:39 -070069 conn net.Conn
70 loopy *loopyWriter
71 readerDone chan struct{} // sync point to enable testing.
72 writerDone chan struct{} // sync point to enable testing.
73 remoteAddr net.Addr
74 localAddr net.Addr
75 maxStreamID uint32 // max stream ID ever seen
76 authInfo credentials.AuthInfo // auth info about the connection
77 inTapHandle tap.ServerInHandle
78 framer *framer
79 // The max number of concurrent streams.
80 maxStreams uint32
81 // controlBuf delivers all the control related tasks (e.g., window
82 // updates, reset streams, and various settings) to the controller.
83 controlBuf *controlBuffer
84 fc *trInFlow
85 stats stats.Handler
86 // Flag to keep track of reading activity on transport.
87 // 1 is true and 0 is false.
88 activity uint32 // Accessed atomically.
89 // Keepalive and max-age parameters for the server.
90 kp keepalive.ServerParameters
91
92 // Keepalive enforcement policy.
93 kep keepalive.EnforcementPolicy
94 // The time instance last ping was received.
95 lastPingAt time.Time
96 // Number of times the client has violated keepalive ping policy so far.
97 pingStrikes uint8
98 // Flag to signify that number of ping strikes should be reset to 0.
99 // This is set whenever data or header frames are sent.
100 // 1 means yes.
101 resetPingStrikes uint32 // Accessed atomically.
102 initialWindowSize int32
103 bdpEst *bdpEstimator
104 maxSendHeaderListSize *uint32
105
106 mu sync.Mutex // guard the following
107
108 // drainChan is initialized when drain(...) is called the first time.
109 // After which the server writes out the first GoAway(with ID 2^31-1) frame.
110 // Then an independent goroutine will be launched to later send the second GoAway.
111 // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
112 // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
113 // already underway.
114 drainChan chan struct{}
115 state transportState
116 activeStreams map[uint32]*Stream
117 // idle is the time instant when the connection went idle.
118 // This is either the beginning of the connection or when the number of
119 // RPCs go down to 0.
120 // When the connection is busy, this value is set to 0.
121 idle time.Time
122
123 // Fields below are for channelz metric collection.
124 channelzID int64 // channelz unique identification number
125 czData *channelzData
divyadesai19009132020-03-04 12:58:08 +0000126 bufferPool *bufferPool
Zack Williamse940c7a2019-08-21 14:25:39 -0700127}
128
129// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
130// returned if something goes wrong.
131func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
132 writeBufSize := config.WriteBufferSize
133 readBufSize := config.ReadBufferSize
134 maxHeaderListSize := defaultServerMaxHeaderListSize
135 if config.MaxHeaderListSize != nil {
136 maxHeaderListSize = *config.MaxHeaderListSize
137 }
138 framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
139 // Send initial settings as connection preface to client.
divyadesai19009132020-03-04 12:58:08 +0000140 isettings := []http2.Setting{{
141 ID: http2.SettingMaxFrameSize,
142 Val: http2MaxFrameLen,
143 }}
Zack Williamse940c7a2019-08-21 14:25:39 -0700144 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
145 // permitted in the HTTP2 spec.
146 maxStreams := config.MaxStreams
147 if maxStreams == 0 {
148 maxStreams = math.MaxUint32
149 } else {
150 isettings = append(isettings, http2.Setting{
151 ID: http2.SettingMaxConcurrentStreams,
152 Val: maxStreams,
153 })
154 }
155 dynamicWindow := true
156 iwz := int32(initialWindowSize)
157 if config.InitialWindowSize >= defaultWindowSize {
158 iwz = config.InitialWindowSize
159 dynamicWindow = false
160 }
161 icwz := int32(initialWindowSize)
162 if config.InitialConnWindowSize >= defaultWindowSize {
163 icwz = config.InitialConnWindowSize
164 dynamicWindow = false
165 }
166 if iwz != defaultWindowSize {
167 isettings = append(isettings, http2.Setting{
168 ID: http2.SettingInitialWindowSize,
169 Val: uint32(iwz)})
170 }
171 if config.MaxHeaderListSize != nil {
172 isettings = append(isettings, http2.Setting{
173 ID: http2.SettingMaxHeaderListSize,
174 Val: *config.MaxHeaderListSize,
175 })
176 }
177 if err := framer.fr.WriteSettings(isettings...); err != nil {
178 return nil, connectionErrorf(false, err, "transport: %v", err)
179 }
180 // Adjust the connection flow control window if needed.
181 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
182 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
183 return nil, connectionErrorf(false, err, "transport: %v", err)
184 }
185 }
186 kp := config.KeepaliveParams
187 if kp.MaxConnectionIdle == 0 {
188 kp.MaxConnectionIdle = defaultMaxConnectionIdle
189 }
190 if kp.MaxConnectionAge == 0 {
191 kp.MaxConnectionAge = defaultMaxConnectionAge
192 }
193 // Add a jitter to MaxConnectionAge.
194 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
195 if kp.MaxConnectionAgeGrace == 0 {
196 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
197 }
198 if kp.Time == 0 {
199 kp.Time = defaultServerKeepaliveTime
200 }
201 if kp.Timeout == 0 {
202 kp.Timeout = defaultServerKeepaliveTimeout
203 }
204 kep := config.KeepalivePolicy
205 if kep.MinTime == 0 {
206 kep.MinTime = defaultKeepalivePolicyMinTime
207 }
divyadesai19009132020-03-04 12:58:08 +0000208 done := make(chan struct{})
Zack Williamse940c7a2019-08-21 14:25:39 -0700209 t := &http2Server{
divyadesai19009132020-03-04 12:58:08 +0000210 ctx: context.Background(),
211 done: done,
Zack Williamse940c7a2019-08-21 14:25:39 -0700212 conn: conn,
213 remoteAddr: conn.RemoteAddr(),
214 localAddr: conn.LocalAddr(),
215 authInfo: config.AuthInfo,
216 framer: framer,
217 readerDone: make(chan struct{}),
218 writerDone: make(chan struct{}),
219 maxStreams: maxStreams,
220 inTapHandle: config.InTapHandle,
221 fc: &trInFlow{limit: uint32(icwz)},
222 state: reachable,
223 activeStreams: make(map[uint32]*Stream),
224 stats: config.StatsHandler,
225 kp: kp,
226 idle: time.Now(),
227 kep: kep,
228 initialWindowSize: iwz,
229 czData: new(channelzData),
divyadesai19009132020-03-04 12:58:08 +0000230 bufferPool: newBufferPool(),
Zack Williamse940c7a2019-08-21 14:25:39 -0700231 }
divyadesai19009132020-03-04 12:58:08 +0000232 t.controlBuf = newControlBuffer(t.done)
Zack Williamse940c7a2019-08-21 14:25:39 -0700233 if dynamicWindow {
234 t.bdpEst = &bdpEstimator{
235 bdp: initialWindowSize,
236 updateFlowControl: t.updateFlowControl,
237 }
238 }
239 if t.stats != nil {
240 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
241 RemoteAddr: t.remoteAddr,
242 LocalAddr: t.localAddr,
243 })
244 connBegin := &stats.ConnBegin{}
245 t.stats.HandleConn(t.ctx, connBegin)
246 }
247 if channelz.IsOn() {
248 t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
249 }
250 t.framer.writer.Flush()
251
252 defer func() {
253 if err != nil {
254 t.Close()
255 }
256 }()
257
258 // Check the validity of client preface.
259 preface := make([]byte, len(clientPreface))
260 if _, err := io.ReadFull(t.conn, preface); err != nil {
261 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
262 }
263 if !bytes.Equal(preface, clientPreface) {
264 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
265 }
266
267 frame, err := t.framer.fr.ReadFrame()
268 if err == io.EOF || err == io.ErrUnexpectedEOF {
269 return nil, err
270 }
271 if err != nil {
272 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
273 }
274 atomic.StoreUint32(&t.activity, 1)
275 sf, ok := frame.(*http2.SettingsFrame)
276 if !ok {
277 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
278 }
279 t.handleSettings(sf)
280
281 go func() {
282 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
283 t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
284 if err := t.loopy.run(); err != nil {
285 errorf("transport: loopyWriter.run returning. Err: %v", err)
286 }
287 t.conn.Close()
288 close(t.writerDone)
289 }()
290 go t.keepalive()
291 return t, nil
292}
293
294// operateHeader takes action on the decoded headers.
295func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
296 streamID := frame.Header().StreamID
297 state := &decodeState{
298 serverSide: true,
299 }
300 if err := state.decodeHeader(frame); err != nil {
301 if se, ok := status.FromError(err); ok {
302 t.controlBuf.put(&cleanupStream{
303 streamID: streamID,
304 rst: true,
305 rstCode: statusCodeConvTab[se.Code()],
306 onWrite: func() {},
307 })
308 }
309 return false
310 }
311
312 buf := newRecvBuffer()
313 s := &Stream{
314 id: streamID,
315 st: t,
316 buf: buf,
317 fc: &inFlow{limit: uint32(t.initialWindowSize)},
318 recvCompress: state.data.encoding,
319 method: state.data.method,
320 contentSubtype: state.data.contentSubtype,
321 }
322 if frame.StreamEnded() {
323 // s is just created by the caller. No lock needed.
324 s.state = streamReadDone
325 }
326 if state.data.timeoutSet {
327 s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
328 } else {
329 s.ctx, s.cancel = context.WithCancel(t.ctx)
330 }
331 pr := &peer.Peer{
332 Addr: t.remoteAddr,
333 }
334 // Attach Auth info if there is any.
335 if t.authInfo != nil {
336 pr.AuthInfo = t.authInfo
337 }
338 s.ctx = peer.NewContext(s.ctx, pr)
339 // Attach the received metadata to the context.
340 if len(state.data.mdata) > 0 {
341 s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
342 }
343 if state.data.statsTags != nil {
344 s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
345 }
346 if state.data.statsTrace != nil {
347 s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
348 }
349 if t.inTapHandle != nil {
350 var err error
351 info := &tap.Info{
352 FullMethodName: state.data.method,
353 }
354 s.ctx, err = t.inTapHandle(s.ctx, info)
355 if err != nil {
356 warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
357 t.controlBuf.put(&cleanupStream{
358 streamID: s.id,
359 rst: true,
360 rstCode: http2.ErrCodeRefusedStream,
361 onWrite: func() {},
362 })
divyadesai19009132020-03-04 12:58:08 +0000363 s.cancel()
Zack Williamse940c7a2019-08-21 14:25:39 -0700364 return false
365 }
366 }
367 t.mu.Lock()
368 if t.state != reachable {
369 t.mu.Unlock()
divyadesai19009132020-03-04 12:58:08 +0000370 s.cancel()
Zack Williamse940c7a2019-08-21 14:25:39 -0700371 return false
372 }
373 if uint32(len(t.activeStreams)) >= t.maxStreams {
374 t.mu.Unlock()
375 t.controlBuf.put(&cleanupStream{
376 streamID: streamID,
377 rst: true,
378 rstCode: http2.ErrCodeRefusedStream,
379 onWrite: func() {},
380 })
divyadesai19009132020-03-04 12:58:08 +0000381 s.cancel()
Zack Williamse940c7a2019-08-21 14:25:39 -0700382 return false
383 }
384 if streamID%2 != 1 || streamID <= t.maxStreamID {
385 t.mu.Unlock()
386 // illegal gRPC stream id.
387 errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
divyadesai19009132020-03-04 12:58:08 +0000388 s.cancel()
Zack Williamse940c7a2019-08-21 14:25:39 -0700389 return true
390 }
391 t.maxStreamID = streamID
392 t.activeStreams[streamID] = s
393 if len(t.activeStreams) == 1 {
394 t.idle = time.Time{}
395 }
396 t.mu.Unlock()
397 if channelz.IsOn() {
398 atomic.AddInt64(&t.czData.streamsStarted, 1)
399 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
400 }
401 s.requestRead = func(n int) {
402 t.adjustWindow(s, uint32(n))
403 }
404 s.ctx = traceCtx(s.ctx, s.method)
405 if t.stats != nil {
406 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
407 inHeader := &stats.InHeader{
408 FullMethod: s.method,
409 RemoteAddr: t.remoteAddr,
410 LocalAddr: t.localAddr,
411 Compression: s.recvCompress,
412 WireLength: int(frame.Header().Length),
413 }
414 t.stats.HandleRPC(s.ctx, inHeader)
415 }
416 s.ctxDone = s.ctx.Done()
417 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
418 s.trReader = &transportReader{
419 reader: &recvBufferReader{
divyadesai19009132020-03-04 12:58:08 +0000420 ctx: s.ctx,
421 ctxDone: s.ctxDone,
422 recv: s.buf,
423 freeBuffer: t.bufferPool.put,
Zack Williamse940c7a2019-08-21 14:25:39 -0700424 },
425 windowHandler: func(n int) {
426 t.updateWindow(s, uint32(n))
427 },
428 }
429 // Register the stream with loopy.
430 t.controlBuf.put(&registerStream{
431 streamID: s.id,
432 wq: s.wq,
433 })
434 handle(s)
435 return false
436}
437
438// HandleStreams receives incoming streams using the given handler. This is
439// typically run in a separate goroutine.
440// traceCtx attaches trace to ctx and returns the new context.
441func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
442 defer close(t.readerDone)
443 for {
divyadesai19009132020-03-04 12:58:08 +0000444 t.controlBuf.throttle()
Zack Williamse940c7a2019-08-21 14:25:39 -0700445 frame, err := t.framer.fr.ReadFrame()
446 atomic.StoreUint32(&t.activity, 1)
447 if err != nil {
448 if se, ok := err.(http2.StreamError); ok {
449 warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
450 t.mu.Lock()
451 s := t.activeStreams[se.StreamID]
452 t.mu.Unlock()
453 if s != nil {
454 t.closeStream(s, true, se.Code, false)
455 } else {
456 t.controlBuf.put(&cleanupStream{
457 streamID: se.StreamID,
458 rst: true,
459 rstCode: se.Code,
460 onWrite: func() {},
461 })
462 }
463 continue
464 }
465 if err == io.EOF || err == io.ErrUnexpectedEOF {
466 t.Close()
467 return
468 }
469 warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
470 t.Close()
471 return
472 }
473 switch frame := frame.(type) {
474 case *http2.MetaHeadersFrame:
475 if t.operateHeaders(frame, handle, traceCtx) {
476 t.Close()
477 break
478 }
479 case *http2.DataFrame:
480 t.handleData(frame)
481 case *http2.RSTStreamFrame:
482 t.handleRSTStream(frame)
483 case *http2.SettingsFrame:
484 t.handleSettings(frame)
485 case *http2.PingFrame:
486 t.handlePing(frame)
487 case *http2.WindowUpdateFrame:
488 t.handleWindowUpdate(frame)
489 case *http2.GoAwayFrame:
490 // TODO: Handle GoAway from the client appropriately.
491 default:
492 errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
493 }
494 }
495}
496
497func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
498 t.mu.Lock()
499 defer t.mu.Unlock()
500 if t.activeStreams == nil {
501 // The transport is closing.
502 return nil, false
503 }
504 s, ok := t.activeStreams[f.Header().StreamID]
505 if !ok {
506 // The stream is already done.
507 return nil, false
508 }
509 return s, true
510}
511
512// adjustWindow sends out extra window update over the initial window size
513// of stream if the application is requesting data larger in size than
514// the window.
515func (t *http2Server) adjustWindow(s *Stream, n uint32) {
516 if w := s.fc.maybeAdjust(n); w > 0 {
517 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
518 }
519
520}
521
522// updateWindow adjusts the inbound quota for the stream and the transport.
523// Window updates will deliver to the controller for sending when
524// the cumulative quota exceeds the corresponding threshold.
525func (t *http2Server) updateWindow(s *Stream, n uint32) {
526 if w := s.fc.onRead(n); w > 0 {
527 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
528 increment: w,
529 })
530 }
531}
532
533// updateFlowControl updates the incoming flow control windows
534// for the transport and the stream based on the current bdp
535// estimation.
536func (t *http2Server) updateFlowControl(n uint32) {
537 t.mu.Lock()
538 for _, s := range t.activeStreams {
539 s.fc.newLimit(n)
540 }
541 t.initialWindowSize = int32(n)
542 t.mu.Unlock()
543 t.controlBuf.put(&outgoingWindowUpdate{
544 streamID: 0,
545 increment: t.fc.newLimit(n),
546 })
547 t.controlBuf.put(&outgoingSettings{
548 ss: []http2.Setting{
549 {
550 ID: http2.SettingInitialWindowSize,
551 Val: n,
552 },
553 },
554 })
555
556}
557
558func (t *http2Server) handleData(f *http2.DataFrame) {
559 size := f.Header().Length
560 var sendBDPPing bool
561 if t.bdpEst != nil {
562 sendBDPPing = t.bdpEst.add(size)
563 }
564 // Decouple connection's flow control from application's read.
565 // An update on connection's flow control should not depend on
566 // whether user application has read the data or not. Such a
567 // restriction is already imposed on the stream's flow control,
568 // and therefore the sender will be blocked anyways.
569 // Decoupling the connection flow control will prevent other
570 // active(fast) streams from starving in presence of slow or
571 // inactive streams.
572 if w := t.fc.onData(size); w > 0 {
573 t.controlBuf.put(&outgoingWindowUpdate{
574 streamID: 0,
575 increment: w,
576 })
577 }
578 if sendBDPPing {
579 // Avoid excessive ping detection (e.g. in an L7 proxy)
580 // by sending a window update prior to the BDP ping.
581 if w := t.fc.reset(); w > 0 {
582 t.controlBuf.put(&outgoingWindowUpdate{
583 streamID: 0,
584 increment: w,
585 })
586 }
587 t.controlBuf.put(bdpPing)
588 }
589 // Select the right stream to dispatch.
590 s, ok := t.getStream(f)
591 if !ok {
592 return
593 }
594 if size > 0 {
595 if err := s.fc.onData(size); err != nil {
596 t.closeStream(s, true, http2.ErrCodeFlowControl, false)
597 return
598 }
599 if f.Header().Flags.Has(http2.FlagDataPadded) {
600 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
601 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
602 }
603 }
604 // TODO(bradfitz, zhaoq): A copy is required here because there is no
605 // guarantee f.Data() is consumed before the arrival of next frame.
606 // Can this copy be eliminated?
607 if len(f.Data()) > 0 {
divyadesai19009132020-03-04 12:58:08 +0000608 buffer := t.bufferPool.get()
609 buffer.Reset()
610 buffer.Write(f.Data())
611 s.write(recvMsg{buffer: buffer})
Zack Williamse940c7a2019-08-21 14:25:39 -0700612 }
613 }
614 if f.Header().Flags.Has(http2.FlagDataEndStream) {
615 // Received the end of stream from the client.
616 s.compareAndSwapState(streamActive, streamReadDone)
617 s.write(recvMsg{err: io.EOF})
618 }
619}
620
621func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
622 // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
623 if s, ok := t.getStream(f); ok {
624 t.closeStream(s, false, 0, false)
625 return
626 }
627 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
628 t.controlBuf.put(&cleanupStream{
629 streamID: f.Header().StreamID,
630 rst: false,
631 rstCode: 0,
632 onWrite: func() {},
633 })
634}
635
636func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
637 if f.IsAck() {
638 return
639 }
640 var ss []http2.Setting
641 var updateFuncs []func()
642 f.ForeachSetting(func(s http2.Setting) error {
643 switch s.ID {
644 case http2.SettingMaxHeaderListSize:
645 updateFuncs = append(updateFuncs, func() {
646 t.maxSendHeaderListSize = new(uint32)
647 *t.maxSendHeaderListSize = s.Val
648 })
649 default:
650 ss = append(ss, s)
651 }
652 return nil
653 })
654 t.controlBuf.executeAndPut(func(interface{}) bool {
655 for _, f := range updateFuncs {
656 f()
657 }
658 return true
659 }, &incomingSettings{
660 ss: ss,
661 })
662}
663
664const (
665 maxPingStrikes = 2
666 defaultPingTimeout = 2 * time.Hour
667)
668
669func (t *http2Server) handlePing(f *http2.PingFrame) {
670 if f.IsAck() {
671 if f.Data == goAwayPing.data && t.drainChan != nil {
672 close(t.drainChan)
673 return
674 }
675 // Maybe it's a BDP ping.
676 if t.bdpEst != nil {
677 t.bdpEst.calculate(f.Data)
678 }
679 return
680 }
681 pingAck := &ping{ack: true}
682 copy(pingAck.data[:], f.Data[:])
683 t.controlBuf.put(pingAck)
684
685 now := time.Now()
686 defer func() {
687 t.lastPingAt = now
688 }()
689 // A reset ping strikes means that we don't need to check for policy
690 // violation for this ping and the pingStrikes counter should be set
691 // to 0.
692 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
693 t.pingStrikes = 0
694 return
695 }
696 t.mu.Lock()
697 ns := len(t.activeStreams)
698 t.mu.Unlock()
699 if ns < 1 && !t.kep.PermitWithoutStream {
700 // Keepalive shouldn't be active thus, this new ping should
701 // have come after at least defaultPingTimeout.
702 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
703 t.pingStrikes++
704 }
705 } else {
706 // Check if keepalive policy is respected.
707 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
708 t.pingStrikes++
709 }
710 }
711
712 if t.pingStrikes > maxPingStrikes {
713 // Send goaway and close the connection.
714 errorf("transport: Got too many pings from the client, closing the connection.")
715 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
716 }
717}
718
719func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
720 t.controlBuf.put(&incomingWindowUpdate{
721 streamID: f.Header().StreamID,
722 increment: f.Increment,
723 })
724}
725
726func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
727 for k, vv := range md {
728 if isReservedHeader(k) {
729 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
730 continue
731 }
732 for _, v := range vv {
733 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
734 }
735 }
736 return headerFields
737}
738
739func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
740 if t.maxSendHeaderListSize == nil {
741 return true
742 }
743 hdrFrame := it.(*headerFrame)
744 var sz int64
745 for _, f := range hdrFrame.hf {
746 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
747 errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
748 return false
749 }
750 }
751 return true
752}
753
754// WriteHeader sends the header metedata md back to the client.
755func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
756 if s.updateHeaderSent() || s.getState() == streamDone {
757 return ErrIllegalHeaderWrite
758 }
759 s.hdrMu.Lock()
760 if md.Len() > 0 {
761 if s.header.Len() > 0 {
762 s.header = metadata.Join(s.header, md)
763 } else {
764 s.header = md
765 }
766 }
767 if err := t.writeHeaderLocked(s); err != nil {
768 s.hdrMu.Unlock()
769 return err
770 }
771 s.hdrMu.Unlock()
772 return nil
773}
774
divyadesai19009132020-03-04 12:58:08 +0000775func (t *http2Server) setResetPingStrikes() {
776 atomic.StoreUint32(&t.resetPingStrikes, 1)
777}
778
Zack Williamse940c7a2019-08-21 14:25:39 -0700779func (t *http2Server) writeHeaderLocked(s *Stream) error {
780 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
781 // first and create a slice of that exact size.
782 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
783 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
784 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
785 if s.sendCompress != "" {
786 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
787 }
788 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
789 success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
790 streamID: s.id,
791 hf: headerFields,
792 endStream: false,
divyadesai19009132020-03-04 12:58:08 +0000793 onWrite: t.setResetPingStrikes,
Zack Williamse940c7a2019-08-21 14:25:39 -0700794 })
795 if !success {
796 if err != nil {
797 return err
798 }
799 t.closeStream(s, true, http2.ErrCodeInternal, false)
800 return ErrHeaderListSizeLimitViolation
801 }
802 if t.stats != nil {
803 // Note: WireLength is not set in outHeader.
804 // TODO(mmukhi): Revisit this later, if needed.
805 outHeader := &stats.OutHeader{}
806 t.stats.HandleRPC(s.Context(), outHeader)
807 }
808 return nil
809}
810
811// WriteStatus sends stream status to the client and terminates the stream.
812// There is no further I/O operations being able to perform on this stream.
813// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
814// OK is adopted.
815func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
816 if s.getState() == streamDone {
817 return nil
818 }
819 s.hdrMu.Lock()
820 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
821 // first and create a slice of that exact size.
822 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
823 if !s.updateHeaderSent() { // No headers have been sent.
824 if len(s.header) > 0 { // Send a separate header frame.
825 if err := t.writeHeaderLocked(s); err != nil {
826 s.hdrMu.Unlock()
827 return err
828 }
829 } else { // Send a trailer only response.
830 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
831 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
832 }
833 }
834 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
835 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
836
divyadesai19009132020-03-04 12:58:08 +0000837 if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
Zack Williamse940c7a2019-08-21 14:25:39 -0700838 stBytes, err := proto.Marshal(p)
839 if err != nil {
840 // TODO: return error instead, when callers are able to handle it.
841 grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
842 } else {
843 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
844 }
845 }
846
847 // Attach the trailer metadata.
848 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
849 trailingHeader := &headerFrame{
850 streamID: s.id,
851 hf: headerFields,
852 endStream: true,
divyadesai19009132020-03-04 12:58:08 +0000853 onWrite: t.setResetPingStrikes,
Zack Williamse940c7a2019-08-21 14:25:39 -0700854 }
855 s.hdrMu.Unlock()
856 success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
857 if !success {
858 if err != nil {
859 return err
860 }
861 t.closeStream(s, true, http2.ErrCodeInternal, false)
862 return ErrHeaderListSizeLimitViolation
863 }
864 // Send a RST_STREAM after the trailers if the client has not already half-closed.
865 rst := s.getState() == streamActive
866 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
867 if t.stats != nil {
868 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
869 }
870 return nil
871}
872
873// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
874// is returns if it fails (e.g., framing error, transport error).
875func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
876 if !s.isHeaderSent() { // Headers haven't been written yet.
877 if err := t.WriteHeader(s, nil); err != nil {
878 if _, ok := err.(ConnectionError); ok {
879 return err
880 }
881 // TODO(mmukhi, dfawley): Make sure this is the right code to return.
882 return status.Errorf(codes.Internal, "transport: %v", err)
883 }
884 } else {
885 // Writing headers checks for this condition.
886 if s.getState() == streamDone {
887 // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
888 s.cancel()
889 select {
divyadesai19009132020-03-04 12:58:08 +0000890 case <-t.done:
Zack Williamse940c7a2019-08-21 14:25:39 -0700891 return ErrConnClosing
892 default:
893 }
894 return ContextErr(s.ctx.Err())
895 }
896 }
897 // Add some data to header frame so that we can equally distribute bytes across frames.
898 emptyLen := http2MaxFrameLen - len(hdr)
899 if emptyLen > len(data) {
900 emptyLen = len(data)
901 }
902 hdr = append(hdr, data[:emptyLen]...)
903 data = data[emptyLen:]
904 df := &dataFrame{
divyadesai19009132020-03-04 12:58:08 +0000905 streamID: s.id,
906 h: hdr,
907 d: data,
908 onEachWrite: t.setResetPingStrikes,
Zack Williamse940c7a2019-08-21 14:25:39 -0700909 }
910 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
911 select {
divyadesai19009132020-03-04 12:58:08 +0000912 case <-t.done:
Zack Williamse940c7a2019-08-21 14:25:39 -0700913 return ErrConnClosing
914 default:
915 }
916 return ContextErr(s.ctx.Err())
917 }
918 return t.controlBuf.put(df)
919}
920
921// keepalive running in a separate goroutine does the following:
922// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
923// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
924// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
925// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
926// after an additional duration of keepalive.Timeout.
927func (t *http2Server) keepalive() {
928 p := &ping{}
929 var pingSent bool
930 maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
931 maxAge := time.NewTimer(t.kp.MaxConnectionAge)
932 keepalive := time.NewTimer(t.kp.Time)
933 // NOTE: All exit paths of this function should reset their
934 // respective timers. A failure to do so will cause the
935 // following clean-up to deadlock and eventually leak.
936 defer func() {
937 if !maxIdle.Stop() {
938 <-maxIdle.C
939 }
940 if !maxAge.Stop() {
941 <-maxAge.C
942 }
943 if !keepalive.Stop() {
944 <-keepalive.C
945 }
946 }()
947 for {
948 select {
949 case <-maxIdle.C:
950 t.mu.Lock()
951 idle := t.idle
952 if idle.IsZero() { // The connection is non-idle.
953 t.mu.Unlock()
954 maxIdle.Reset(t.kp.MaxConnectionIdle)
955 continue
956 }
957 val := t.kp.MaxConnectionIdle - time.Since(idle)
958 t.mu.Unlock()
959 if val <= 0 {
960 // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
961 // Gracefully close the connection.
962 t.drain(http2.ErrCodeNo, []byte{})
963 // Resetting the timer so that the clean-up doesn't deadlock.
964 maxIdle.Reset(infinity)
965 return
966 }
967 maxIdle.Reset(val)
968 case <-maxAge.C:
969 t.drain(http2.ErrCodeNo, []byte{})
970 maxAge.Reset(t.kp.MaxConnectionAgeGrace)
971 select {
972 case <-maxAge.C:
973 // Close the connection after grace period.
divyadesai19009132020-03-04 12:58:08 +0000974 infof("transport: closing server transport due to maximum connection age.")
Zack Williamse940c7a2019-08-21 14:25:39 -0700975 t.Close()
976 // Resetting the timer so that the clean-up doesn't deadlock.
977 maxAge.Reset(infinity)
divyadesai19009132020-03-04 12:58:08 +0000978 case <-t.done:
Zack Williamse940c7a2019-08-21 14:25:39 -0700979 }
980 return
981 case <-keepalive.C:
982 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
983 pingSent = false
984 keepalive.Reset(t.kp.Time)
985 continue
986 }
987 if pingSent {
divyadesai19009132020-03-04 12:58:08 +0000988 infof("transport: closing server transport due to idleness.")
Zack Williamse940c7a2019-08-21 14:25:39 -0700989 t.Close()
990 // Resetting the timer so that the clean-up doesn't deadlock.
991 keepalive.Reset(infinity)
992 return
993 }
994 pingSent = true
995 if channelz.IsOn() {
996 atomic.AddInt64(&t.czData.kpCount, 1)
997 }
998 t.controlBuf.put(p)
999 keepalive.Reset(t.kp.Timeout)
divyadesai19009132020-03-04 12:58:08 +00001000 case <-t.done:
Zack Williamse940c7a2019-08-21 14:25:39 -07001001 return
1002 }
1003 }
1004}
1005
1006// Close starts shutting down the http2Server transport.
1007// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1008// could cause some resource issue. Revisit this later.
1009func (t *http2Server) Close() error {
1010 t.mu.Lock()
1011 if t.state == closing {
1012 t.mu.Unlock()
1013 return errors.New("transport: Close() was already called")
1014 }
1015 t.state = closing
1016 streams := t.activeStreams
1017 t.activeStreams = nil
1018 t.mu.Unlock()
1019 t.controlBuf.finish()
divyadesai19009132020-03-04 12:58:08 +00001020 close(t.done)
Zack Williamse940c7a2019-08-21 14:25:39 -07001021 err := t.conn.Close()
1022 if channelz.IsOn() {
1023 channelz.RemoveEntry(t.channelzID)
1024 }
1025 // Cancel all active streams.
1026 for _, s := range streams {
1027 s.cancel()
1028 }
1029 if t.stats != nil {
1030 connEnd := &stats.ConnEnd{}
1031 t.stats.HandleConn(t.ctx, connEnd)
1032 }
1033 return err
1034}
1035
1036// deleteStream deletes the stream s from transport's active streams.
divyadesai19009132020-03-04 12:58:08 +00001037func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
Zack Williamse940c7a2019-08-21 14:25:39 -07001038 // In case stream sending and receiving are invoked in separate
1039 // goroutines (e.g., bi-directional streaming), cancel needs to be
1040 // called to interrupt the potential blocking on other goroutines.
1041 s.cancel()
1042
1043 t.mu.Lock()
1044 if _, ok := t.activeStreams[s.id]; ok {
1045 delete(t.activeStreams, s.id)
1046 if len(t.activeStreams) == 0 {
1047 t.idle = time.Now()
1048 }
1049 }
1050 t.mu.Unlock()
1051
1052 if channelz.IsOn() {
1053 if eosReceived {
1054 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1055 } else {
1056 atomic.AddInt64(&t.czData.streamsFailed, 1)
1057 }
1058 }
Zack Williamse940c7a2019-08-21 14:25:39 -07001059}
1060
1061// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1062func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
divyadesai19009132020-03-04 12:58:08 +00001063 oldState := s.swapState(streamDone)
Zack Williamse940c7a2019-08-21 14:25:39 -07001064 if oldState == streamDone {
divyadesai19009132020-03-04 12:58:08 +00001065 // If the stream was already done, return.
Zack Williamse940c7a2019-08-21 14:25:39 -07001066 return
1067 }
1068
1069 hdr.cleanup = &cleanupStream{
1070 streamID: s.id,
1071 rst: rst,
1072 rstCode: rstCode,
divyadesai19009132020-03-04 12:58:08 +00001073 onWrite: func() {
1074 t.deleteStream(s, eosReceived)
1075 },
Zack Williamse940c7a2019-08-21 14:25:39 -07001076 }
1077 t.controlBuf.put(hdr)
1078}
1079
1080// closeStream clears the footprint of a stream when the stream is not needed any more.
1081func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
divyadesai19009132020-03-04 12:58:08 +00001082 s.swapState(streamDone)
Zack Williamse940c7a2019-08-21 14:25:39 -07001083 t.deleteStream(s, eosReceived)
divyadesai19009132020-03-04 12:58:08 +00001084
Zack Williamse940c7a2019-08-21 14:25:39 -07001085 t.controlBuf.put(&cleanupStream{
1086 streamID: s.id,
1087 rst: rst,
1088 rstCode: rstCode,
1089 onWrite: func() {},
1090 })
1091}
1092
1093func (t *http2Server) RemoteAddr() net.Addr {
1094 return t.remoteAddr
1095}
1096
1097func (t *http2Server) Drain() {
1098 t.drain(http2.ErrCodeNo, []byte{})
1099}
1100
1101func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1102 t.mu.Lock()
1103 defer t.mu.Unlock()
1104 if t.drainChan != nil {
1105 return
1106 }
1107 t.drainChan = make(chan struct{})
1108 t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1109}
1110
1111var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1112
1113// Handles outgoing GoAway and returns true if loopy needs to put itself
1114// in draining mode.
1115func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1116 t.mu.Lock()
1117 if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1118 t.mu.Unlock()
1119 // The transport is closing.
1120 return false, ErrConnClosing
1121 }
1122 sid := t.maxStreamID
1123 if !g.headsUp {
1124 // Stop accepting more streams now.
1125 t.state = draining
1126 if len(t.activeStreams) == 0 {
1127 g.closeConn = true
1128 }
1129 t.mu.Unlock()
1130 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1131 return false, err
1132 }
1133 if g.closeConn {
1134 // Abruptly close the connection following the GoAway (via
1135 // loopywriter). But flush out what's inside the buffer first.
1136 t.framer.writer.Flush()
1137 return false, fmt.Errorf("transport: Connection closing")
1138 }
1139 return true, nil
1140 }
1141 t.mu.Unlock()
1142 // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1143 // Follow that with a ping and wait for the ack to come back or a timer
1144 // to expire. During this time accept new streams since they might have
1145 // originated before the GoAway reaches the client.
1146 // After getting the ack or timer expiration send out another GoAway this
1147 // time with an ID of the max stream server intends to process.
1148 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1149 return false, err
1150 }
1151 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1152 return false, err
1153 }
1154 go func() {
1155 timer := time.NewTimer(time.Minute)
1156 defer timer.Stop()
1157 select {
1158 case <-t.drainChan:
1159 case <-timer.C:
divyadesai19009132020-03-04 12:58:08 +00001160 case <-t.done:
Zack Williamse940c7a2019-08-21 14:25:39 -07001161 return
1162 }
1163 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1164 }()
1165 return false, nil
1166}
1167
1168func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1169 s := channelz.SocketInternalMetric{
1170 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1171 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1172 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1173 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1174 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1175 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1176 LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1177 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1178 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1179 LocalFlowControlWindow: int64(t.fc.getSize()),
1180 SocketOptions: channelz.GetSocketOption(t.conn),
1181 LocalAddr: t.localAddr,
1182 RemoteAddr: t.remoteAddr,
1183 // RemoteName :
1184 }
1185 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1186 s.Security = au.GetSecurityValue()
1187 }
1188 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1189 return &s
1190}
1191
1192func (t *http2Server) IncrMsgSent() {
1193 atomic.AddInt64(&t.czData.msgSent, 1)
1194 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1195}
1196
1197func (t *http2Server) IncrMsgRecv() {
1198 atomic.AddInt64(&t.czData.msgRecv, 1)
1199 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1200}
1201
1202func (t *http2Server) getOutFlowWindow() int64 {
1203 resp := make(chan uint32, 1)
1204 timer := time.NewTimer(time.Second)
1205 defer timer.Stop()
1206 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1207 select {
1208 case sz := <-resp:
1209 return int64(sz)
divyadesai19009132020-03-04 12:58:08 +00001210 case <-t.done:
Zack Williamse940c7a2019-08-21 14:25:39 -07001211 return -1
1212 case <-timer.C:
1213 return -2
1214 }
1215}
1216
1217func getJitter(v time.Duration) time.Duration {
1218 if v == infinity {
1219 return 0
1220 }
1221 // Generate a jitter between +/- 10% of the value.
1222 r := int64(v / 10)
1223 j := grpcrand.Int63n(2*r) - r
1224 return time.Duration(j)
1225}