blob: f2cad9ebc311499b5e6764b540906016b18199b8 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 "net"
29 "net/http"
30 "strconv"
31 "sync"
32 "sync/atomic"
33 "time"
34
35 "github.com/golang/protobuf/proto"
36 "golang.org/x/net/http2"
37 "golang.org/x/net/http2/hpack"
38 "google.golang.org/grpc/internal/grpcutil"
39
40 "google.golang.org/grpc/codes"
41 "google.golang.org/grpc/credentials"
42 "google.golang.org/grpc/internal/channelz"
43 "google.golang.org/grpc/internal/grpcrand"
44 "google.golang.org/grpc/keepalive"
45 "google.golang.org/grpc/metadata"
46 "google.golang.org/grpc/peer"
47 "google.golang.org/grpc/stats"
48 "google.golang.org/grpc/status"
49 "google.golang.org/grpc/tap"
50)
51
52var (
53 // ErrIllegalHeaderWrite indicates that setting header is illegal because of
54 // the stream's state.
55 ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
56 // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
57 // than the limit set by peer.
58 ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
59)
60
61// serverConnectionCounter counts the number of connections a server has seen
62// (equal to the number of http2Servers created). Must be accessed atomically.
63var serverConnectionCounter uint64
64
65// http2Server implements the ServerTransport interface with HTTP2.
66type http2Server struct {
67 lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
68 ctx context.Context
69 done chan struct{}
70 conn net.Conn
71 loopy *loopyWriter
72 readerDone chan struct{} // sync point to enable testing.
73 writerDone chan struct{} // sync point to enable testing.
74 remoteAddr net.Addr
75 localAddr net.Addr
76 maxStreamID uint32 // max stream ID ever seen
77 authInfo credentials.AuthInfo // auth info about the connection
78 inTapHandle tap.ServerInHandle
79 framer *framer
80 // The max number of concurrent streams.
81 maxStreams uint32
82 // controlBuf delivers all the control related tasks (e.g., window
83 // updates, reset streams, and various settings) to the controller.
84 controlBuf *controlBuffer
85 fc *trInFlow
86 stats stats.Handler
87 // Keepalive and max-age parameters for the server.
88 kp keepalive.ServerParameters
89 // Keepalive enforcement policy.
90 kep keepalive.EnforcementPolicy
91 // The time instance last ping was received.
92 lastPingAt time.Time
93 // Number of times the client has violated keepalive ping policy so far.
94 pingStrikes uint8
95 // Flag to signify that number of ping strikes should be reset to 0.
96 // This is set whenever data or header frames are sent.
97 // 1 means yes.
98 resetPingStrikes uint32 // Accessed atomically.
99 initialWindowSize int32
100 bdpEst *bdpEstimator
101 maxSendHeaderListSize *uint32
102
103 mu sync.Mutex // guard the following
104
105 // drainChan is initialized when Drain() is called the first time.
106 // After which the server writes out the first GoAway(with ID 2^31-1) frame.
107 // Then an independent goroutine will be launched to later send the second GoAway.
108 // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
109 // Thus call to Drain() will be a no-op if drainChan is already initialized since draining is
110 // already underway.
111 drainChan chan struct{}
112 state transportState
113 activeStreams map[uint32]*Stream
114 // idle is the time instant when the connection went idle.
115 // This is either the beginning of the connection or when the number of
116 // RPCs go down to 0.
117 // When the connection is busy, this value is set to 0.
118 idle time.Time
119
120 // Fields below are for channelz metric collection.
121 channelzID int64 // channelz unique identification number
122 czData *channelzData
123 bufferPool *bufferPool
124
125 connectionID uint64
126}
127
128// NewServerTransport creates a http2 transport with conn and configuration
129// options from config.
130//
131// It returns a non-nil transport and a nil error on success. On failure, it
khenaidoo5cb0d402021-12-08 14:09:16 -0500132// returns a nil transport and a non-nil error. For a special case where the
khenaidoo5fc5cea2021-08-11 17:39:16 -0400133// underlying conn gets closed before the client preface could be read, it
134// returns a nil transport and a nil error.
135func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
136 var authInfo credentials.AuthInfo
137 rawConn := conn
138 if config.Credentials != nil {
139 var err error
140 conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
141 if err != nil {
142 // ErrConnDispatched means that the connection was dispatched away
143 // from gRPC; those connections should be left open. io.EOF means
144 // the connection was closed before handshaking completed, which can
145 // happen naturally from probers. Return these errors directly.
146 if err == credentials.ErrConnDispatched || err == io.EOF {
147 return nil, err
148 }
149 return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
150 }
151 }
152 writeBufSize := config.WriteBufferSize
153 readBufSize := config.ReadBufferSize
154 maxHeaderListSize := defaultServerMaxHeaderListSize
155 if config.MaxHeaderListSize != nil {
156 maxHeaderListSize = *config.MaxHeaderListSize
157 }
158 framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
159 // Send initial settings as connection preface to client.
160 isettings := []http2.Setting{{
161 ID: http2.SettingMaxFrameSize,
162 Val: http2MaxFrameLen,
163 }}
164 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
165 // permitted in the HTTP2 spec.
166 maxStreams := config.MaxStreams
167 if maxStreams == 0 {
168 maxStreams = math.MaxUint32
169 } else {
170 isettings = append(isettings, http2.Setting{
171 ID: http2.SettingMaxConcurrentStreams,
172 Val: maxStreams,
173 })
174 }
175 dynamicWindow := true
176 iwz := int32(initialWindowSize)
177 if config.InitialWindowSize >= defaultWindowSize {
178 iwz = config.InitialWindowSize
179 dynamicWindow = false
180 }
181 icwz := int32(initialWindowSize)
182 if config.InitialConnWindowSize >= defaultWindowSize {
183 icwz = config.InitialConnWindowSize
184 dynamicWindow = false
185 }
186 if iwz != defaultWindowSize {
187 isettings = append(isettings, http2.Setting{
188 ID: http2.SettingInitialWindowSize,
189 Val: uint32(iwz)})
190 }
191 if config.MaxHeaderListSize != nil {
192 isettings = append(isettings, http2.Setting{
193 ID: http2.SettingMaxHeaderListSize,
194 Val: *config.MaxHeaderListSize,
195 })
196 }
197 if config.HeaderTableSize != nil {
198 isettings = append(isettings, http2.Setting{
199 ID: http2.SettingHeaderTableSize,
200 Val: *config.HeaderTableSize,
201 })
202 }
203 if err := framer.fr.WriteSettings(isettings...); err != nil {
204 return nil, connectionErrorf(false, err, "transport: %v", err)
205 }
206 // Adjust the connection flow control window if needed.
207 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
208 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
209 return nil, connectionErrorf(false, err, "transport: %v", err)
210 }
211 }
212 kp := config.KeepaliveParams
213 if kp.MaxConnectionIdle == 0 {
214 kp.MaxConnectionIdle = defaultMaxConnectionIdle
215 }
216 if kp.MaxConnectionAge == 0 {
217 kp.MaxConnectionAge = defaultMaxConnectionAge
218 }
219 // Add a jitter to MaxConnectionAge.
220 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
221 if kp.MaxConnectionAgeGrace == 0 {
222 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
223 }
224 if kp.Time == 0 {
225 kp.Time = defaultServerKeepaliveTime
226 }
227 if kp.Timeout == 0 {
228 kp.Timeout = defaultServerKeepaliveTimeout
229 }
230 kep := config.KeepalivePolicy
231 if kep.MinTime == 0 {
232 kep.MinTime = defaultKeepalivePolicyMinTime
233 }
234
235 done := make(chan struct{})
236 t := &http2Server{
237 ctx: setConnection(context.Background(), rawConn),
238 done: done,
239 conn: conn,
240 remoteAddr: conn.RemoteAddr(),
241 localAddr: conn.LocalAddr(),
242 authInfo: authInfo,
243 framer: framer,
244 readerDone: make(chan struct{}),
245 writerDone: make(chan struct{}),
246 maxStreams: maxStreams,
247 inTapHandle: config.InTapHandle,
248 fc: &trInFlow{limit: uint32(icwz)},
249 state: reachable,
250 activeStreams: make(map[uint32]*Stream),
251 stats: config.StatsHandler,
252 kp: kp,
253 idle: time.Now(),
254 kep: kep,
255 initialWindowSize: iwz,
256 czData: new(channelzData),
257 bufferPool: newBufferPool(),
258 }
259 t.controlBuf = newControlBuffer(t.done)
260 if dynamicWindow {
261 t.bdpEst = &bdpEstimator{
262 bdp: initialWindowSize,
263 updateFlowControl: t.updateFlowControl,
264 }
265 }
266 if t.stats != nil {
267 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
268 RemoteAddr: t.remoteAddr,
269 LocalAddr: t.localAddr,
270 })
271 connBegin := &stats.ConnBegin{}
272 t.stats.HandleConn(t.ctx, connBegin)
273 }
274 if channelz.IsOn() {
275 t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
276 }
277
278 t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
279
280 t.framer.writer.Flush()
281
282 defer func() {
283 if err != nil {
284 t.Close()
285 }
286 }()
287
288 // Check the validity of client preface.
289 preface := make([]byte, len(clientPreface))
290 if _, err := io.ReadFull(t.conn, preface); err != nil {
291 // In deployments where a gRPC server runs behind a cloud load balancer
292 // which performs regular TCP level health checks, the connection is
khenaidoo5cb0d402021-12-08 14:09:16 -0500293 // closed immediately by the latter. Returning io.EOF here allows the
294 // grpc server implementation to recognize this scenario and suppress
295 // logging to reduce spam.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400296 if err == io.EOF {
khenaidoo5cb0d402021-12-08 14:09:16 -0500297 return nil, io.EOF
khenaidoo5fc5cea2021-08-11 17:39:16 -0400298 }
299 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
300 }
301 if !bytes.Equal(preface, clientPreface) {
302 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
303 }
304
305 frame, err := t.framer.fr.ReadFrame()
306 if err == io.EOF || err == io.ErrUnexpectedEOF {
307 return nil, err
308 }
309 if err != nil {
310 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
311 }
312 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
313 sf, ok := frame.(*http2.SettingsFrame)
314 if !ok {
315 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
316 }
317 t.handleSettings(sf)
318
319 go func() {
320 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
321 t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
322 if err := t.loopy.run(); err != nil {
323 if logger.V(logLevel) {
324 logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
325 }
326 }
327 t.conn.Close()
328 t.controlBuf.finish()
329 close(t.writerDone)
330 }()
331 go t.keepalive()
332 return t, nil
333}
334
335// operateHeader takes action on the decoded headers.
336func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
337 streamID := frame.Header().StreamID
338
339 // frame.Truncated is set to true when framer detects that the current header
340 // list size hits MaxHeaderListSize limit.
341 if frame.Truncated {
342 t.controlBuf.put(&cleanupStream{
343 streamID: streamID,
344 rst: true,
345 rstCode: http2.ErrCodeFrameSize,
346 onWrite: func() {},
347 })
348 return false
349 }
350
351 buf := newRecvBuffer()
352 s := &Stream{
353 id: streamID,
354 st: t,
355 buf: buf,
356 fc: &inFlow{limit: uint32(t.initialWindowSize)},
357 }
358
359 var (
360 // If a gRPC Response-Headers has already been received, then it means
361 // that the peer is speaking gRPC and we are in gRPC mode.
362 isGRPC = false
363 mdata = make(map[string][]string)
364 httpMethod string
365 // headerError is set if an error is encountered while parsing the headers
366 headerError bool
367
368 timeoutSet bool
369 timeout time.Duration
370 )
371
372 for _, hf := range frame.Fields {
373 switch hf.Name {
374 case "content-type":
375 contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
376 if !validContentType {
377 break
378 }
379 mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
380 s.contentSubtype = contentSubtype
381 isGRPC = true
382 case "grpc-encoding":
383 s.recvCompress = hf.Value
384 case ":method":
385 httpMethod = hf.Value
386 case ":path":
387 s.method = hf.Value
388 case "grpc-timeout":
389 timeoutSet = true
390 var err error
391 if timeout, err = decodeTimeout(hf.Value); err != nil {
392 headerError = true
393 }
khenaidoo5cb0d402021-12-08 14:09:16 -0500394 // "Transports must consider requests containing the Connection header
395 // as malformed." - A41
396 case "connection":
397 if logger.V(logLevel) {
398 logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
399 }
400 headerError = true
khenaidoo5fc5cea2021-08-11 17:39:16 -0400401 default:
402 if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
403 break
404 }
405 v, err := decodeMetadataHeader(hf.Name, hf.Value)
406 if err != nil {
407 headerError = true
408 logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
409 break
410 }
411 mdata[hf.Name] = append(mdata[hf.Name], v)
412 }
413 }
414
khenaidoo5cb0d402021-12-08 14:09:16 -0500415 // "If multiple Host headers or multiple :authority headers are present, the
416 // request must be rejected with an HTTP status code 400 as required by Host
417 // validation in RFC 7230 §5.4, gRPC status code INTERNAL, or RST_STREAM
418 // with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
419 // error, this takes precedence over a client not speaking gRPC.
420 if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
421 errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
422 if logger.V(logLevel) {
423 logger.Errorf("transport: %v", errMsg)
424 }
425 t.controlBuf.put(&earlyAbortStream{
426 httpStatus: 400,
427 streamID: streamID,
428 contentSubtype: s.contentSubtype,
429 status: status.New(codes.Internal, errMsg),
430 })
431 return false
432 }
433
khenaidoo5fc5cea2021-08-11 17:39:16 -0400434 if !isGRPC || headerError {
435 t.controlBuf.put(&cleanupStream{
436 streamID: streamID,
437 rst: true,
438 rstCode: http2.ErrCodeProtocol,
439 onWrite: func() {},
440 })
441 return false
442 }
443
khenaidoo5cb0d402021-12-08 14:09:16 -0500444 // "If :authority is missing, Host must be renamed to :authority." - A41
445 if len(mdata[":authority"]) == 0 {
446 // No-op if host isn't present, no eventual :authority header is a valid
447 // RPC.
448 if host, ok := mdata["host"]; ok {
449 mdata[":authority"] = host
450 delete(mdata, "host")
451 }
452 } else {
453 // "If :authority is present, Host must be discarded" - A41
454 delete(mdata, "host")
455 }
456
khenaidoo5fc5cea2021-08-11 17:39:16 -0400457 if frame.StreamEnded() {
458 // s is just created by the caller. No lock needed.
459 s.state = streamReadDone
460 }
461 if timeoutSet {
462 s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
463 } else {
464 s.ctx, s.cancel = context.WithCancel(t.ctx)
465 }
466 pr := &peer.Peer{
467 Addr: t.remoteAddr,
468 }
469 // Attach Auth info if there is any.
470 if t.authInfo != nil {
471 pr.AuthInfo = t.authInfo
472 }
473 s.ctx = peer.NewContext(s.ctx, pr)
474 // Attach the received metadata to the context.
475 if len(mdata) > 0 {
476 s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
477 if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
478 s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
479 }
480 if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
481 s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
482 }
483 }
484 t.mu.Lock()
485 if t.state != reachable {
486 t.mu.Unlock()
487 s.cancel()
488 return false
489 }
490 if uint32(len(t.activeStreams)) >= t.maxStreams {
491 t.mu.Unlock()
492 t.controlBuf.put(&cleanupStream{
493 streamID: streamID,
494 rst: true,
495 rstCode: http2.ErrCodeRefusedStream,
496 onWrite: func() {},
497 })
498 s.cancel()
499 return false
500 }
501 if streamID%2 != 1 || streamID <= t.maxStreamID {
502 t.mu.Unlock()
503 // illegal gRPC stream id.
504 if logger.V(logLevel) {
505 logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
506 }
507 s.cancel()
508 return true
509 }
510 t.maxStreamID = streamID
511 if httpMethod != http.MethodPost {
512 t.mu.Unlock()
513 if logger.V(logLevel) {
514 logger.Infof("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
515 }
516 t.controlBuf.put(&cleanupStream{
517 streamID: streamID,
518 rst: true,
519 rstCode: http2.ErrCodeProtocol,
520 onWrite: func() {},
521 })
522 s.cancel()
523 return false
524 }
525 if t.inTapHandle != nil {
526 var err error
527 if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
528 t.mu.Unlock()
529 if logger.V(logLevel) {
530 logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
531 }
532 stat, ok := status.FromError(err)
533 if !ok {
534 stat = status.New(codes.PermissionDenied, err.Error())
535 }
536 t.controlBuf.put(&earlyAbortStream{
khenaidoo5cb0d402021-12-08 14:09:16 -0500537 httpStatus: 200,
khenaidoo5fc5cea2021-08-11 17:39:16 -0400538 streamID: s.id,
539 contentSubtype: s.contentSubtype,
540 status: stat,
541 })
542 return false
543 }
544 }
545 t.activeStreams[streamID] = s
546 if len(t.activeStreams) == 1 {
547 t.idle = time.Time{}
548 }
549 t.mu.Unlock()
550 if channelz.IsOn() {
551 atomic.AddInt64(&t.czData.streamsStarted, 1)
552 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
553 }
554 s.requestRead = func(n int) {
555 t.adjustWindow(s, uint32(n))
556 }
557 s.ctx = traceCtx(s.ctx, s.method)
558 if t.stats != nil {
559 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
560 inHeader := &stats.InHeader{
561 FullMethod: s.method,
562 RemoteAddr: t.remoteAddr,
563 LocalAddr: t.localAddr,
564 Compression: s.recvCompress,
565 WireLength: int(frame.Header().Length),
566 Header: metadata.MD(mdata).Copy(),
567 }
568 t.stats.HandleRPC(s.ctx, inHeader)
569 }
570 s.ctxDone = s.ctx.Done()
571 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
572 s.trReader = &transportReader{
573 reader: &recvBufferReader{
574 ctx: s.ctx,
575 ctxDone: s.ctxDone,
576 recv: s.buf,
577 freeBuffer: t.bufferPool.put,
578 },
579 windowHandler: func(n int) {
580 t.updateWindow(s, uint32(n))
581 },
582 }
583 // Register the stream with loopy.
584 t.controlBuf.put(&registerStream{
585 streamID: s.id,
586 wq: s.wq,
587 })
588 handle(s)
589 return false
590}
591
592// HandleStreams receives incoming streams using the given handler. This is
593// typically run in a separate goroutine.
594// traceCtx attaches trace to ctx and returns the new context.
595func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
596 defer close(t.readerDone)
597 for {
598 t.controlBuf.throttle()
599 frame, err := t.framer.fr.ReadFrame()
600 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
601 if err != nil {
602 if se, ok := err.(http2.StreamError); ok {
603 if logger.V(logLevel) {
604 logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
605 }
606 t.mu.Lock()
607 s := t.activeStreams[se.StreamID]
608 t.mu.Unlock()
609 if s != nil {
610 t.closeStream(s, true, se.Code, false)
611 } else {
612 t.controlBuf.put(&cleanupStream{
613 streamID: se.StreamID,
614 rst: true,
615 rstCode: se.Code,
616 onWrite: func() {},
617 })
618 }
619 continue
620 }
621 if err == io.EOF || err == io.ErrUnexpectedEOF {
622 t.Close()
623 return
624 }
625 if logger.V(logLevel) {
626 logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
627 }
628 t.Close()
629 return
630 }
631 switch frame := frame.(type) {
632 case *http2.MetaHeadersFrame:
633 if t.operateHeaders(frame, handle, traceCtx) {
634 t.Close()
635 break
636 }
637 case *http2.DataFrame:
638 t.handleData(frame)
639 case *http2.RSTStreamFrame:
640 t.handleRSTStream(frame)
641 case *http2.SettingsFrame:
642 t.handleSettings(frame)
643 case *http2.PingFrame:
644 t.handlePing(frame)
645 case *http2.WindowUpdateFrame:
646 t.handleWindowUpdate(frame)
647 case *http2.GoAwayFrame:
648 // TODO: Handle GoAway from the client appropriately.
649 default:
650 if logger.V(logLevel) {
651 logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
652 }
653 }
654 }
655}
656
657func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
658 t.mu.Lock()
659 defer t.mu.Unlock()
660 if t.activeStreams == nil {
661 // The transport is closing.
662 return nil, false
663 }
664 s, ok := t.activeStreams[f.Header().StreamID]
665 if !ok {
666 // The stream is already done.
667 return nil, false
668 }
669 return s, true
670}
671
672// adjustWindow sends out extra window update over the initial window size
673// of stream if the application is requesting data larger in size than
674// the window.
675func (t *http2Server) adjustWindow(s *Stream, n uint32) {
676 if w := s.fc.maybeAdjust(n); w > 0 {
677 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
678 }
679
680}
681
682// updateWindow adjusts the inbound quota for the stream and the transport.
683// Window updates will deliver to the controller for sending when
684// the cumulative quota exceeds the corresponding threshold.
685func (t *http2Server) updateWindow(s *Stream, n uint32) {
686 if w := s.fc.onRead(n); w > 0 {
687 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
688 increment: w,
689 })
690 }
691}
692
693// updateFlowControl updates the incoming flow control windows
694// for the transport and the stream based on the current bdp
695// estimation.
696func (t *http2Server) updateFlowControl(n uint32) {
697 t.mu.Lock()
698 for _, s := range t.activeStreams {
699 s.fc.newLimit(n)
700 }
701 t.initialWindowSize = int32(n)
702 t.mu.Unlock()
703 t.controlBuf.put(&outgoingWindowUpdate{
704 streamID: 0,
705 increment: t.fc.newLimit(n),
706 })
707 t.controlBuf.put(&outgoingSettings{
708 ss: []http2.Setting{
709 {
710 ID: http2.SettingInitialWindowSize,
711 Val: n,
712 },
713 },
714 })
715
716}
717
718func (t *http2Server) handleData(f *http2.DataFrame) {
719 size := f.Header().Length
720 var sendBDPPing bool
721 if t.bdpEst != nil {
722 sendBDPPing = t.bdpEst.add(size)
723 }
724 // Decouple connection's flow control from application's read.
725 // An update on connection's flow control should not depend on
726 // whether user application has read the data or not. Such a
727 // restriction is already imposed on the stream's flow control,
728 // and therefore the sender will be blocked anyways.
729 // Decoupling the connection flow control will prevent other
730 // active(fast) streams from starving in presence of slow or
731 // inactive streams.
732 if w := t.fc.onData(size); w > 0 {
733 t.controlBuf.put(&outgoingWindowUpdate{
734 streamID: 0,
735 increment: w,
736 })
737 }
738 if sendBDPPing {
739 // Avoid excessive ping detection (e.g. in an L7 proxy)
740 // by sending a window update prior to the BDP ping.
741 if w := t.fc.reset(); w > 0 {
742 t.controlBuf.put(&outgoingWindowUpdate{
743 streamID: 0,
744 increment: w,
745 })
746 }
747 t.controlBuf.put(bdpPing)
748 }
749 // Select the right stream to dispatch.
750 s, ok := t.getStream(f)
751 if !ok {
752 return
753 }
754 if s.getState() == streamReadDone {
755 t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
756 return
757 }
758 if size > 0 {
759 if err := s.fc.onData(size); err != nil {
760 t.closeStream(s, true, http2.ErrCodeFlowControl, false)
761 return
762 }
763 if f.Header().Flags.Has(http2.FlagDataPadded) {
764 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
765 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
766 }
767 }
768 // TODO(bradfitz, zhaoq): A copy is required here because there is no
769 // guarantee f.Data() is consumed before the arrival of next frame.
770 // Can this copy be eliminated?
771 if len(f.Data()) > 0 {
772 buffer := t.bufferPool.get()
773 buffer.Reset()
774 buffer.Write(f.Data())
775 s.write(recvMsg{buffer: buffer})
776 }
777 }
khenaidoo5cb0d402021-12-08 14:09:16 -0500778 if f.StreamEnded() {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400779 // Received the end of stream from the client.
780 s.compareAndSwapState(streamActive, streamReadDone)
781 s.write(recvMsg{err: io.EOF})
782 }
783}
784
785func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
786 // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
787 if s, ok := t.getStream(f); ok {
788 t.closeStream(s, false, 0, false)
789 return
790 }
791 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
792 t.controlBuf.put(&cleanupStream{
793 streamID: f.Header().StreamID,
794 rst: false,
795 rstCode: 0,
796 onWrite: func() {},
797 })
798}
799
800func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
801 if f.IsAck() {
802 return
803 }
804 var ss []http2.Setting
805 var updateFuncs []func()
806 f.ForeachSetting(func(s http2.Setting) error {
807 switch s.ID {
808 case http2.SettingMaxHeaderListSize:
809 updateFuncs = append(updateFuncs, func() {
810 t.maxSendHeaderListSize = new(uint32)
811 *t.maxSendHeaderListSize = s.Val
812 })
813 default:
814 ss = append(ss, s)
815 }
816 return nil
817 })
818 t.controlBuf.executeAndPut(func(interface{}) bool {
819 for _, f := range updateFuncs {
820 f()
821 }
822 return true
823 }, &incomingSettings{
824 ss: ss,
825 })
826}
827
828const (
829 maxPingStrikes = 2
830 defaultPingTimeout = 2 * time.Hour
831)
832
833func (t *http2Server) handlePing(f *http2.PingFrame) {
834 if f.IsAck() {
835 if f.Data == goAwayPing.data && t.drainChan != nil {
836 close(t.drainChan)
837 return
838 }
839 // Maybe it's a BDP ping.
840 if t.bdpEst != nil {
841 t.bdpEst.calculate(f.Data)
842 }
843 return
844 }
845 pingAck := &ping{ack: true}
846 copy(pingAck.data[:], f.Data[:])
847 t.controlBuf.put(pingAck)
848
849 now := time.Now()
850 defer func() {
851 t.lastPingAt = now
852 }()
853 // A reset ping strikes means that we don't need to check for policy
854 // violation for this ping and the pingStrikes counter should be set
855 // to 0.
856 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
857 t.pingStrikes = 0
858 return
859 }
860 t.mu.Lock()
861 ns := len(t.activeStreams)
862 t.mu.Unlock()
863 if ns < 1 && !t.kep.PermitWithoutStream {
864 // Keepalive shouldn't be active thus, this new ping should
865 // have come after at least defaultPingTimeout.
866 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
867 t.pingStrikes++
868 }
869 } else {
870 // Check if keepalive policy is respected.
871 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
872 t.pingStrikes++
873 }
874 }
875
876 if t.pingStrikes > maxPingStrikes {
877 // Send goaway and close the connection.
878 if logger.V(logLevel) {
879 logger.Errorf("transport: Got too many pings from the client, closing the connection.")
880 }
881 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
882 }
883}
884
885func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
886 t.controlBuf.put(&incomingWindowUpdate{
887 streamID: f.Header().StreamID,
888 increment: f.Increment,
889 })
890}
891
892func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
893 for k, vv := range md {
894 if isReservedHeader(k) {
895 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
896 continue
897 }
898 for _, v := range vv {
899 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
900 }
901 }
902 return headerFields
903}
904
905func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
906 if t.maxSendHeaderListSize == nil {
907 return true
908 }
909 hdrFrame := it.(*headerFrame)
910 var sz int64
911 for _, f := range hdrFrame.hf {
912 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
913 if logger.V(logLevel) {
914 logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
915 }
916 return false
917 }
918 }
919 return true
920}
921
922// WriteHeader sends the header metadata md back to the client.
923func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
924 if s.updateHeaderSent() || s.getState() == streamDone {
925 return ErrIllegalHeaderWrite
926 }
927 s.hdrMu.Lock()
928 if md.Len() > 0 {
929 if s.header.Len() > 0 {
930 s.header = metadata.Join(s.header, md)
931 } else {
932 s.header = md
933 }
934 }
935 if err := t.writeHeaderLocked(s); err != nil {
936 s.hdrMu.Unlock()
937 return err
938 }
939 s.hdrMu.Unlock()
940 return nil
941}
942
943func (t *http2Server) setResetPingStrikes() {
944 atomic.StoreUint32(&t.resetPingStrikes, 1)
945}
946
947func (t *http2Server) writeHeaderLocked(s *Stream) error {
948 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
949 // first and create a slice of that exact size.
950 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
951 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
952 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
953 if s.sendCompress != "" {
954 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
955 }
956 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
957 success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
958 streamID: s.id,
959 hf: headerFields,
960 endStream: false,
961 onWrite: t.setResetPingStrikes,
962 })
963 if !success {
964 if err != nil {
965 return err
966 }
967 t.closeStream(s, true, http2.ErrCodeInternal, false)
968 return ErrHeaderListSizeLimitViolation
969 }
970 if t.stats != nil {
971 // Note: Headers are compressed with hpack after this call returns.
972 // No WireLength field is set here.
973 outHeader := &stats.OutHeader{
974 Header: s.header.Copy(),
975 Compression: s.sendCompress,
976 }
977 t.stats.HandleRPC(s.Context(), outHeader)
978 }
979 return nil
980}
981
982// WriteStatus sends stream status to the client and terminates the stream.
983// There is no further I/O operations being able to perform on this stream.
984// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
985// OK is adopted.
986func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
987 if s.getState() == streamDone {
988 return nil
989 }
990 s.hdrMu.Lock()
991 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
992 // first and create a slice of that exact size.
993 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
994 if !s.updateHeaderSent() { // No headers have been sent.
995 if len(s.header) > 0 { // Send a separate header frame.
996 if err := t.writeHeaderLocked(s); err != nil {
997 s.hdrMu.Unlock()
998 return err
999 }
1000 } else { // Send a trailer only response.
1001 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
1002 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
1003 }
1004 }
1005 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
1006 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
1007
1008 if p := st.Proto(); p != nil && len(p.Details) > 0 {
1009 stBytes, err := proto.Marshal(p)
1010 if err != nil {
1011 // TODO: return error instead, when callers are able to handle it.
1012 logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
1013 } else {
1014 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
1015 }
1016 }
1017
1018 // Attach the trailer metadata.
1019 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
1020 trailingHeader := &headerFrame{
1021 streamID: s.id,
1022 hf: headerFields,
1023 endStream: true,
1024 onWrite: t.setResetPingStrikes,
1025 }
1026 s.hdrMu.Unlock()
1027 success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
1028 if !success {
1029 if err != nil {
1030 return err
1031 }
1032 t.closeStream(s, true, http2.ErrCodeInternal, false)
1033 return ErrHeaderListSizeLimitViolation
1034 }
1035 // Send a RST_STREAM after the trailers if the client has not already half-closed.
1036 rst := s.getState() == streamActive
1037 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
1038 if t.stats != nil {
1039 // Note: The trailer fields are compressed with hpack after this call returns.
1040 // No WireLength field is set here.
1041 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
1042 Trailer: s.trailer.Copy(),
1043 })
1044 }
1045 return nil
1046}
1047
1048// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
1049// is returns if it fails (e.g., framing error, transport error).
1050func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
1051 if !s.isHeaderSent() { // Headers haven't been written yet.
1052 if err := t.WriteHeader(s, nil); err != nil {
1053 if _, ok := err.(ConnectionError); ok {
1054 return err
1055 }
1056 // TODO(mmukhi, dfawley): Make sure this is the right code to return.
1057 return status.Errorf(codes.Internal, "transport: %v", err)
1058 }
1059 } else {
1060 // Writing headers checks for this condition.
1061 if s.getState() == streamDone {
1062 // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
1063 s.cancel()
1064 select {
1065 case <-t.done:
1066 return ErrConnClosing
1067 default:
1068 }
1069 return ContextErr(s.ctx.Err())
1070 }
1071 }
1072 df := &dataFrame{
1073 streamID: s.id,
1074 h: hdr,
1075 d: data,
1076 onEachWrite: t.setResetPingStrikes,
1077 }
1078 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
1079 select {
1080 case <-t.done:
1081 return ErrConnClosing
1082 default:
1083 }
1084 return ContextErr(s.ctx.Err())
1085 }
1086 return t.controlBuf.put(df)
1087}
1088
1089// keepalive running in a separate goroutine does the following:
1090// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
1091// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
1092// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
1093// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
1094// after an additional duration of keepalive.Timeout.
1095func (t *http2Server) keepalive() {
1096 p := &ping{}
1097 // True iff a ping has been sent, and no data has been received since then.
1098 outstandingPing := false
1099 // Amount of time remaining before which we should receive an ACK for the
1100 // last sent ping.
1101 kpTimeoutLeft := time.Duration(0)
1102 // Records the last value of t.lastRead before we go block on the timer.
1103 // This is required to check for read activity since then.
1104 prevNano := time.Now().UnixNano()
1105 // Initialize the different timers to their default values.
1106 idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
1107 ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
1108 kpTimer := time.NewTimer(t.kp.Time)
1109 defer func() {
1110 // We need to drain the underlying channel in these timers after a call
1111 // to Stop(), only if we are interested in resetting them. Clearly we
1112 // are not interested in resetting them here.
1113 idleTimer.Stop()
1114 ageTimer.Stop()
1115 kpTimer.Stop()
1116 }()
1117
1118 for {
1119 select {
1120 case <-idleTimer.C:
1121 t.mu.Lock()
1122 idle := t.idle
1123 if idle.IsZero() { // The connection is non-idle.
1124 t.mu.Unlock()
1125 idleTimer.Reset(t.kp.MaxConnectionIdle)
1126 continue
1127 }
1128 val := t.kp.MaxConnectionIdle - time.Since(idle)
1129 t.mu.Unlock()
1130 if val <= 0 {
1131 // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
1132 // Gracefully close the connection.
1133 t.Drain()
1134 return
1135 }
1136 idleTimer.Reset(val)
1137 case <-ageTimer.C:
1138 t.Drain()
1139 ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
1140 select {
1141 case <-ageTimer.C:
1142 // Close the connection after grace period.
1143 if logger.V(logLevel) {
1144 logger.Infof("transport: closing server transport due to maximum connection age.")
1145 }
1146 t.Close()
1147 case <-t.done:
1148 }
1149 return
1150 case <-kpTimer.C:
1151 lastRead := atomic.LoadInt64(&t.lastRead)
1152 if lastRead > prevNano {
1153 // There has been read activity since the last time we were
1154 // here. Setup the timer to fire at kp.Time seconds from
1155 // lastRead time and continue.
1156 outstandingPing = false
1157 kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
1158 prevNano = lastRead
1159 continue
1160 }
1161 if outstandingPing && kpTimeoutLeft <= 0 {
1162 if logger.V(logLevel) {
1163 logger.Infof("transport: closing server transport due to idleness.")
1164 }
1165 t.Close()
1166 return
1167 }
1168 if !outstandingPing {
1169 if channelz.IsOn() {
1170 atomic.AddInt64(&t.czData.kpCount, 1)
1171 }
1172 t.controlBuf.put(p)
1173 kpTimeoutLeft = t.kp.Timeout
1174 outstandingPing = true
1175 }
1176 // The amount of time to sleep here is the minimum of kp.Time and
1177 // timeoutLeft. This will ensure that we wait only for kp.Time
1178 // before sending out the next ping (for cases where the ping is
1179 // acked).
1180 sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
1181 kpTimeoutLeft -= sleepDuration
1182 kpTimer.Reset(sleepDuration)
1183 case <-t.done:
1184 return
1185 }
1186 }
1187}
1188
1189// Close starts shutting down the http2Server transport.
1190// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1191// could cause some resource issue. Revisit this later.
1192func (t *http2Server) Close() {
1193 t.mu.Lock()
1194 if t.state == closing {
1195 t.mu.Unlock()
1196 return
1197 }
1198 t.state = closing
1199 streams := t.activeStreams
1200 t.activeStreams = nil
1201 t.mu.Unlock()
1202 t.controlBuf.finish()
1203 close(t.done)
1204 if err := t.conn.Close(); err != nil && logger.V(logLevel) {
1205 logger.Infof("transport: error closing conn during Close: %v", err)
1206 }
1207 if channelz.IsOn() {
1208 channelz.RemoveEntry(t.channelzID)
1209 }
1210 // Cancel all active streams.
1211 for _, s := range streams {
1212 s.cancel()
1213 }
1214 if t.stats != nil {
1215 connEnd := &stats.ConnEnd{}
1216 t.stats.HandleConn(t.ctx, connEnd)
1217 }
1218}
1219
1220// deleteStream deletes the stream s from transport's active streams.
1221func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1222 // In case stream sending and receiving are invoked in separate
1223 // goroutines (e.g., bi-directional streaming), cancel needs to be
1224 // called to interrupt the potential blocking on other goroutines.
1225 s.cancel()
1226
1227 t.mu.Lock()
1228 if _, ok := t.activeStreams[s.id]; ok {
1229 delete(t.activeStreams, s.id)
1230 if len(t.activeStreams) == 0 {
1231 t.idle = time.Now()
1232 }
1233 }
1234 t.mu.Unlock()
1235
1236 if channelz.IsOn() {
1237 if eosReceived {
1238 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1239 } else {
1240 atomic.AddInt64(&t.czData.streamsFailed, 1)
1241 }
1242 }
1243}
1244
1245// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1246func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1247 oldState := s.swapState(streamDone)
1248 if oldState == streamDone {
1249 // If the stream was already done, return.
1250 return
1251 }
1252
1253 hdr.cleanup = &cleanupStream{
1254 streamID: s.id,
1255 rst: rst,
1256 rstCode: rstCode,
1257 onWrite: func() {
1258 t.deleteStream(s, eosReceived)
1259 },
1260 }
1261 t.controlBuf.put(hdr)
1262}
1263
1264// closeStream clears the footprint of a stream when the stream is not needed any more.
1265func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1266 s.swapState(streamDone)
1267 t.deleteStream(s, eosReceived)
1268
1269 t.controlBuf.put(&cleanupStream{
1270 streamID: s.id,
1271 rst: rst,
1272 rstCode: rstCode,
1273 onWrite: func() {},
1274 })
1275}
1276
1277func (t *http2Server) RemoteAddr() net.Addr {
1278 return t.remoteAddr
1279}
1280
1281func (t *http2Server) Drain() {
1282 t.mu.Lock()
1283 defer t.mu.Unlock()
1284 if t.drainChan != nil {
1285 return
1286 }
1287 t.drainChan = make(chan struct{})
1288 t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
1289}
1290
1291var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1292
1293// Handles outgoing GoAway and returns true if loopy needs to put itself
1294// in draining mode.
1295func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1296 t.mu.Lock()
1297 if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1298 t.mu.Unlock()
1299 // The transport is closing.
1300 return false, ErrConnClosing
1301 }
1302 sid := t.maxStreamID
1303 if !g.headsUp {
1304 // Stop accepting more streams now.
1305 t.state = draining
1306 if len(t.activeStreams) == 0 {
1307 g.closeConn = true
1308 }
1309 t.mu.Unlock()
1310 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1311 return false, err
1312 }
1313 if g.closeConn {
1314 // Abruptly close the connection following the GoAway (via
1315 // loopywriter). But flush out what's inside the buffer first.
1316 t.framer.writer.Flush()
1317 return false, fmt.Errorf("transport: Connection closing")
1318 }
1319 return true, nil
1320 }
1321 t.mu.Unlock()
1322 // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1323 // Follow that with a ping and wait for the ack to come back or a timer
1324 // to expire. During this time accept new streams since they might have
1325 // originated before the GoAway reaches the client.
1326 // After getting the ack or timer expiration send out another GoAway this
1327 // time with an ID of the max stream server intends to process.
1328 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1329 return false, err
1330 }
1331 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1332 return false, err
1333 }
1334 go func() {
1335 timer := time.NewTimer(time.Minute)
1336 defer timer.Stop()
1337 select {
1338 case <-t.drainChan:
1339 case <-timer.C:
1340 case <-t.done:
1341 return
1342 }
1343 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1344 }()
1345 return false, nil
1346}
1347
1348func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1349 s := channelz.SocketInternalMetric{
1350 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1351 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1352 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1353 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1354 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1355 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1356 LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1357 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1358 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1359 LocalFlowControlWindow: int64(t.fc.getSize()),
1360 SocketOptions: channelz.GetSocketOption(t.conn),
1361 LocalAddr: t.localAddr,
1362 RemoteAddr: t.remoteAddr,
1363 // RemoteName :
1364 }
1365 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1366 s.Security = au.GetSecurityValue()
1367 }
1368 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1369 return &s
1370}
1371
1372func (t *http2Server) IncrMsgSent() {
1373 atomic.AddInt64(&t.czData.msgSent, 1)
1374 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1375}
1376
1377func (t *http2Server) IncrMsgRecv() {
1378 atomic.AddInt64(&t.czData.msgRecv, 1)
1379 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1380}
1381
1382func (t *http2Server) getOutFlowWindow() int64 {
1383 resp := make(chan uint32, 1)
1384 timer := time.NewTimer(time.Second)
1385 defer timer.Stop()
1386 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1387 select {
1388 case sz := <-resp:
1389 return int64(sz)
1390 case <-t.done:
1391 return -1
1392 case <-timer.C:
1393 return -2
1394 }
1395}
1396
1397func getJitter(v time.Duration) time.Duration {
1398 if v == infinity {
1399 return 0
1400 }
1401 // Generate a jitter between +/- 10% of the value.
1402 r := int64(v / 10)
1403 j := grpcrand.Int63n(2*r) - r
1404 return time.Duration(j)
1405}
1406
1407type connectionKey struct{}
1408
1409// GetConnection gets the connection from the context.
1410func GetConnection(ctx context.Context) net.Conn {
1411 conn, _ := ctx.Value(connectionKey{}).(net.Conn)
1412 return conn
1413}
1414
1415// SetConnection adds the connection to the context to be able to get
1416// information about the destination ip and port for an incoming RPC. This also
1417// allows any unary or streaming interceptors to see the connection.
1418func setConnection(ctx context.Context, conn net.Conn) context.Context {
1419 return context.WithValue(ctx, connectionKey{}, conn)
1420}