blob: 150b73e4659e10ddca74a35e320293f9c36ab3b2 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 "net"
29 "strconv"
30 "sync"
31 "sync/atomic"
32 "time"
33
34 "github.com/golang/protobuf/proto"
35 "golang.org/x/net/http2"
36 "golang.org/x/net/http2/hpack"
37
Abhilash S.L3b494632019-07-16 15:51:09 +053038 spb "google.golang.org/genproto/googleapis/rpc/status"
William Kurkianea869482019-04-09 15:16:11 -040039 "google.golang.org/grpc/codes"
40 "google.golang.org/grpc/credentials"
41 "google.golang.org/grpc/grpclog"
Abhilash S.L3b494632019-07-16 15:51:09 +053042 "google.golang.org/grpc/internal"
William Kurkianea869482019-04-09 15:16:11 -040043 "google.golang.org/grpc/internal/channelz"
44 "google.golang.org/grpc/internal/grpcrand"
45 "google.golang.org/grpc/keepalive"
46 "google.golang.org/grpc/metadata"
47 "google.golang.org/grpc/peer"
48 "google.golang.org/grpc/stats"
49 "google.golang.org/grpc/status"
50 "google.golang.org/grpc/tap"
51)
52
53var (
54 // ErrIllegalHeaderWrite indicates that setting header is illegal because of
55 // the stream's state.
56 ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
57 // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
58 // than the limit set by peer.
59 ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
Abhilash S.L3b494632019-07-16 15:51:09 +053060 // statusRawProto is a function to get to the raw status proto wrapped in a
61 // status.Status without a proto.Clone().
62 statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
William Kurkianea869482019-04-09 15:16:11 -040063)
64
65// http2Server implements the ServerTransport interface with HTTP2.
66type http2Server struct {
67 ctx context.Context
68 ctxDone <-chan struct{} // Cache the context.Done() chan
69 cancel context.CancelFunc
70 conn net.Conn
71 loopy *loopyWriter
72 readerDone chan struct{} // sync point to enable testing.
73 writerDone chan struct{} // sync point to enable testing.
74 remoteAddr net.Addr
75 localAddr net.Addr
76 maxStreamID uint32 // max stream ID ever seen
77 authInfo credentials.AuthInfo // auth info about the connection
78 inTapHandle tap.ServerInHandle
79 framer *framer
80 // The max number of concurrent streams.
81 maxStreams uint32
82 // controlBuf delivers all the control related tasks (e.g., window
83 // updates, reset streams, and various settings) to the controller.
84 controlBuf *controlBuffer
85 fc *trInFlow
86 stats stats.Handler
87 // Flag to keep track of reading activity on transport.
88 // 1 is true and 0 is false.
89 activity uint32 // Accessed atomically.
90 // Keepalive and max-age parameters for the server.
91 kp keepalive.ServerParameters
92
93 // Keepalive enforcement policy.
94 kep keepalive.EnforcementPolicy
95 // The time instance last ping was received.
96 lastPingAt time.Time
97 // Number of times the client has violated keepalive ping policy so far.
98 pingStrikes uint8
99 // Flag to signify that number of ping strikes should be reset to 0.
100 // This is set whenever data or header frames are sent.
101 // 1 means yes.
102 resetPingStrikes uint32 // Accessed atomically.
103 initialWindowSize int32
104 bdpEst *bdpEstimator
105 maxSendHeaderListSize *uint32
106
107 mu sync.Mutex // guard the following
108
109 // drainChan is initialized when drain(...) is called the first time.
110 // After which the server writes out the first GoAway(with ID 2^31-1) frame.
111 // Then an independent goroutine will be launched to later send the second GoAway.
112 // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
113 // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
114 // already underway.
115 drainChan chan struct{}
116 state transportState
117 activeStreams map[uint32]*Stream
118 // idle is the time instant when the connection went idle.
119 // This is either the beginning of the connection or when the number of
120 // RPCs go down to 0.
121 // When the connection is busy, this value is set to 0.
122 idle time.Time
123
124 // Fields below are for channelz metric collection.
125 channelzID int64 // channelz unique identification number
126 czData *channelzData
Abhilash S.L3b494632019-07-16 15:51:09 +0530127 bufferPool *bufferPool
William Kurkianea869482019-04-09 15:16:11 -0400128}
129
130// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
131// returned if something goes wrong.
132func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
133 writeBufSize := config.WriteBufferSize
134 readBufSize := config.ReadBufferSize
135 maxHeaderListSize := defaultServerMaxHeaderListSize
136 if config.MaxHeaderListSize != nil {
137 maxHeaderListSize = *config.MaxHeaderListSize
138 }
139 framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
140 // Send initial settings as connection preface to client.
141 var isettings []http2.Setting
142 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
143 // permitted in the HTTP2 spec.
144 maxStreams := config.MaxStreams
145 if maxStreams == 0 {
146 maxStreams = math.MaxUint32
147 } else {
148 isettings = append(isettings, http2.Setting{
149 ID: http2.SettingMaxConcurrentStreams,
150 Val: maxStreams,
151 })
152 }
153 dynamicWindow := true
154 iwz := int32(initialWindowSize)
155 if config.InitialWindowSize >= defaultWindowSize {
156 iwz = config.InitialWindowSize
157 dynamicWindow = false
158 }
159 icwz := int32(initialWindowSize)
160 if config.InitialConnWindowSize >= defaultWindowSize {
161 icwz = config.InitialConnWindowSize
162 dynamicWindow = false
163 }
164 if iwz != defaultWindowSize {
165 isettings = append(isettings, http2.Setting{
166 ID: http2.SettingInitialWindowSize,
167 Val: uint32(iwz)})
168 }
169 if config.MaxHeaderListSize != nil {
170 isettings = append(isettings, http2.Setting{
171 ID: http2.SettingMaxHeaderListSize,
172 Val: *config.MaxHeaderListSize,
173 })
174 }
175 if err := framer.fr.WriteSettings(isettings...); err != nil {
176 return nil, connectionErrorf(false, err, "transport: %v", err)
177 }
178 // Adjust the connection flow control window if needed.
179 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
180 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
181 return nil, connectionErrorf(false, err, "transport: %v", err)
182 }
183 }
184 kp := config.KeepaliveParams
185 if kp.MaxConnectionIdle == 0 {
186 kp.MaxConnectionIdle = defaultMaxConnectionIdle
187 }
188 if kp.MaxConnectionAge == 0 {
189 kp.MaxConnectionAge = defaultMaxConnectionAge
190 }
191 // Add a jitter to MaxConnectionAge.
192 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
193 if kp.MaxConnectionAgeGrace == 0 {
194 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
195 }
196 if kp.Time == 0 {
197 kp.Time = defaultServerKeepaliveTime
198 }
199 if kp.Timeout == 0 {
200 kp.Timeout = defaultServerKeepaliveTimeout
201 }
202 kep := config.KeepalivePolicy
203 if kep.MinTime == 0 {
204 kep.MinTime = defaultKeepalivePolicyMinTime
205 }
206 ctx, cancel := context.WithCancel(context.Background())
207 t := &http2Server{
208 ctx: ctx,
209 cancel: cancel,
210 ctxDone: ctx.Done(),
211 conn: conn,
212 remoteAddr: conn.RemoteAddr(),
213 localAddr: conn.LocalAddr(),
214 authInfo: config.AuthInfo,
215 framer: framer,
216 readerDone: make(chan struct{}),
217 writerDone: make(chan struct{}),
218 maxStreams: maxStreams,
219 inTapHandle: config.InTapHandle,
220 fc: &trInFlow{limit: uint32(icwz)},
221 state: reachable,
222 activeStreams: make(map[uint32]*Stream),
223 stats: config.StatsHandler,
224 kp: kp,
225 idle: time.Now(),
226 kep: kep,
227 initialWindowSize: iwz,
228 czData: new(channelzData),
Abhilash S.L3b494632019-07-16 15:51:09 +0530229 bufferPool: newBufferPool(),
William Kurkianea869482019-04-09 15:16:11 -0400230 }
231 t.controlBuf = newControlBuffer(t.ctxDone)
232 if dynamicWindow {
233 t.bdpEst = &bdpEstimator{
234 bdp: initialWindowSize,
235 updateFlowControl: t.updateFlowControl,
236 }
237 }
238 if t.stats != nil {
239 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
240 RemoteAddr: t.remoteAddr,
241 LocalAddr: t.localAddr,
242 })
243 connBegin := &stats.ConnBegin{}
244 t.stats.HandleConn(t.ctx, connBegin)
245 }
246 if channelz.IsOn() {
247 t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
248 }
249 t.framer.writer.Flush()
250
251 defer func() {
252 if err != nil {
253 t.Close()
254 }
255 }()
256
257 // Check the validity of client preface.
258 preface := make([]byte, len(clientPreface))
259 if _, err := io.ReadFull(t.conn, preface); err != nil {
260 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
261 }
262 if !bytes.Equal(preface, clientPreface) {
263 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
264 }
265
266 frame, err := t.framer.fr.ReadFrame()
267 if err == io.EOF || err == io.ErrUnexpectedEOF {
268 return nil, err
269 }
270 if err != nil {
271 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
272 }
273 atomic.StoreUint32(&t.activity, 1)
274 sf, ok := frame.(*http2.SettingsFrame)
275 if !ok {
276 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
277 }
278 t.handleSettings(sf)
279
280 go func() {
281 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
282 t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
283 if err := t.loopy.run(); err != nil {
284 errorf("transport: loopyWriter.run returning. Err: %v", err)
285 }
286 t.conn.Close()
287 close(t.writerDone)
288 }()
289 go t.keepalive()
290 return t, nil
291}
292
293// operateHeader takes action on the decoded headers.
294func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
295 streamID := frame.Header().StreamID
Abhilash S.L3b494632019-07-16 15:51:09 +0530296 state := &decodeState{
297 serverSide: true,
298 }
William Kurkianea869482019-04-09 15:16:11 -0400299 if err := state.decodeHeader(frame); err != nil {
300 if se, ok := status.FromError(err); ok {
301 t.controlBuf.put(&cleanupStream{
302 streamID: streamID,
303 rst: true,
304 rstCode: statusCodeConvTab[se.Code()],
305 onWrite: func() {},
306 })
307 }
308 return false
309 }
310
311 buf := newRecvBuffer()
312 s := &Stream{
313 id: streamID,
314 st: t,
315 buf: buf,
316 fc: &inFlow{limit: uint32(t.initialWindowSize)},
Abhilash S.L3b494632019-07-16 15:51:09 +0530317 recvCompress: state.data.encoding,
318 method: state.data.method,
319 contentSubtype: state.data.contentSubtype,
William Kurkianea869482019-04-09 15:16:11 -0400320 }
321 if frame.StreamEnded() {
322 // s is just created by the caller. No lock needed.
323 s.state = streamReadDone
324 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530325 if state.data.timeoutSet {
326 s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
William Kurkianea869482019-04-09 15:16:11 -0400327 } else {
328 s.ctx, s.cancel = context.WithCancel(t.ctx)
329 }
330 pr := &peer.Peer{
331 Addr: t.remoteAddr,
332 }
333 // Attach Auth info if there is any.
334 if t.authInfo != nil {
335 pr.AuthInfo = t.authInfo
336 }
337 s.ctx = peer.NewContext(s.ctx, pr)
338 // Attach the received metadata to the context.
Abhilash S.L3b494632019-07-16 15:51:09 +0530339 if len(state.data.mdata) > 0 {
340 s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
William Kurkianea869482019-04-09 15:16:11 -0400341 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530342 if state.data.statsTags != nil {
343 s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
William Kurkianea869482019-04-09 15:16:11 -0400344 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530345 if state.data.statsTrace != nil {
346 s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
William Kurkianea869482019-04-09 15:16:11 -0400347 }
348 if t.inTapHandle != nil {
349 var err error
350 info := &tap.Info{
Abhilash S.L3b494632019-07-16 15:51:09 +0530351 FullMethodName: state.data.method,
William Kurkianea869482019-04-09 15:16:11 -0400352 }
353 s.ctx, err = t.inTapHandle(s.ctx, info)
354 if err != nil {
355 warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
356 t.controlBuf.put(&cleanupStream{
357 streamID: s.id,
358 rst: true,
359 rstCode: http2.ErrCodeRefusedStream,
360 onWrite: func() {},
361 })
362 return false
363 }
364 }
365 t.mu.Lock()
366 if t.state != reachable {
367 t.mu.Unlock()
368 return false
369 }
370 if uint32(len(t.activeStreams)) >= t.maxStreams {
371 t.mu.Unlock()
372 t.controlBuf.put(&cleanupStream{
373 streamID: streamID,
374 rst: true,
375 rstCode: http2.ErrCodeRefusedStream,
376 onWrite: func() {},
377 })
378 return false
379 }
380 if streamID%2 != 1 || streamID <= t.maxStreamID {
381 t.mu.Unlock()
382 // illegal gRPC stream id.
383 errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
384 return true
385 }
386 t.maxStreamID = streamID
387 t.activeStreams[streamID] = s
388 if len(t.activeStreams) == 1 {
389 t.idle = time.Time{}
390 }
391 t.mu.Unlock()
392 if channelz.IsOn() {
393 atomic.AddInt64(&t.czData.streamsStarted, 1)
394 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
395 }
396 s.requestRead = func(n int) {
397 t.adjustWindow(s, uint32(n))
398 }
399 s.ctx = traceCtx(s.ctx, s.method)
400 if t.stats != nil {
401 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
402 inHeader := &stats.InHeader{
403 FullMethod: s.method,
404 RemoteAddr: t.remoteAddr,
405 LocalAddr: t.localAddr,
406 Compression: s.recvCompress,
407 WireLength: int(frame.Header().Length),
408 }
409 t.stats.HandleRPC(s.ctx, inHeader)
410 }
411 s.ctxDone = s.ctx.Done()
412 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
413 s.trReader = &transportReader{
414 reader: &recvBufferReader{
Abhilash S.L3b494632019-07-16 15:51:09 +0530415 ctx: s.ctx,
416 ctxDone: s.ctxDone,
417 recv: s.buf,
418 freeBuffer: t.bufferPool.put,
William Kurkianea869482019-04-09 15:16:11 -0400419 },
420 windowHandler: func(n int) {
421 t.updateWindow(s, uint32(n))
422 },
423 }
424 // Register the stream with loopy.
425 t.controlBuf.put(&registerStream{
426 streamID: s.id,
427 wq: s.wq,
428 })
429 handle(s)
430 return false
431}
432
433// HandleStreams receives incoming streams using the given handler. This is
434// typically run in a separate goroutine.
435// traceCtx attaches trace to ctx and returns the new context.
436func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
437 defer close(t.readerDone)
438 for {
439 frame, err := t.framer.fr.ReadFrame()
440 atomic.StoreUint32(&t.activity, 1)
441 if err != nil {
442 if se, ok := err.(http2.StreamError); ok {
443 warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
444 t.mu.Lock()
445 s := t.activeStreams[se.StreamID]
446 t.mu.Unlock()
447 if s != nil {
Abhilash S.L3b494632019-07-16 15:51:09 +0530448 t.closeStream(s, true, se.Code, false)
William Kurkianea869482019-04-09 15:16:11 -0400449 } else {
450 t.controlBuf.put(&cleanupStream{
451 streamID: se.StreamID,
452 rst: true,
453 rstCode: se.Code,
454 onWrite: func() {},
455 })
456 }
457 continue
458 }
459 if err == io.EOF || err == io.ErrUnexpectedEOF {
460 t.Close()
461 return
462 }
463 warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
464 t.Close()
465 return
466 }
467 switch frame := frame.(type) {
468 case *http2.MetaHeadersFrame:
469 if t.operateHeaders(frame, handle, traceCtx) {
470 t.Close()
471 break
472 }
473 case *http2.DataFrame:
474 t.handleData(frame)
475 case *http2.RSTStreamFrame:
476 t.handleRSTStream(frame)
477 case *http2.SettingsFrame:
478 t.handleSettings(frame)
479 case *http2.PingFrame:
480 t.handlePing(frame)
481 case *http2.WindowUpdateFrame:
482 t.handleWindowUpdate(frame)
483 case *http2.GoAwayFrame:
484 // TODO: Handle GoAway from the client appropriately.
485 default:
486 errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
487 }
488 }
489}
490
491func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
492 t.mu.Lock()
493 defer t.mu.Unlock()
494 if t.activeStreams == nil {
495 // The transport is closing.
496 return nil, false
497 }
498 s, ok := t.activeStreams[f.Header().StreamID]
499 if !ok {
500 // The stream is already done.
501 return nil, false
502 }
503 return s, true
504}
505
506// adjustWindow sends out extra window update over the initial window size
507// of stream if the application is requesting data larger in size than
508// the window.
509func (t *http2Server) adjustWindow(s *Stream, n uint32) {
510 if w := s.fc.maybeAdjust(n); w > 0 {
511 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
512 }
513
514}
515
516// updateWindow adjusts the inbound quota for the stream and the transport.
517// Window updates will deliver to the controller for sending when
518// the cumulative quota exceeds the corresponding threshold.
519func (t *http2Server) updateWindow(s *Stream, n uint32) {
520 if w := s.fc.onRead(n); w > 0 {
521 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
522 increment: w,
523 })
524 }
525}
526
527// updateFlowControl updates the incoming flow control windows
528// for the transport and the stream based on the current bdp
529// estimation.
530func (t *http2Server) updateFlowControl(n uint32) {
531 t.mu.Lock()
532 for _, s := range t.activeStreams {
533 s.fc.newLimit(n)
534 }
535 t.initialWindowSize = int32(n)
536 t.mu.Unlock()
537 t.controlBuf.put(&outgoingWindowUpdate{
538 streamID: 0,
539 increment: t.fc.newLimit(n),
540 })
541 t.controlBuf.put(&outgoingSettings{
542 ss: []http2.Setting{
543 {
544 ID: http2.SettingInitialWindowSize,
545 Val: n,
546 },
547 },
548 })
549
550}
551
552func (t *http2Server) handleData(f *http2.DataFrame) {
553 size := f.Header().Length
554 var sendBDPPing bool
555 if t.bdpEst != nil {
556 sendBDPPing = t.bdpEst.add(size)
557 }
558 // Decouple connection's flow control from application's read.
559 // An update on connection's flow control should not depend on
560 // whether user application has read the data or not. Such a
561 // restriction is already imposed on the stream's flow control,
562 // and therefore the sender will be blocked anyways.
563 // Decoupling the connection flow control will prevent other
564 // active(fast) streams from starving in presence of slow or
565 // inactive streams.
566 if w := t.fc.onData(size); w > 0 {
567 t.controlBuf.put(&outgoingWindowUpdate{
568 streamID: 0,
569 increment: w,
570 })
571 }
572 if sendBDPPing {
573 // Avoid excessive ping detection (e.g. in an L7 proxy)
574 // by sending a window update prior to the BDP ping.
575 if w := t.fc.reset(); w > 0 {
576 t.controlBuf.put(&outgoingWindowUpdate{
577 streamID: 0,
578 increment: w,
579 })
580 }
581 t.controlBuf.put(bdpPing)
582 }
583 // Select the right stream to dispatch.
584 s, ok := t.getStream(f)
585 if !ok {
586 return
587 }
588 if size > 0 {
589 if err := s.fc.onData(size); err != nil {
Abhilash S.L3b494632019-07-16 15:51:09 +0530590 t.closeStream(s, true, http2.ErrCodeFlowControl, false)
William Kurkianea869482019-04-09 15:16:11 -0400591 return
592 }
593 if f.Header().Flags.Has(http2.FlagDataPadded) {
594 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
595 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
596 }
597 }
598 // TODO(bradfitz, zhaoq): A copy is required here because there is no
599 // guarantee f.Data() is consumed before the arrival of next frame.
600 // Can this copy be eliminated?
601 if len(f.Data()) > 0 {
Abhilash S.L3b494632019-07-16 15:51:09 +0530602 buffer := t.bufferPool.get()
603 buffer.Reset()
604 buffer.Write(f.Data())
605 s.write(recvMsg{buffer: buffer})
William Kurkianea869482019-04-09 15:16:11 -0400606 }
607 }
608 if f.Header().Flags.Has(http2.FlagDataEndStream) {
609 // Received the end of stream from the client.
610 s.compareAndSwapState(streamActive, streamReadDone)
611 s.write(recvMsg{err: io.EOF})
612 }
613}
614
615func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
Abhilash S.L3b494632019-07-16 15:51:09 +0530616 // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
617 if s, ok := t.getStream(f); ok {
618 t.closeStream(s, false, 0, false)
William Kurkianea869482019-04-09 15:16:11 -0400619 return
620 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530621 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
622 t.controlBuf.put(&cleanupStream{
623 streamID: f.Header().StreamID,
624 rst: false,
625 rstCode: 0,
626 onWrite: func() {},
627 })
William Kurkianea869482019-04-09 15:16:11 -0400628}
629
630func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
631 if f.IsAck() {
632 return
633 }
634 var ss []http2.Setting
635 var updateFuncs []func()
636 f.ForeachSetting(func(s http2.Setting) error {
637 switch s.ID {
638 case http2.SettingMaxHeaderListSize:
639 updateFuncs = append(updateFuncs, func() {
640 t.maxSendHeaderListSize = new(uint32)
641 *t.maxSendHeaderListSize = s.Val
642 })
643 default:
644 ss = append(ss, s)
645 }
646 return nil
647 })
648 t.controlBuf.executeAndPut(func(interface{}) bool {
649 for _, f := range updateFuncs {
650 f()
651 }
652 return true
653 }, &incomingSettings{
654 ss: ss,
655 })
656}
657
const (
	// maxPingStrikes is the number of ping strikes tolerated from a client;
	// handlePing sends a GOAWAY that closes the connection once the strike
	// count exceeds it.
	maxPingStrikes = 2
	// defaultPingTimeout is the minimum spacing handlePing expects between
	// pings when there are no active streams and pings without streams are
	// not permitted.
	defaultPingTimeout = 2 * time.Hour
)
662
663func (t *http2Server) handlePing(f *http2.PingFrame) {
664 if f.IsAck() {
665 if f.Data == goAwayPing.data && t.drainChan != nil {
666 close(t.drainChan)
667 return
668 }
669 // Maybe it's a BDP ping.
670 if t.bdpEst != nil {
671 t.bdpEst.calculate(f.Data)
672 }
673 return
674 }
675 pingAck := &ping{ack: true}
676 copy(pingAck.data[:], f.Data[:])
677 t.controlBuf.put(pingAck)
678
679 now := time.Now()
680 defer func() {
681 t.lastPingAt = now
682 }()
683 // A reset ping strikes means that we don't need to check for policy
684 // violation for this ping and the pingStrikes counter should be set
685 // to 0.
686 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
687 t.pingStrikes = 0
688 return
689 }
690 t.mu.Lock()
691 ns := len(t.activeStreams)
692 t.mu.Unlock()
693 if ns < 1 && !t.kep.PermitWithoutStream {
694 // Keepalive shouldn't be active thus, this new ping should
695 // have come after at least defaultPingTimeout.
696 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
697 t.pingStrikes++
698 }
699 } else {
700 // Check if keepalive policy is respected.
701 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
702 t.pingStrikes++
703 }
704 }
705
706 if t.pingStrikes > maxPingStrikes {
707 // Send goaway and close the connection.
708 errorf("transport: Got too many pings from the client, closing the connection.")
709 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
710 }
711}
712
713func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
714 t.controlBuf.put(&incomingWindowUpdate{
715 streamID: f.Header().StreamID,
716 increment: f.Increment,
717 })
718}
719
720func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
721 for k, vv := range md {
722 if isReservedHeader(k) {
723 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
724 continue
725 }
726 for _, v := range vv {
727 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
728 }
729 }
730 return headerFields
731}
732
733func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
734 if t.maxSendHeaderListSize == nil {
735 return true
736 }
737 hdrFrame := it.(*headerFrame)
738 var sz int64
739 for _, f := range hdrFrame.hf {
740 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
741 errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
742 return false
743 }
744 }
745 return true
746}
747
748// WriteHeader sends the header metedata md back to the client.
749func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
750 if s.updateHeaderSent() || s.getState() == streamDone {
751 return ErrIllegalHeaderWrite
752 }
753 s.hdrMu.Lock()
754 if md.Len() > 0 {
755 if s.header.Len() > 0 {
756 s.header = metadata.Join(s.header, md)
757 } else {
758 s.header = md
759 }
760 }
761 if err := t.writeHeaderLocked(s); err != nil {
762 s.hdrMu.Unlock()
763 return err
764 }
765 s.hdrMu.Unlock()
766 return nil
767}
768
769func (t *http2Server) writeHeaderLocked(s *Stream) error {
770 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
771 // first and create a slice of that exact size.
772 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
773 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
774 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
775 if s.sendCompress != "" {
776 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
777 }
778 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
779 success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
780 streamID: s.id,
781 hf: headerFields,
782 endStream: false,
783 onWrite: func() {
784 atomic.StoreUint32(&t.resetPingStrikes, 1)
785 },
786 })
787 if !success {
788 if err != nil {
789 return err
790 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530791 t.closeStream(s, true, http2.ErrCodeInternal, false)
William Kurkianea869482019-04-09 15:16:11 -0400792 return ErrHeaderListSizeLimitViolation
793 }
794 if t.stats != nil {
795 // Note: WireLength is not set in outHeader.
796 // TODO(mmukhi): Revisit this later, if needed.
797 outHeader := &stats.OutHeader{}
798 t.stats.HandleRPC(s.Context(), outHeader)
799 }
800 return nil
801}
802
803// WriteStatus sends stream status to the client and terminates the stream.
804// There is no further I/O operations being able to perform on this stream.
805// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
806// OK is adopted.
807func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
808 if s.getState() == streamDone {
809 return nil
810 }
811 s.hdrMu.Lock()
812 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
813 // first and create a slice of that exact size.
814 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
815 if !s.updateHeaderSent() { // No headers have been sent.
816 if len(s.header) > 0 { // Send a separate header frame.
817 if err := t.writeHeaderLocked(s); err != nil {
818 s.hdrMu.Unlock()
819 return err
820 }
821 } else { // Send a trailer only response.
822 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
823 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
824 }
825 }
826 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
827 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
828
Abhilash S.L3b494632019-07-16 15:51:09 +0530829 if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
William Kurkianea869482019-04-09 15:16:11 -0400830 stBytes, err := proto.Marshal(p)
831 if err != nil {
832 // TODO: return error instead, when callers are able to handle it.
833 grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
834 } else {
835 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
836 }
837 }
838
839 // Attach the trailer metadata.
840 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
841 trailingHeader := &headerFrame{
842 streamID: s.id,
843 hf: headerFields,
844 endStream: true,
845 onWrite: func() {
846 atomic.StoreUint32(&t.resetPingStrikes, 1)
847 },
848 }
849 s.hdrMu.Unlock()
850 success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
851 if !success {
852 if err != nil {
853 return err
854 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530855 t.closeStream(s, true, http2.ErrCodeInternal, false)
William Kurkianea869482019-04-09 15:16:11 -0400856 return ErrHeaderListSizeLimitViolation
857 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530858 // Send a RST_STREAM after the trailers if the client has not already half-closed.
859 rst := s.getState() == streamActive
860 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
William Kurkianea869482019-04-09 15:16:11 -0400861 if t.stats != nil {
862 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
863 }
864 return nil
865}
866
867// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
868// is returns if it fails (e.g., framing error, transport error).
869func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
870 if !s.isHeaderSent() { // Headers haven't been written yet.
871 if err := t.WriteHeader(s, nil); err != nil {
Abhilash S.L3b494632019-07-16 15:51:09 +0530872 if _, ok := err.(ConnectionError); ok {
873 return err
874 }
William Kurkianea869482019-04-09 15:16:11 -0400875 // TODO(mmukhi, dfawley): Make sure this is the right code to return.
876 return status.Errorf(codes.Internal, "transport: %v", err)
877 }
878 } else {
879 // Writing headers checks for this condition.
880 if s.getState() == streamDone {
881 // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
882 s.cancel()
883 select {
884 case <-t.ctx.Done():
885 return ErrConnClosing
886 default:
887 }
888 return ContextErr(s.ctx.Err())
889 }
890 }
891 // Add some data to header frame so that we can equally distribute bytes across frames.
892 emptyLen := http2MaxFrameLen - len(hdr)
893 if emptyLen > len(data) {
894 emptyLen = len(data)
895 }
896 hdr = append(hdr, data[:emptyLen]...)
897 data = data[emptyLen:]
898 df := &dataFrame{
899 streamID: s.id,
900 h: hdr,
901 d: data,
902 onEachWrite: func() {
903 atomic.StoreUint32(&t.resetPingStrikes, 1)
904 },
905 }
906 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
907 select {
908 case <-t.ctx.Done():
909 return ErrConnClosing
910 default:
911 }
912 return ContextErr(s.ctx.Err())
913 }
914 return t.controlBuf.put(df)
915}
916
917// keepalive running in a separate goroutine does the following:
918// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
919// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
920// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
921// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
922// after an additional duration of keepalive.Timeout.
923func (t *http2Server) keepalive() {
924 p := &ping{}
925 var pingSent bool
926 maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
927 maxAge := time.NewTimer(t.kp.MaxConnectionAge)
928 keepalive := time.NewTimer(t.kp.Time)
929 // NOTE: All exit paths of this function should reset their
930 // respective timers. A failure to do so will cause the
931 // following clean-up to deadlock and eventually leak.
932 defer func() {
933 if !maxIdle.Stop() {
934 <-maxIdle.C
935 }
936 if !maxAge.Stop() {
937 <-maxAge.C
938 }
939 if !keepalive.Stop() {
940 <-keepalive.C
941 }
942 }()
943 for {
944 select {
945 case <-maxIdle.C:
946 t.mu.Lock()
947 idle := t.idle
948 if idle.IsZero() { // The connection is non-idle.
949 t.mu.Unlock()
950 maxIdle.Reset(t.kp.MaxConnectionIdle)
951 continue
952 }
953 val := t.kp.MaxConnectionIdle - time.Since(idle)
954 t.mu.Unlock()
955 if val <= 0 {
956 // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
957 // Gracefully close the connection.
958 t.drain(http2.ErrCodeNo, []byte{})
959 // Resetting the timer so that the clean-up doesn't deadlock.
960 maxIdle.Reset(infinity)
961 return
962 }
963 maxIdle.Reset(val)
964 case <-maxAge.C:
965 t.drain(http2.ErrCodeNo, []byte{})
966 maxAge.Reset(t.kp.MaxConnectionAgeGrace)
967 select {
968 case <-maxAge.C:
969 // Close the connection after grace period.
970 t.Close()
971 // Resetting the timer so that the clean-up doesn't deadlock.
972 maxAge.Reset(infinity)
973 case <-t.ctx.Done():
974 }
975 return
976 case <-keepalive.C:
977 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
978 pingSent = false
979 keepalive.Reset(t.kp.Time)
980 continue
981 }
982 if pingSent {
983 t.Close()
984 // Resetting the timer so that the clean-up doesn't deadlock.
985 keepalive.Reset(infinity)
986 return
987 }
988 pingSent = true
989 if channelz.IsOn() {
990 atomic.AddInt64(&t.czData.kpCount, 1)
991 }
992 t.controlBuf.put(p)
993 keepalive.Reset(t.kp.Timeout)
994 case <-t.ctx.Done():
995 return
996 }
997 }
998}
999
1000// Close starts shutting down the http2Server transport.
1001// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1002// could cause some resource issue. Revisit this later.
1003func (t *http2Server) Close() error {
1004 t.mu.Lock()
1005 if t.state == closing {
1006 t.mu.Unlock()
1007 return errors.New("transport: Close() was already called")
1008 }
1009 t.state = closing
1010 streams := t.activeStreams
1011 t.activeStreams = nil
1012 t.mu.Unlock()
1013 t.controlBuf.finish()
1014 t.cancel()
1015 err := t.conn.Close()
1016 if channelz.IsOn() {
1017 channelz.RemoveEntry(t.channelzID)
1018 }
1019 // Cancel all active streams.
1020 for _, s := range streams {
1021 s.cancel()
1022 }
1023 if t.stats != nil {
1024 connEnd := &stats.ConnEnd{}
1025 t.stats.HandleConn(t.ctx, connEnd)
1026 }
1027 return err
1028}
1029
1030// deleteStream deletes the stream s from transport's active streams.
1031func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
Abhilash S.L3b494632019-07-16 15:51:09 +05301032 // In case stream sending and receiving are invoked in separate
1033 // goroutines (e.g., bi-directional streaming), cancel needs to be
1034 // called to interrupt the potential blocking on other goroutines.
1035 s.cancel()
William Kurkianea869482019-04-09 15:16:11 -04001036
Abhilash S.L3b494632019-07-16 15:51:09 +05301037 t.mu.Lock()
1038 if _, ok := t.activeStreams[s.id]; ok {
1039 delete(t.activeStreams, s.id)
1040 if len(t.activeStreams) == 0 {
1041 t.idle = time.Now()
1042 }
William Kurkianea869482019-04-09 15:16:11 -04001043 }
1044 t.mu.Unlock()
1045
1046 if channelz.IsOn() {
1047 if eosReceived {
1048 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1049 } else {
1050 atomic.AddInt64(&t.czData.streamsFailed, 1)
1051 }
1052 }
1053}
1054
Abhilash S.L3b494632019-07-16 15:51:09 +05301055// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1056func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
William Kurkianea869482019-04-09 15:16:11 -04001057 oldState := s.swapState(streamDone)
Abhilash S.L3b494632019-07-16 15:51:09 +05301058 if oldState == streamDone {
1059 // If the stream was already done, return.
1060 return
1061 }
William Kurkianea869482019-04-09 15:16:11 -04001062
Abhilash S.L3b494632019-07-16 15:51:09 +05301063 hdr.cleanup = &cleanupStream{
1064 streamID: s.id,
1065 rst: rst,
1066 rstCode: rstCode,
1067 onWrite: func() {
1068 t.deleteStream(s, eosReceived)
1069 },
1070 }
1071 t.controlBuf.put(hdr)
1072}
William Kurkianea869482019-04-09 15:16:11 -04001073
Abhilash S.L3b494632019-07-16 15:51:09 +05301074// closeStream clears the footprint of a stream when the stream is not needed any more.
1075func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1076 s.swapState(streamDone)
William Kurkianea869482019-04-09 15:16:11 -04001077 t.deleteStream(s, eosReceived)
1078
Abhilash S.L3b494632019-07-16 15:51:09 +05301079 t.controlBuf.put(&cleanupStream{
William Kurkianea869482019-04-09 15:16:11 -04001080 streamID: s.id,
1081 rst: rst,
1082 rstCode: rstCode,
1083 onWrite: func() {},
Abhilash S.L3b494632019-07-16 15:51:09 +05301084 })
William Kurkianea869482019-04-09 15:16:11 -04001085}
1086
1087func (t *http2Server) RemoteAddr() net.Addr {
1088 return t.remoteAddr
1089}
1090
1091func (t *http2Server) Drain() {
1092 t.drain(http2.ErrCodeNo, []byte{})
1093}
1094
1095func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1096 t.mu.Lock()
1097 defer t.mu.Unlock()
1098 if t.drainChan != nil {
1099 return
1100 }
1101 t.drainChan = make(chan struct{})
1102 t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1103}
1104
1105var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1106
1107// Handles outgoing GoAway and returns true if loopy needs to put itself
1108// in draining mode.
1109func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1110 t.mu.Lock()
1111 if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1112 t.mu.Unlock()
1113 // The transport is closing.
1114 return false, ErrConnClosing
1115 }
1116 sid := t.maxStreamID
1117 if !g.headsUp {
1118 // Stop accepting more streams now.
1119 t.state = draining
1120 if len(t.activeStreams) == 0 {
1121 g.closeConn = true
1122 }
1123 t.mu.Unlock()
1124 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1125 return false, err
1126 }
1127 if g.closeConn {
1128 // Abruptly close the connection following the GoAway (via
1129 // loopywriter). But flush out what's inside the buffer first.
1130 t.framer.writer.Flush()
1131 return false, fmt.Errorf("transport: Connection closing")
1132 }
1133 return true, nil
1134 }
1135 t.mu.Unlock()
1136 // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1137 // Follow that with a ping and wait for the ack to come back or a timer
1138 // to expire. During this time accept new streams since they might have
1139 // originated before the GoAway reaches the client.
1140 // After getting the ack or timer expiration send out another GoAway this
1141 // time with an ID of the max stream server intends to process.
1142 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1143 return false, err
1144 }
1145 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1146 return false, err
1147 }
1148 go func() {
1149 timer := time.NewTimer(time.Minute)
1150 defer timer.Stop()
1151 select {
1152 case <-t.drainChan:
1153 case <-timer.C:
1154 case <-t.ctx.Done():
1155 return
1156 }
1157 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1158 }()
1159 return false, nil
1160}
1161
1162func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1163 s := channelz.SocketInternalMetric{
1164 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1165 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1166 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1167 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1168 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1169 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1170 LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1171 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1172 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1173 LocalFlowControlWindow: int64(t.fc.getSize()),
1174 SocketOptions: channelz.GetSocketOption(t.conn),
1175 LocalAddr: t.localAddr,
1176 RemoteAddr: t.remoteAddr,
1177 // RemoteName :
1178 }
1179 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1180 s.Security = au.GetSecurityValue()
1181 }
1182 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1183 return &s
1184}
1185
1186func (t *http2Server) IncrMsgSent() {
1187 atomic.AddInt64(&t.czData.msgSent, 1)
1188 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1189}
1190
1191func (t *http2Server) IncrMsgRecv() {
1192 atomic.AddInt64(&t.czData.msgRecv, 1)
1193 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1194}
1195
1196func (t *http2Server) getOutFlowWindow() int64 {
1197 resp := make(chan uint32, 1)
1198 timer := time.NewTimer(time.Second)
1199 defer timer.Stop()
1200 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1201 select {
1202 case sz := <-resp:
1203 return int64(sz)
1204 case <-t.ctxDone:
1205 return -1
1206 case <-timer.C:
1207 return -2
1208 }
1209}
1210
1211func getJitter(v time.Duration) time.Duration {
1212 if v == infinity {
1213 return 0
1214 }
1215 // Generate a jitter between +/- 10% of the value.
1216 r := int64(v / 10)
1217 j := grpcrand.Int63n(2*r) - r
1218 return time.Duration(j)
1219}