blob: 2c6eaf0e59cfbb8c632b4442ef13874d758c63f5 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 "net"
29 "net/http"
30 "strconv"
31 "sync"
32 "sync/atomic"
33 "time"
34
35 "github.com/golang/protobuf/proto"
36 "golang.org/x/net/http2"
37 "golang.org/x/net/http2/hpack"
38 "google.golang.org/grpc/internal/grpcutil"
39
40 "google.golang.org/grpc/codes"
41 "google.golang.org/grpc/credentials"
42 "google.golang.org/grpc/internal/channelz"
43 "google.golang.org/grpc/internal/grpcrand"
44 "google.golang.org/grpc/keepalive"
45 "google.golang.org/grpc/metadata"
46 "google.golang.org/grpc/peer"
47 "google.golang.org/grpc/stats"
48 "google.golang.org/grpc/status"
49 "google.golang.org/grpc/tap"
50)
51
52var (
53 // ErrIllegalHeaderWrite indicates that setting header is illegal because of
54 // the stream's state.
55 ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
56 // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
57 // than the limit set by peer.
58 ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
59)
60
61// serverConnectionCounter counts the number of connections a server has seen
62// (equal to the number of http2Servers created). Must be accessed atomically.
63var serverConnectionCounter uint64
64
65// http2Server implements the ServerTransport interface with HTTP2.
66type http2Server struct {
67 lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
68 ctx context.Context
69 done chan struct{}
70 conn net.Conn
71 loopy *loopyWriter
72 readerDone chan struct{} // sync point to enable testing.
73 writerDone chan struct{} // sync point to enable testing.
74 remoteAddr net.Addr
75 localAddr net.Addr
khenaidoo5fc5cea2021-08-11 17:39:16 -040076 authInfo credentials.AuthInfo // auth info about the connection
77 inTapHandle tap.ServerInHandle
78 framer *framer
79 // The max number of concurrent streams.
80 maxStreams uint32
81 // controlBuf delivers all the control related tasks (e.g., window
82 // updates, reset streams, and various settings) to the controller.
83 controlBuf *controlBuffer
84 fc *trInFlow
85 stats stats.Handler
86 // Keepalive and max-age parameters for the server.
87 kp keepalive.ServerParameters
88 // Keepalive enforcement policy.
89 kep keepalive.EnforcementPolicy
90 // The time instance last ping was received.
91 lastPingAt time.Time
92 // Number of times the client has violated keepalive ping policy so far.
93 pingStrikes uint8
94 // Flag to signify that number of ping strikes should be reset to 0.
95 // This is set whenever data or header frames are sent.
96 // 1 means yes.
97 resetPingStrikes uint32 // Accessed atomically.
98 initialWindowSize int32
99 bdpEst *bdpEstimator
100 maxSendHeaderListSize *uint32
101
102 mu sync.Mutex // guard the following
103
104 // drainChan is initialized when Drain() is called the first time.
105 // After which the server writes out the first GoAway(with ID 2^31-1) frame.
106 // Then an independent goroutine will be launched to later send the second GoAway.
107 // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
108 // Thus call to Drain() will be a no-op if drainChan is already initialized since draining is
109 // already underway.
110 drainChan chan struct{}
111 state transportState
112 activeStreams map[uint32]*Stream
113 // idle is the time instant when the connection went idle.
114 // This is either the beginning of the connection or when the number of
115 // RPCs go down to 0.
116 // When the connection is busy, this value is set to 0.
117 idle time.Time
118
119 // Fields below are for channelz metric collection.
120 channelzID int64 // channelz unique identification number
121 czData *channelzData
122 bufferPool *bufferPool
123
124 connectionID uint64
khenaidoo257f3192021-12-15 16:46:37 -0500125
126 // maxStreamMu guards the maximum stream ID
127 // This lock may not be taken if mu is already held.
128 maxStreamMu sync.Mutex
129 maxStreamID uint32 // max stream ID ever seen
khenaidoo5fc5cea2021-08-11 17:39:16 -0400130}
131
132// NewServerTransport creates a http2 transport with conn and configuration
133// options from config.
134//
135// It returns a non-nil transport and a nil error on success. On failure, it
khenaidoo5cb0d402021-12-08 14:09:16 -0500136// returns a nil transport and a non-nil error. For a special case where the
khenaidoo5fc5cea2021-08-11 17:39:16 -0400137// underlying conn gets closed before the client preface could be read, it
138// returns a nil transport and a nil error.
139func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
140 var authInfo credentials.AuthInfo
141 rawConn := conn
142 if config.Credentials != nil {
143 var err error
144 conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
145 if err != nil {
146 // ErrConnDispatched means that the connection was dispatched away
147 // from gRPC; those connections should be left open. io.EOF means
148 // the connection was closed before handshaking completed, which can
149 // happen naturally from probers. Return these errors directly.
150 if err == credentials.ErrConnDispatched || err == io.EOF {
151 return nil, err
152 }
153 return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
154 }
155 }
156 writeBufSize := config.WriteBufferSize
157 readBufSize := config.ReadBufferSize
158 maxHeaderListSize := defaultServerMaxHeaderListSize
159 if config.MaxHeaderListSize != nil {
160 maxHeaderListSize = *config.MaxHeaderListSize
161 }
162 framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
163 // Send initial settings as connection preface to client.
164 isettings := []http2.Setting{{
165 ID: http2.SettingMaxFrameSize,
166 Val: http2MaxFrameLen,
167 }}
168 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
169 // permitted in the HTTP2 spec.
170 maxStreams := config.MaxStreams
171 if maxStreams == 0 {
172 maxStreams = math.MaxUint32
173 } else {
174 isettings = append(isettings, http2.Setting{
175 ID: http2.SettingMaxConcurrentStreams,
176 Val: maxStreams,
177 })
178 }
179 dynamicWindow := true
180 iwz := int32(initialWindowSize)
181 if config.InitialWindowSize >= defaultWindowSize {
182 iwz = config.InitialWindowSize
183 dynamicWindow = false
184 }
185 icwz := int32(initialWindowSize)
186 if config.InitialConnWindowSize >= defaultWindowSize {
187 icwz = config.InitialConnWindowSize
188 dynamicWindow = false
189 }
190 if iwz != defaultWindowSize {
191 isettings = append(isettings, http2.Setting{
192 ID: http2.SettingInitialWindowSize,
193 Val: uint32(iwz)})
194 }
195 if config.MaxHeaderListSize != nil {
196 isettings = append(isettings, http2.Setting{
197 ID: http2.SettingMaxHeaderListSize,
198 Val: *config.MaxHeaderListSize,
199 })
200 }
201 if config.HeaderTableSize != nil {
202 isettings = append(isettings, http2.Setting{
203 ID: http2.SettingHeaderTableSize,
204 Val: *config.HeaderTableSize,
205 })
206 }
207 if err := framer.fr.WriteSettings(isettings...); err != nil {
208 return nil, connectionErrorf(false, err, "transport: %v", err)
209 }
210 // Adjust the connection flow control window if needed.
211 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
212 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
213 return nil, connectionErrorf(false, err, "transport: %v", err)
214 }
215 }
216 kp := config.KeepaliveParams
217 if kp.MaxConnectionIdle == 0 {
218 kp.MaxConnectionIdle = defaultMaxConnectionIdle
219 }
220 if kp.MaxConnectionAge == 0 {
221 kp.MaxConnectionAge = defaultMaxConnectionAge
222 }
223 // Add a jitter to MaxConnectionAge.
224 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
225 if kp.MaxConnectionAgeGrace == 0 {
226 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
227 }
228 if kp.Time == 0 {
229 kp.Time = defaultServerKeepaliveTime
230 }
231 if kp.Timeout == 0 {
232 kp.Timeout = defaultServerKeepaliveTimeout
233 }
234 kep := config.KeepalivePolicy
235 if kep.MinTime == 0 {
236 kep.MinTime = defaultKeepalivePolicyMinTime
237 }
238
239 done := make(chan struct{})
240 t := &http2Server{
241 ctx: setConnection(context.Background(), rawConn),
242 done: done,
243 conn: conn,
244 remoteAddr: conn.RemoteAddr(),
245 localAddr: conn.LocalAddr(),
246 authInfo: authInfo,
247 framer: framer,
248 readerDone: make(chan struct{}),
249 writerDone: make(chan struct{}),
250 maxStreams: maxStreams,
251 inTapHandle: config.InTapHandle,
252 fc: &trInFlow{limit: uint32(icwz)},
253 state: reachable,
254 activeStreams: make(map[uint32]*Stream),
255 stats: config.StatsHandler,
256 kp: kp,
257 idle: time.Now(),
258 kep: kep,
259 initialWindowSize: iwz,
260 czData: new(channelzData),
261 bufferPool: newBufferPool(),
262 }
263 t.controlBuf = newControlBuffer(t.done)
264 if dynamicWindow {
265 t.bdpEst = &bdpEstimator{
266 bdp: initialWindowSize,
267 updateFlowControl: t.updateFlowControl,
268 }
269 }
270 if t.stats != nil {
271 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
272 RemoteAddr: t.remoteAddr,
273 LocalAddr: t.localAddr,
274 })
275 connBegin := &stats.ConnBegin{}
276 t.stats.HandleConn(t.ctx, connBegin)
277 }
278 if channelz.IsOn() {
279 t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
280 }
281
282 t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
283
284 t.framer.writer.Flush()
285
286 defer func() {
287 if err != nil {
288 t.Close()
289 }
290 }()
291
292 // Check the validity of client preface.
293 preface := make([]byte, len(clientPreface))
294 if _, err := io.ReadFull(t.conn, preface); err != nil {
295 // In deployments where a gRPC server runs behind a cloud load balancer
296 // which performs regular TCP level health checks, the connection is
khenaidoo5cb0d402021-12-08 14:09:16 -0500297 // closed immediately by the latter. Returning io.EOF here allows the
298 // grpc server implementation to recognize this scenario and suppress
299 // logging to reduce spam.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400300 if err == io.EOF {
khenaidoo5cb0d402021-12-08 14:09:16 -0500301 return nil, io.EOF
khenaidoo5fc5cea2021-08-11 17:39:16 -0400302 }
303 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
304 }
305 if !bytes.Equal(preface, clientPreface) {
306 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
307 }
308
309 frame, err := t.framer.fr.ReadFrame()
310 if err == io.EOF || err == io.ErrUnexpectedEOF {
311 return nil, err
312 }
313 if err != nil {
314 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
315 }
316 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
317 sf, ok := frame.(*http2.SettingsFrame)
318 if !ok {
319 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
320 }
321 t.handleSettings(sf)
322
323 go func() {
324 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
325 t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
326 if err := t.loopy.run(); err != nil {
327 if logger.V(logLevel) {
328 logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
329 }
330 }
331 t.conn.Close()
332 t.controlBuf.finish()
333 close(t.writerDone)
334 }()
335 go t.keepalive()
336 return t, nil
337}
338
339// operateHeader takes action on the decoded headers.
340func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
khenaidoo257f3192021-12-15 16:46:37 -0500341 // Acquire max stream ID lock for entire duration
342 t.maxStreamMu.Lock()
343 defer t.maxStreamMu.Unlock()
344
khenaidoo5fc5cea2021-08-11 17:39:16 -0400345 streamID := frame.Header().StreamID
346
347 // frame.Truncated is set to true when framer detects that the current header
348 // list size hits MaxHeaderListSize limit.
349 if frame.Truncated {
350 t.controlBuf.put(&cleanupStream{
351 streamID: streamID,
352 rst: true,
353 rstCode: http2.ErrCodeFrameSize,
354 onWrite: func() {},
355 })
356 return false
357 }
358
khenaidoo257f3192021-12-15 16:46:37 -0500359 if streamID%2 != 1 || streamID <= t.maxStreamID {
360 // illegal gRPC stream id.
361 if logger.V(logLevel) {
362 logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
363 }
364 return true
365 }
366 t.maxStreamID = streamID
367
khenaidoo5fc5cea2021-08-11 17:39:16 -0400368 buf := newRecvBuffer()
369 s := &Stream{
370 id: streamID,
371 st: t,
372 buf: buf,
373 fc: &inFlow{limit: uint32(t.initialWindowSize)},
374 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400375 var (
376 // If a gRPC Response-Headers has already been received, then it means
377 // that the peer is speaking gRPC and we are in gRPC mode.
378 isGRPC = false
379 mdata = make(map[string][]string)
380 httpMethod string
381 // headerError is set if an error is encountered while parsing the headers
382 headerError bool
383
384 timeoutSet bool
385 timeout time.Duration
386 )
387
388 for _, hf := range frame.Fields {
389 switch hf.Name {
390 case "content-type":
391 contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
392 if !validContentType {
393 break
394 }
395 mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
396 s.contentSubtype = contentSubtype
397 isGRPC = true
398 case "grpc-encoding":
399 s.recvCompress = hf.Value
400 case ":method":
401 httpMethod = hf.Value
402 case ":path":
403 s.method = hf.Value
404 case "grpc-timeout":
405 timeoutSet = true
406 var err error
407 if timeout, err = decodeTimeout(hf.Value); err != nil {
408 headerError = true
409 }
khenaidoo5cb0d402021-12-08 14:09:16 -0500410 // "Transports must consider requests containing the Connection header
411 // as malformed." - A41
412 case "connection":
413 if logger.V(logLevel) {
414 logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
415 }
416 headerError = true
khenaidoo5fc5cea2021-08-11 17:39:16 -0400417 default:
418 if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
419 break
420 }
421 v, err := decodeMetadataHeader(hf.Name, hf.Value)
422 if err != nil {
423 headerError = true
424 logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
425 break
426 }
427 mdata[hf.Name] = append(mdata[hf.Name], v)
428 }
429 }
430
khenaidoo5cb0d402021-12-08 14:09:16 -0500431 // "If multiple Host headers or multiple :authority headers are present, the
432 // request must be rejected with an HTTP status code 400 as required by Host
433 // validation in RFC 7230 §5.4, gRPC status code INTERNAL, or RST_STREAM
434 // with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
435 // error, this takes precedence over a client not speaking gRPC.
436 if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
437 errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
438 if logger.V(logLevel) {
439 logger.Errorf("transport: %v", errMsg)
440 }
441 t.controlBuf.put(&earlyAbortStream{
442 httpStatus: 400,
443 streamID: streamID,
444 contentSubtype: s.contentSubtype,
445 status: status.New(codes.Internal, errMsg),
446 })
447 return false
448 }
449
khenaidoo5fc5cea2021-08-11 17:39:16 -0400450 if !isGRPC || headerError {
451 t.controlBuf.put(&cleanupStream{
452 streamID: streamID,
453 rst: true,
454 rstCode: http2.ErrCodeProtocol,
455 onWrite: func() {},
456 })
457 return false
458 }
459
khenaidoo5cb0d402021-12-08 14:09:16 -0500460 // "If :authority is missing, Host must be renamed to :authority." - A41
461 if len(mdata[":authority"]) == 0 {
462 // No-op if host isn't present, no eventual :authority header is a valid
463 // RPC.
464 if host, ok := mdata["host"]; ok {
465 mdata[":authority"] = host
466 delete(mdata, "host")
467 }
468 } else {
469 // "If :authority is present, Host must be discarded" - A41
470 delete(mdata, "host")
471 }
472
khenaidoo5fc5cea2021-08-11 17:39:16 -0400473 if frame.StreamEnded() {
474 // s is just created by the caller. No lock needed.
475 s.state = streamReadDone
476 }
477 if timeoutSet {
478 s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
479 } else {
480 s.ctx, s.cancel = context.WithCancel(t.ctx)
481 }
482 pr := &peer.Peer{
483 Addr: t.remoteAddr,
484 }
485 // Attach Auth info if there is any.
486 if t.authInfo != nil {
487 pr.AuthInfo = t.authInfo
488 }
489 s.ctx = peer.NewContext(s.ctx, pr)
490 // Attach the received metadata to the context.
491 if len(mdata) > 0 {
492 s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
493 if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
494 s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
495 }
496 if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
497 s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
498 }
499 }
500 t.mu.Lock()
501 if t.state != reachable {
502 t.mu.Unlock()
503 s.cancel()
504 return false
505 }
506 if uint32(len(t.activeStreams)) >= t.maxStreams {
507 t.mu.Unlock()
508 t.controlBuf.put(&cleanupStream{
509 streamID: streamID,
510 rst: true,
511 rstCode: http2.ErrCodeRefusedStream,
512 onWrite: func() {},
513 })
514 s.cancel()
515 return false
516 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400517 if httpMethod != http.MethodPost {
518 t.mu.Unlock()
519 if logger.V(logLevel) {
520 logger.Infof("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
521 }
522 t.controlBuf.put(&cleanupStream{
523 streamID: streamID,
524 rst: true,
525 rstCode: http2.ErrCodeProtocol,
526 onWrite: func() {},
527 })
528 s.cancel()
529 return false
530 }
531 if t.inTapHandle != nil {
532 var err error
533 if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
534 t.mu.Unlock()
535 if logger.V(logLevel) {
536 logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
537 }
538 stat, ok := status.FromError(err)
539 if !ok {
540 stat = status.New(codes.PermissionDenied, err.Error())
541 }
542 t.controlBuf.put(&earlyAbortStream{
khenaidoo5cb0d402021-12-08 14:09:16 -0500543 httpStatus: 200,
khenaidoo5fc5cea2021-08-11 17:39:16 -0400544 streamID: s.id,
545 contentSubtype: s.contentSubtype,
546 status: stat,
547 })
548 return false
549 }
550 }
551 t.activeStreams[streamID] = s
552 if len(t.activeStreams) == 1 {
553 t.idle = time.Time{}
554 }
555 t.mu.Unlock()
556 if channelz.IsOn() {
557 atomic.AddInt64(&t.czData.streamsStarted, 1)
558 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
559 }
560 s.requestRead = func(n int) {
561 t.adjustWindow(s, uint32(n))
562 }
563 s.ctx = traceCtx(s.ctx, s.method)
564 if t.stats != nil {
565 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
566 inHeader := &stats.InHeader{
567 FullMethod: s.method,
568 RemoteAddr: t.remoteAddr,
569 LocalAddr: t.localAddr,
570 Compression: s.recvCompress,
571 WireLength: int(frame.Header().Length),
572 Header: metadata.MD(mdata).Copy(),
573 }
574 t.stats.HandleRPC(s.ctx, inHeader)
575 }
576 s.ctxDone = s.ctx.Done()
577 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
578 s.trReader = &transportReader{
579 reader: &recvBufferReader{
580 ctx: s.ctx,
581 ctxDone: s.ctxDone,
582 recv: s.buf,
583 freeBuffer: t.bufferPool.put,
584 },
585 windowHandler: func(n int) {
586 t.updateWindow(s, uint32(n))
587 },
588 }
589 // Register the stream with loopy.
590 t.controlBuf.put(&registerStream{
591 streamID: s.id,
592 wq: s.wq,
593 })
594 handle(s)
595 return false
596}
597
598// HandleStreams receives incoming streams using the given handler. This is
599// typically run in a separate goroutine.
600// traceCtx attaches trace to ctx and returns the new context.
601func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
602 defer close(t.readerDone)
603 for {
604 t.controlBuf.throttle()
605 frame, err := t.framer.fr.ReadFrame()
606 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
607 if err != nil {
608 if se, ok := err.(http2.StreamError); ok {
609 if logger.V(logLevel) {
610 logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
611 }
612 t.mu.Lock()
613 s := t.activeStreams[se.StreamID]
614 t.mu.Unlock()
615 if s != nil {
616 t.closeStream(s, true, se.Code, false)
617 } else {
618 t.controlBuf.put(&cleanupStream{
619 streamID: se.StreamID,
620 rst: true,
621 rstCode: se.Code,
622 onWrite: func() {},
623 })
624 }
625 continue
626 }
627 if err == io.EOF || err == io.ErrUnexpectedEOF {
628 t.Close()
629 return
630 }
631 if logger.V(logLevel) {
632 logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
633 }
634 t.Close()
635 return
636 }
637 switch frame := frame.(type) {
638 case *http2.MetaHeadersFrame:
639 if t.operateHeaders(frame, handle, traceCtx) {
640 t.Close()
641 break
642 }
643 case *http2.DataFrame:
644 t.handleData(frame)
645 case *http2.RSTStreamFrame:
646 t.handleRSTStream(frame)
647 case *http2.SettingsFrame:
648 t.handleSettings(frame)
649 case *http2.PingFrame:
650 t.handlePing(frame)
651 case *http2.WindowUpdateFrame:
652 t.handleWindowUpdate(frame)
653 case *http2.GoAwayFrame:
654 // TODO: Handle GoAway from the client appropriately.
655 default:
656 if logger.V(logLevel) {
657 logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
658 }
659 }
660 }
661}
662
663func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
664 t.mu.Lock()
665 defer t.mu.Unlock()
666 if t.activeStreams == nil {
667 // The transport is closing.
668 return nil, false
669 }
670 s, ok := t.activeStreams[f.Header().StreamID]
671 if !ok {
672 // The stream is already done.
673 return nil, false
674 }
675 return s, true
676}
677
678// adjustWindow sends out extra window update over the initial window size
679// of stream if the application is requesting data larger in size than
680// the window.
681func (t *http2Server) adjustWindow(s *Stream, n uint32) {
682 if w := s.fc.maybeAdjust(n); w > 0 {
683 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
684 }
685
686}
687
688// updateWindow adjusts the inbound quota for the stream and the transport.
689// Window updates will deliver to the controller for sending when
690// the cumulative quota exceeds the corresponding threshold.
691func (t *http2Server) updateWindow(s *Stream, n uint32) {
692 if w := s.fc.onRead(n); w > 0 {
693 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
694 increment: w,
695 })
696 }
697}
698
699// updateFlowControl updates the incoming flow control windows
700// for the transport and the stream based on the current bdp
701// estimation.
702func (t *http2Server) updateFlowControl(n uint32) {
703 t.mu.Lock()
704 for _, s := range t.activeStreams {
705 s.fc.newLimit(n)
706 }
707 t.initialWindowSize = int32(n)
708 t.mu.Unlock()
709 t.controlBuf.put(&outgoingWindowUpdate{
710 streamID: 0,
711 increment: t.fc.newLimit(n),
712 })
713 t.controlBuf.put(&outgoingSettings{
714 ss: []http2.Setting{
715 {
716 ID: http2.SettingInitialWindowSize,
717 Val: n,
718 },
719 },
720 })
721
722}
723
724func (t *http2Server) handleData(f *http2.DataFrame) {
725 size := f.Header().Length
726 var sendBDPPing bool
727 if t.bdpEst != nil {
728 sendBDPPing = t.bdpEst.add(size)
729 }
730 // Decouple connection's flow control from application's read.
731 // An update on connection's flow control should not depend on
732 // whether user application has read the data or not. Such a
733 // restriction is already imposed on the stream's flow control,
734 // and therefore the sender will be blocked anyways.
735 // Decoupling the connection flow control will prevent other
736 // active(fast) streams from starving in presence of slow or
737 // inactive streams.
738 if w := t.fc.onData(size); w > 0 {
739 t.controlBuf.put(&outgoingWindowUpdate{
740 streamID: 0,
741 increment: w,
742 })
743 }
744 if sendBDPPing {
745 // Avoid excessive ping detection (e.g. in an L7 proxy)
746 // by sending a window update prior to the BDP ping.
747 if w := t.fc.reset(); w > 0 {
748 t.controlBuf.put(&outgoingWindowUpdate{
749 streamID: 0,
750 increment: w,
751 })
752 }
753 t.controlBuf.put(bdpPing)
754 }
755 // Select the right stream to dispatch.
756 s, ok := t.getStream(f)
757 if !ok {
758 return
759 }
760 if s.getState() == streamReadDone {
761 t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
762 return
763 }
764 if size > 0 {
765 if err := s.fc.onData(size); err != nil {
766 t.closeStream(s, true, http2.ErrCodeFlowControl, false)
767 return
768 }
769 if f.Header().Flags.Has(http2.FlagDataPadded) {
770 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
771 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
772 }
773 }
774 // TODO(bradfitz, zhaoq): A copy is required here because there is no
775 // guarantee f.Data() is consumed before the arrival of next frame.
776 // Can this copy be eliminated?
777 if len(f.Data()) > 0 {
778 buffer := t.bufferPool.get()
779 buffer.Reset()
780 buffer.Write(f.Data())
781 s.write(recvMsg{buffer: buffer})
782 }
783 }
khenaidoo5cb0d402021-12-08 14:09:16 -0500784 if f.StreamEnded() {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400785 // Received the end of stream from the client.
786 s.compareAndSwapState(streamActive, streamReadDone)
787 s.write(recvMsg{err: io.EOF})
788 }
789}
790
791func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
792 // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
793 if s, ok := t.getStream(f); ok {
794 t.closeStream(s, false, 0, false)
795 return
796 }
797 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
798 t.controlBuf.put(&cleanupStream{
799 streamID: f.Header().StreamID,
800 rst: false,
801 rstCode: 0,
802 onWrite: func() {},
803 })
804}
805
806func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
807 if f.IsAck() {
808 return
809 }
810 var ss []http2.Setting
811 var updateFuncs []func()
812 f.ForeachSetting(func(s http2.Setting) error {
813 switch s.ID {
814 case http2.SettingMaxHeaderListSize:
815 updateFuncs = append(updateFuncs, func() {
816 t.maxSendHeaderListSize = new(uint32)
817 *t.maxSendHeaderListSize = s.Val
818 })
819 default:
820 ss = append(ss, s)
821 }
822 return nil
823 })
824 t.controlBuf.executeAndPut(func(interface{}) bool {
825 for _, f := range updateFuncs {
826 f()
827 }
828 return true
829 }, &incomingSettings{
830 ss: ss,
831 })
832}
833
834const (
835 maxPingStrikes = 2
836 defaultPingTimeout = 2 * time.Hour
837)
838
839func (t *http2Server) handlePing(f *http2.PingFrame) {
840 if f.IsAck() {
841 if f.Data == goAwayPing.data && t.drainChan != nil {
842 close(t.drainChan)
843 return
844 }
845 // Maybe it's a BDP ping.
846 if t.bdpEst != nil {
847 t.bdpEst.calculate(f.Data)
848 }
849 return
850 }
851 pingAck := &ping{ack: true}
852 copy(pingAck.data[:], f.Data[:])
853 t.controlBuf.put(pingAck)
854
855 now := time.Now()
856 defer func() {
857 t.lastPingAt = now
858 }()
859 // A reset ping strikes means that we don't need to check for policy
860 // violation for this ping and the pingStrikes counter should be set
861 // to 0.
862 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
863 t.pingStrikes = 0
864 return
865 }
866 t.mu.Lock()
867 ns := len(t.activeStreams)
868 t.mu.Unlock()
869 if ns < 1 && !t.kep.PermitWithoutStream {
870 // Keepalive shouldn't be active thus, this new ping should
871 // have come after at least defaultPingTimeout.
872 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
873 t.pingStrikes++
874 }
875 } else {
876 // Check if keepalive policy is respected.
877 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
878 t.pingStrikes++
879 }
880 }
881
882 if t.pingStrikes > maxPingStrikes {
883 // Send goaway and close the connection.
884 if logger.V(logLevel) {
885 logger.Errorf("transport: Got too many pings from the client, closing the connection.")
886 }
887 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
888 }
889}
890
891func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
892 t.controlBuf.put(&incomingWindowUpdate{
893 streamID: f.Header().StreamID,
894 increment: f.Increment,
895 })
896}
897
898func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
899 for k, vv := range md {
900 if isReservedHeader(k) {
901 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
902 continue
903 }
904 for _, v := range vv {
905 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
906 }
907 }
908 return headerFields
909}
910
911func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
912 if t.maxSendHeaderListSize == nil {
913 return true
914 }
915 hdrFrame := it.(*headerFrame)
916 var sz int64
917 for _, f := range hdrFrame.hf {
918 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
919 if logger.V(logLevel) {
920 logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
921 }
922 return false
923 }
924 }
925 return true
926}
927
928// WriteHeader sends the header metadata md back to the client.
929func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
930 if s.updateHeaderSent() || s.getState() == streamDone {
931 return ErrIllegalHeaderWrite
932 }
933 s.hdrMu.Lock()
934 if md.Len() > 0 {
935 if s.header.Len() > 0 {
936 s.header = metadata.Join(s.header, md)
937 } else {
938 s.header = md
939 }
940 }
941 if err := t.writeHeaderLocked(s); err != nil {
942 s.hdrMu.Unlock()
943 return err
944 }
945 s.hdrMu.Unlock()
946 return nil
947}
948
949func (t *http2Server) setResetPingStrikes() {
950 atomic.StoreUint32(&t.resetPingStrikes, 1)
951}
952
953func (t *http2Server) writeHeaderLocked(s *Stream) error {
954 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
955 // first and create a slice of that exact size.
956 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
957 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
958 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
959 if s.sendCompress != "" {
960 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
961 }
962 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
963 success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
964 streamID: s.id,
965 hf: headerFields,
966 endStream: false,
967 onWrite: t.setResetPingStrikes,
968 })
969 if !success {
970 if err != nil {
971 return err
972 }
973 t.closeStream(s, true, http2.ErrCodeInternal, false)
974 return ErrHeaderListSizeLimitViolation
975 }
976 if t.stats != nil {
977 // Note: Headers are compressed with hpack after this call returns.
978 // No WireLength field is set here.
979 outHeader := &stats.OutHeader{
980 Header: s.header.Copy(),
981 Compression: s.sendCompress,
982 }
983 t.stats.HandleRPC(s.Context(), outHeader)
984 }
985 return nil
986}
987
988// WriteStatus sends stream status to the client and terminates the stream.
989// There is no further I/O operations being able to perform on this stream.
990// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
991// OK is adopted.
992func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
993 if s.getState() == streamDone {
994 return nil
995 }
996 s.hdrMu.Lock()
997 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
998 // first and create a slice of that exact size.
999 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
1000 if !s.updateHeaderSent() { // No headers have been sent.
1001 if len(s.header) > 0 { // Send a separate header frame.
1002 if err := t.writeHeaderLocked(s); err != nil {
1003 s.hdrMu.Unlock()
1004 return err
1005 }
1006 } else { // Send a trailer only response.
1007 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
1008 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
1009 }
1010 }
1011 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
1012 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
1013
1014 if p := st.Proto(); p != nil && len(p.Details) > 0 {
1015 stBytes, err := proto.Marshal(p)
1016 if err != nil {
1017 // TODO: return error instead, when callers are able to handle it.
1018 logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
1019 } else {
1020 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
1021 }
1022 }
1023
1024 // Attach the trailer metadata.
1025 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
1026 trailingHeader := &headerFrame{
1027 streamID: s.id,
1028 hf: headerFields,
1029 endStream: true,
1030 onWrite: t.setResetPingStrikes,
1031 }
1032 s.hdrMu.Unlock()
1033 success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
1034 if !success {
1035 if err != nil {
1036 return err
1037 }
1038 t.closeStream(s, true, http2.ErrCodeInternal, false)
1039 return ErrHeaderListSizeLimitViolation
1040 }
1041 // Send a RST_STREAM after the trailers if the client has not already half-closed.
1042 rst := s.getState() == streamActive
1043 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
1044 if t.stats != nil {
1045 // Note: The trailer fields are compressed with hpack after this call returns.
1046 // No WireLength field is set here.
1047 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
1048 Trailer: s.trailer.Copy(),
1049 })
1050 }
1051 return nil
1052}
1053
1054// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
1055// is returns if it fails (e.g., framing error, transport error).
1056func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
1057 if !s.isHeaderSent() { // Headers haven't been written yet.
1058 if err := t.WriteHeader(s, nil); err != nil {
1059 if _, ok := err.(ConnectionError); ok {
1060 return err
1061 }
1062 // TODO(mmukhi, dfawley): Make sure this is the right code to return.
1063 return status.Errorf(codes.Internal, "transport: %v", err)
1064 }
1065 } else {
1066 // Writing headers checks for this condition.
1067 if s.getState() == streamDone {
1068 // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
1069 s.cancel()
1070 select {
1071 case <-t.done:
1072 return ErrConnClosing
1073 default:
1074 }
1075 return ContextErr(s.ctx.Err())
1076 }
1077 }
1078 df := &dataFrame{
1079 streamID: s.id,
1080 h: hdr,
1081 d: data,
1082 onEachWrite: t.setResetPingStrikes,
1083 }
1084 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
1085 select {
1086 case <-t.done:
1087 return ErrConnClosing
1088 default:
1089 }
1090 return ContextErr(s.ctx.Err())
1091 }
1092 return t.controlBuf.put(df)
1093}
1094
1095// keepalive running in a separate goroutine does the following:
1096// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
1097// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
1098// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
1099// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
1100// after an additional duration of keepalive.Timeout.
1101func (t *http2Server) keepalive() {
1102 p := &ping{}
1103 // True iff a ping has been sent, and no data has been received since then.
1104 outstandingPing := false
1105 // Amount of time remaining before which we should receive an ACK for the
1106 // last sent ping.
1107 kpTimeoutLeft := time.Duration(0)
1108 // Records the last value of t.lastRead before we go block on the timer.
1109 // This is required to check for read activity since then.
1110 prevNano := time.Now().UnixNano()
1111 // Initialize the different timers to their default values.
1112 idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
1113 ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
1114 kpTimer := time.NewTimer(t.kp.Time)
1115 defer func() {
1116 // We need to drain the underlying channel in these timers after a call
1117 // to Stop(), only if we are interested in resetting them. Clearly we
1118 // are not interested in resetting them here.
1119 idleTimer.Stop()
1120 ageTimer.Stop()
1121 kpTimer.Stop()
1122 }()
1123
1124 for {
1125 select {
1126 case <-idleTimer.C:
1127 t.mu.Lock()
1128 idle := t.idle
1129 if idle.IsZero() { // The connection is non-idle.
1130 t.mu.Unlock()
1131 idleTimer.Reset(t.kp.MaxConnectionIdle)
1132 continue
1133 }
1134 val := t.kp.MaxConnectionIdle - time.Since(idle)
1135 t.mu.Unlock()
1136 if val <= 0 {
1137 // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
1138 // Gracefully close the connection.
1139 t.Drain()
1140 return
1141 }
1142 idleTimer.Reset(val)
1143 case <-ageTimer.C:
1144 t.Drain()
1145 ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
1146 select {
1147 case <-ageTimer.C:
1148 // Close the connection after grace period.
1149 if logger.V(logLevel) {
1150 logger.Infof("transport: closing server transport due to maximum connection age.")
1151 }
1152 t.Close()
1153 case <-t.done:
1154 }
1155 return
1156 case <-kpTimer.C:
1157 lastRead := atomic.LoadInt64(&t.lastRead)
1158 if lastRead > prevNano {
1159 // There has been read activity since the last time we were
1160 // here. Setup the timer to fire at kp.Time seconds from
1161 // lastRead time and continue.
1162 outstandingPing = false
1163 kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
1164 prevNano = lastRead
1165 continue
1166 }
1167 if outstandingPing && kpTimeoutLeft <= 0 {
1168 if logger.V(logLevel) {
1169 logger.Infof("transport: closing server transport due to idleness.")
1170 }
1171 t.Close()
1172 return
1173 }
1174 if !outstandingPing {
1175 if channelz.IsOn() {
1176 atomic.AddInt64(&t.czData.kpCount, 1)
1177 }
1178 t.controlBuf.put(p)
1179 kpTimeoutLeft = t.kp.Timeout
1180 outstandingPing = true
1181 }
1182 // The amount of time to sleep here is the minimum of kp.Time and
1183 // timeoutLeft. This will ensure that we wait only for kp.Time
1184 // before sending out the next ping (for cases where the ping is
1185 // acked).
1186 sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
1187 kpTimeoutLeft -= sleepDuration
1188 kpTimer.Reset(sleepDuration)
1189 case <-t.done:
1190 return
1191 }
1192 }
1193}
1194
1195// Close starts shutting down the http2Server transport.
1196// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1197// could cause some resource issue. Revisit this later.
1198func (t *http2Server) Close() {
1199 t.mu.Lock()
1200 if t.state == closing {
1201 t.mu.Unlock()
1202 return
1203 }
1204 t.state = closing
1205 streams := t.activeStreams
1206 t.activeStreams = nil
1207 t.mu.Unlock()
1208 t.controlBuf.finish()
1209 close(t.done)
1210 if err := t.conn.Close(); err != nil && logger.V(logLevel) {
1211 logger.Infof("transport: error closing conn during Close: %v", err)
1212 }
1213 if channelz.IsOn() {
1214 channelz.RemoveEntry(t.channelzID)
1215 }
1216 // Cancel all active streams.
1217 for _, s := range streams {
1218 s.cancel()
1219 }
1220 if t.stats != nil {
1221 connEnd := &stats.ConnEnd{}
1222 t.stats.HandleConn(t.ctx, connEnd)
1223 }
1224}
1225
1226// deleteStream deletes the stream s from transport's active streams.
1227func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1228 // In case stream sending and receiving are invoked in separate
1229 // goroutines (e.g., bi-directional streaming), cancel needs to be
1230 // called to interrupt the potential blocking on other goroutines.
1231 s.cancel()
1232
1233 t.mu.Lock()
1234 if _, ok := t.activeStreams[s.id]; ok {
1235 delete(t.activeStreams, s.id)
1236 if len(t.activeStreams) == 0 {
1237 t.idle = time.Now()
1238 }
1239 }
1240 t.mu.Unlock()
1241
1242 if channelz.IsOn() {
1243 if eosReceived {
1244 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1245 } else {
1246 atomic.AddInt64(&t.czData.streamsFailed, 1)
1247 }
1248 }
1249}
1250
1251// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1252func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1253 oldState := s.swapState(streamDone)
1254 if oldState == streamDone {
1255 // If the stream was already done, return.
1256 return
1257 }
1258
1259 hdr.cleanup = &cleanupStream{
1260 streamID: s.id,
1261 rst: rst,
1262 rstCode: rstCode,
1263 onWrite: func() {
1264 t.deleteStream(s, eosReceived)
1265 },
1266 }
1267 t.controlBuf.put(hdr)
1268}
1269
1270// closeStream clears the footprint of a stream when the stream is not needed any more.
1271func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1272 s.swapState(streamDone)
1273 t.deleteStream(s, eosReceived)
1274
1275 t.controlBuf.put(&cleanupStream{
1276 streamID: s.id,
1277 rst: rst,
1278 rstCode: rstCode,
1279 onWrite: func() {},
1280 })
1281}
1282
1283func (t *http2Server) RemoteAddr() net.Addr {
1284 return t.remoteAddr
1285}
1286
1287func (t *http2Server) Drain() {
1288 t.mu.Lock()
1289 defer t.mu.Unlock()
1290 if t.drainChan != nil {
1291 return
1292 }
1293 t.drainChan = make(chan struct{})
1294 t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
1295}
1296
1297var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1298
1299// Handles outgoing GoAway and returns true if loopy needs to put itself
1300// in draining mode.
1301func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
khenaidoo257f3192021-12-15 16:46:37 -05001302 t.maxStreamMu.Lock()
khenaidoo5fc5cea2021-08-11 17:39:16 -04001303 t.mu.Lock()
1304 if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1305 t.mu.Unlock()
khenaidoo257f3192021-12-15 16:46:37 -05001306 t.maxStreamMu.Unlock()
khenaidoo5fc5cea2021-08-11 17:39:16 -04001307 // The transport is closing.
1308 return false, ErrConnClosing
1309 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001310 if !g.headsUp {
1311 // Stop accepting more streams now.
1312 t.state = draining
khenaidoo257f3192021-12-15 16:46:37 -05001313 sid := t.maxStreamID
khenaidoo5fc5cea2021-08-11 17:39:16 -04001314 if len(t.activeStreams) == 0 {
1315 g.closeConn = true
1316 }
1317 t.mu.Unlock()
khenaidoo257f3192021-12-15 16:46:37 -05001318 t.maxStreamMu.Unlock()
khenaidoo5fc5cea2021-08-11 17:39:16 -04001319 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1320 return false, err
1321 }
1322 if g.closeConn {
1323 // Abruptly close the connection following the GoAway (via
1324 // loopywriter). But flush out what's inside the buffer first.
1325 t.framer.writer.Flush()
1326 return false, fmt.Errorf("transport: Connection closing")
1327 }
1328 return true, nil
1329 }
1330 t.mu.Unlock()
khenaidoo257f3192021-12-15 16:46:37 -05001331 t.maxStreamMu.Unlock()
khenaidoo5fc5cea2021-08-11 17:39:16 -04001332 // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1333 // Follow that with a ping and wait for the ack to come back or a timer
1334 // to expire. During this time accept new streams since they might have
1335 // originated before the GoAway reaches the client.
1336 // After getting the ack or timer expiration send out another GoAway this
1337 // time with an ID of the max stream server intends to process.
1338 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1339 return false, err
1340 }
1341 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1342 return false, err
1343 }
1344 go func() {
1345 timer := time.NewTimer(time.Minute)
1346 defer timer.Stop()
1347 select {
1348 case <-t.drainChan:
1349 case <-timer.C:
1350 case <-t.done:
1351 return
1352 }
1353 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1354 }()
1355 return false, nil
1356}
1357
1358func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1359 s := channelz.SocketInternalMetric{
1360 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1361 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1362 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1363 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1364 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1365 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1366 LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1367 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1368 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1369 LocalFlowControlWindow: int64(t.fc.getSize()),
1370 SocketOptions: channelz.GetSocketOption(t.conn),
1371 LocalAddr: t.localAddr,
1372 RemoteAddr: t.remoteAddr,
1373 // RemoteName :
1374 }
1375 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1376 s.Security = au.GetSecurityValue()
1377 }
1378 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1379 return &s
1380}
1381
1382func (t *http2Server) IncrMsgSent() {
1383 atomic.AddInt64(&t.czData.msgSent, 1)
1384 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1385}
1386
1387func (t *http2Server) IncrMsgRecv() {
1388 atomic.AddInt64(&t.czData.msgRecv, 1)
1389 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1390}
1391
1392func (t *http2Server) getOutFlowWindow() int64 {
1393 resp := make(chan uint32, 1)
1394 timer := time.NewTimer(time.Second)
1395 defer timer.Stop()
1396 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1397 select {
1398 case sz := <-resp:
1399 return int64(sz)
1400 case <-t.done:
1401 return -1
1402 case <-timer.C:
1403 return -2
1404 }
1405}
1406
1407func getJitter(v time.Duration) time.Duration {
1408 if v == infinity {
1409 return 0
1410 }
1411 // Generate a jitter between +/- 10% of the value.
1412 r := int64(v / 10)
1413 j := grpcrand.Int63n(2*r) - r
1414 return time.Duration(j)
1415}
1416
1417type connectionKey struct{}
1418
1419// GetConnection gets the connection from the context.
1420func GetConnection(ctx context.Context) net.Conn {
1421 conn, _ := ctx.Value(connectionKey{}).(net.Conn)
1422 return conn
1423}
1424
1425// SetConnection adds the connection to the context to be able to get
1426// information about the destination ip and port for an incoming RPC. This also
1427// allows any unary or streaming interceptors to see the connection.
1428func setConnection(ctx context.Context, conn net.Conn) context.Context {
1429 return context.WithValue(ctx, connectionKey{}, conn)
1430}