blob: 8b04b0392a0a6379e18b47eb65924e0da66206f7 [file] [log] [blame]
Prince Pereirac1c21d62021-04-22 08:38:15 +00001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 "net"
29 "strconv"
30 "sync"
31 "sync/atomic"
32 "time"
33
34 "github.com/golang/protobuf/proto"
35 "golang.org/x/net/http2"
36 "golang.org/x/net/http2/hpack"
37
38 spb "google.golang.org/genproto/googleapis/rpc/status"
39 "google.golang.org/grpc/codes"
40 "google.golang.org/grpc/credentials"
41 "google.golang.org/grpc/grpclog"
42 "google.golang.org/grpc/internal"
43 "google.golang.org/grpc/internal/channelz"
44 "google.golang.org/grpc/internal/grpcrand"
45 "google.golang.org/grpc/keepalive"
46 "google.golang.org/grpc/metadata"
47 "google.golang.org/grpc/peer"
48 "google.golang.org/grpc/stats"
49 "google.golang.org/grpc/status"
50 "google.golang.org/grpc/tap"
51)
52
53var (
54 // ErrIllegalHeaderWrite indicates that setting header is illegal because of
55 // the stream's state.
56 ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
57 // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
58 // than the limit set by peer.
59 ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
60 // statusRawProto is a function to get to the raw status proto wrapped in a
61 // status.Status without a proto.Clone().
62 statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
63)
64
var (
	// serverConnectionCounter is a running total of server-side connections,
	// i.e. of http2Server values built by newHTTP2Server, which assigns the
	// incremented value as the connection's ID. Always access it through
	// sync/atomic.
	serverConnectionCounter uint64
)
68
// http2Server implements the ServerTransport interface with HTTP2.
//
// One http2Server serves a single client connection. newHTTP2Server starts the
// loopy writer goroutine and the keepalive goroutine for it; HandleStreams is
// the frame-reading loop (typically run in its own goroutine).
type http2Server struct {
	// lastRead is time.Now().UnixNano() recorded after the most recent
	// ReadFrame call. The reader stores it atomically and keepalive loads it.
	// It must remain the first field so that it stays 64-bit aligned.
	lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
	// ctx is the connection-level context: context.Background(), possibly
	// tagged by the stats handler. Stream contexts are derived from it.
	ctx context.Context
	// done is handed to the controlBuf; Write reports ErrConnClosing once it
	// is closed.
	done chan struct{}
	conn net.Conn
	// loopy is created inside the writer goroutine started by newHTTP2Server.
	loopy      *loopyWriter
	readerDone chan struct{} // sync point to enable testing. Closed when HandleStreams returns.
	writerDone chan struct{} // sync point to enable testing. Closed after loopy.run returns.
	remoteAddr net.Addr
	localAddr  net.Addr
	// New stream IDs must be odd and strictly greater than maxStreamID
	// (enforced in operateHeaders).
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams. math.MaxUint32 when the config
	// leaves MaxStreams at 0.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	// fc is the connection-level inbound flow control.
	fc    *trInFlow
	stats stats.Handler
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received. Used only by handlePing.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes uint32 // Accessed atomically.
	// initialWindowSize is the inbound window given to newly created
	// streams; updateFlowControl may raise it.
	initialWindowSize int32
	// bdpEst is non-nil only when neither the stream nor the connection
	// window size was configured explicitly (dynamic windows enabled).
	bdpEst *bdpEstimator
	// maxSendHeaderListSize is nil (no limit) until the client sends
	// SETTINGS_MAX_HEADER_LIST_SIZE; see handleSettings and
	// checkForHeaderListSize.
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway.
	// handlePing closes it when the client acks the GOAWAY ping.
	drainChan chan struct{}
	state     transportState
	// activeStreams maps stream ID to stream. getStream treats a nil map as
	// "transport is closing".
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to 0.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData
	bufferPool *bufferPool

	// connectionID is taken from serverConnectionCounter in newHTTP2Server.
	connectionID uint64
}
131
132// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
133// returned if something goes wrong.
134func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
135 writeBufSize := config.WriteBufferSize
136 readBufSize := config.ReadBufferSize
137 maxHeaderListSize := defaultServerMaxHeaderListSize
138 if config.MaxHeaderListSize != nil {
139 maxHeaderListSize = *config.MaxHeaderListSize
140 }
141 framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
142 // Send initial settings as connection preface to client.
143 isettings := []http2.Setting{{
144 ID: http2.SettingMaxFrameSize,
145 Val: http2MaxFrameLen,
146 }}
147 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
148 // permitted in the HTTP2 spec.
149 maxStreams := config.MaxStreams
150 if maxStreams == 0 {
151 maxStreams = math.MaxUint32
152 } else {
153 isettings = append(isettings, http2.Setting{
154 ID: http2.SettingMaxConcurrentStreams,
155 Val: maxStreams,
156 })
157 }
158 dynamicWindow := true
159 iwz := int32(initialWindowSize)
160 if config.InitialWindowSize >= defaultWindowSize {
161 iwz = config.InitialWindowSize
162 dynamicWindow = false
163 }
164 icwz := int32(initialWindowSize)
165 if config.InitialConnWindowSize >= defaultWindowSize {
166 icwz = config.InitialConnWindowSize
167 dynamicWindow = false
168 }
169 if iwz != defaultWindowSize {
170 isettings = append(isettings, http2.Setting{
171 ID: http2.SettingInitialWindowSize,
172 Val: uint32(iwz)})
173 }
174 if config.MaxHeaderListSize != nil {
175 isettings = append(isettings, http2.Setting{
176 ID: http2.SettingMaxHeaderListSize,
177 Val: *config.MaxHeaderListSize,
178 })
179 }
180 if config.HeaderTableSize != nil {
181 isettings = append(isettings, http2.Setting{
182 ID: http2.SettingHeaderTableSize,
183 Val: *config.HeaderTableSize,
184 })
185 }
186 if err := framer.fr.WriteSettings(isettings...); err != nil {
187 return nil, connectionErrorf(false, err, "transport: %v", err)
188 }
189 // Adjust the connection flow control window if needed.
190 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
191 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
192 return nil, connectionErrorf(false, err, "transport: %v", err)
193 }
194 }
195 kp := config.KeepaliveParams
196 if kp.MaxConnectionIdle == 0 {
197 kp.MaxConnectionIdle = defaultMaxConnectionIdle
198 }
199 if kp.MaxConnectionAge == 0 {
200 kp.MaxConnectionAge = defaultMaxConnectionAge
201 }
202 // Add a jitter to MaxConnectionAge.
203 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
204 if kp.MaxConnectionAgeGrace == 0 {
205 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
206 }
207 if kp.Time == 0 {
208 kp.Time = defaultServerKeepaliveTime
209 }
210 if kp.Timeout == 0 {
211 kp.Timeout = defaultServerKeepaliveTimeout
212 }
213 kep := config.KeepalivePolicy
214 if kep.MinTime == 0 {
215 kep.MinTime = defaultKeepalivePolicyMinTime
216 }
217 done := make(chan struct{})
218 t := &http2Server{
219 ctx: context.Background(),
220 done: done,
221 conn: conn,
222 remoteAddr: conn.RemoteAddr(),
223 localAddr: conn.LocalAddr(),
224 authInfo: config.AuthInfo,
225 framer: framer,
226 readerDone: make(chan struct{}),
227 writerDone: make(chan struct{}),
228 maxStreams: maxStreams,
229 inTapHandle: config.InTapHandle,
230 fc: &trInFlow{limit: uint32(icwz)},
231 state: reachable,
232 activeStreams: make(map[uint32]*Stream),
233 stats: config.StatsHandler,
234 kp: kp,
235 idle: time.Now(),
236 kep: kep,
237 initialWindowSize: iwz,
238 czData: new(channelzData),
239 bufferPool: newBufferPool(),
240 }
241 t.controlBuf = newControlBuffer(t.done)
242 if dynamicWindow {
243 t.bdpEst = &bdpEstimator{
244 bdp: initialWindowSize,
245 updateFlowControl: t.updateFlowControl,
246 }
247 }
248 if t.stats != nil {
249 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
250 RemoteAddr: t.remoteAddr,
251 LocalAddr: t.localAddr,
252 })
253 connBegin := &stats.ConnBegin{}
254 t.stats.HandleConn(t.ctx, connBegin)
255 }
256 if channelz.IsOn() {
257 t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
258 }
259
260 t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
261
262 t.framer.writer.Flush()
263
264 defer func() {
265 if err != nil {
266 t.Close()
267 }
268 }()
269
270 // Check the validity of client preface.
271 preface := make([]byte, len(clientPreface))
272 if _, err := io.ReadFull(t.conn, preface); err != nil {
273 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
274 }
275 if !bytes.Equal(preface, clientPreface) {
276 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
277 }
278
279 frame, err := t.framer.fr.ReadFrame()
280 if err == io.EOF || err == io.ErrUnexpectedEOF {
281 return nil, err
282 }
283 if err != nil {
284 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
285 }
286 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
287 sf, ok := frame.(*http2.SettingsFrame)
288 if !ok {
289 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
290 }
291 t.handleSettings(sf)
292
293 go func() {
294 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
295 t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
296 if err := t.loopy.run(); err != nil {
297 errorf("transport: loopyWriter.run returning. Err: %v", err)
298 }
299 t.conn.Close()
300 close(t.writerDone)
301 }()
302 go t.keepalive()
303 return t, nil
304}
305
// operateHeaders takes action on the decoded headers.
//
// It handles the HEADERS frame that opens a new stream in these steps:
//   - Decode the headers.
//   - Build the Stream and its context (deadline, peer, incoming metadata,
//     stats tags/trace, InTapHandle).
//   - Admit the stream into activeStreams and register it with loopy.
//   - Hand the stream to handle.
//
// Some streams are reset via a cleanupStream item and the function returns
// false:
//   - header decoding fails with a status error;
//   - InTapHandle rejects the stream;
//   - admitting the stream would exceed maxStreams.
//
// If the transport is no longer reachable, the stream is dropped without a
// reset. fatal is true only for an illegal stream ID (even, or not greater
// than maxStreamID); HandleStreams then closes the transport.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
	streamID := frame.Header().StreamID
	state := &decodeState{
		serverSide: true,
	}
	// Decode errors that carry a gRPC status are answered with RST_STREAM
	// (status code mapped through statusCodeConvTab); any other decode error
	// just drops the frame.
	if err := state.decodeHeader(frame); err != nil {
		if se, ok := status.FromError(err); ok {
			t.controlBuf.put(&cleanupStream{
				streamID: streamID,
				rst:      true,
				rstCode:  statusCodeConvTab[se.Code()],
				onWrite:  func() {},
			})
		}
		return false
	}

	buf := newRecvBuffer()
	s := &Stream{
		id:  streamID,
		st:  t,
		buf: buf,
		// Per-stream inbound window starts at the transport's current value.
		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
		recvCompress:   state.data.encoding,
		method:         state.data.method,
		contentSubtype: state.data.contentSubtype,
	}
	if frame.StreamEnded() {
		// s is just created by the caller. No lock needed.
		s.state = streamReadDone
	}
	// Honor the client's deadline (grpc-timeout) when one was sent.
	if state.data.timeoutSet {
		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
	} else {
		s.ctx, s.cancel = context.WithCancel(t.ctx)
	}
	pr := &peer.Peer{
		Addr: t.remoteAddr,
	}
	// Attach Auth info if there is any.
	if t.authInfo != nil {
		pr.AuthInfo = t.authInfo
	}
	s.ctx = peer.NewContext(s.ctx, pr)
	// Attach the received metadata to the context.
	if len(state.data.mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
	}
	if state.data.statsTags != nil {
		s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
	}
	if state.data.statsTrace != nil {
		s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
	}
	// The tap handle may replace the context or refuse the stream.
	if t.inTapHandle != nil {
		var err error
		info := &tap.Info{
			FullMethodName: state.data.method,
		}
		s.ctx, err = t.inTapHandle(s.ctx, info)
		if err != nil {
			warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
			t.controlBuf.put(&cleanupStream{
				streamID: s.id,
				rst:      true,
				rstCode:  http2.ErrCodeRefusedStream,
				onWrite:  func() {},
			})
			s.cancel()
			return false
		}
	}
	t.mu.Lock()
	// Transport is shutting down: drop the stream without resetting it.
	if t.state != reachable {
		t.mu.Unlock()
		s.cancel()
		return false
	}
	// Concurrency limit reached: refuse the stream.
	if uint32(len(t.activeStreams)) >= t.maxStreams {
		t.mu.Unlock()
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeRefusedStream,
			onWrite:  func() {},
		})
		s.cancel()
		return false
	}
	// Client-initiated stream IDs must be odd and strictly increasing.
	if streamID%2 != 1 || streamID <= t.maxStreamID {
		t.mu.Unlock()
		// illegal gRPC stream id.
		errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
		s.cancel()
		return true
	}
	t.maxStreamID = streamID
	t.activeStreams[streamID] = s
	// First active stream: the connection is no longer idle (zero time is
	// what keepalive treats as "busy").
	if len(t.activeStreams) == 1 {
		t.idle = time.Time{}
	}
	t.mu.Unlock()
	if channelz.IsOn() {
		atomic.AddInt64(&t.czData.streamsStarted, 1)
		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
	}
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	s.ctx = traceCtx(s.ctx, s.method)
	if t.stats != nil {
		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
		inHeader := &stats.InHeader{
			FullMethod:  s.method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: s.recvCompress,
			WireLength:  int(frame.Header().Length),
			Header:      metadata.MD(state.data.mdata).Copy(),
		}
		t.stats.HandleRPC(s.ctx, inHeader)
	}
	// s.ctx is final from here on; cache its Done channel for the write
	// quota and the receive path.
	s.ctxDone = s.ctx.Done()
	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:        s.ctx,
			ctxDone:    s.ctxDone,
			recv:       s.buf,
			freeBuffer: t.bufferPool.put,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	// Register the stream with loopy.
	t.controlBuf.put(&registerStream{
		streamID: s.id,
		wq:       s.wq,
	})
	handle(s)
	return false
}
450
451// HandleStreams receives incoming streams using the given handler. This is
452// typically run in a separate goroutine.
453// traceCtx attaches trace to ctx and returns the new context.
454func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
455 defer close(t.readerDone)
456 for {
457 t.controlBuf.throttle()
458 frame, err := t.framer.fr.ReadFrame()
459 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
460 if err != nil {
461 if se, ok := err.(http2.StreamError); ok {
462 warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
463 t.mu.Lock()
464 s := t.activeStreams[se.StreamID]
465 t.mu.Unlock()
466 if s != nil {
467 t.closeStream(s, true, se.Code, false)
468 } else {
469 t.controlBuf.put(&cleanupStream{
470 streamID: se.StreamID,
471 rst: true,
472 rstCode: se.Code,
473 onWrite: func() {},
474 })
475 }
476 continue
477 }
478 if err == io.EOF || err == io.ErrUnexpectedEOF {
479 t.Close()
480 return
481 }
482 warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
483 t.Close()
484 return
485 }
486 switch frame := frame.(type) {
487 case *http2.MetaHeadersFrame:
488 if t.operateHeaders(frame, handle, traceCtx) {
489 t.Close()
490 break
491 }
492 case *http2.DataFrame:
493 t.handleData(frame)
494 case *http2.RSTStreamFrame:
495 t.handleRSTStream(frame)
496 case *http2.SettingsFrame:
497 t.handleSettings(frame)
498 case *http2.PingFrame:
499 t.handlePing(frame)
500 case *http2.WindowUpdateFrame:
501 t.handleWindowUpdate(frame)
502 case *http2.GoAwayFrame:
503 // TODO: Handle GoAway from the client appropriately.
504 default:
505 errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
506 }
507 }
508}
509
510func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
511 t.mu.Lock()
512 defer t.mu.Unlock()
513 if t.activeStreams == nil {
514 // The transport is closing.
515 return nil, false
516 }
517 s, ok := t.activeStreams[f.Header().StreamID]
518 if !ok {
519 // The stream is already done.
520 return nil, false
521 }
522 return s, true
523}
524
525// adjustWindow sends out extra window update over the initial window size
526// of stream if the application is requesting data larger in size than
527// the window.
528func (t *http2Server) adjustWindow(s *Stream, n uint32) {
529 if w := s.fc.maybeAdjust(n); w > 0 {
530 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
531 }
532
533}
534
535// updateWindow adjusts the inbound quota for the stream and the transport.
536// Window updates will deliver to the controller for sending when
537// the cumulative quota exceeds the corresponding threshold.
538func (t *http2Server) updateWindow(s *Stream, n uint32) {
539 if w := s.fc.onRead(n); w > 0 {
540 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
541 increment: w,
542 })
543 }
544}
545
546// updateFlowControl updates the incoming flow control windows
547// for the transport and the stream based on the current bdp
548// estimation.
549func (t *http2Server) updateFlowControl(n uint32) {
550 t.mu.Lock()
551 for _, s := range t.activeStreams {
552 s.fc.newLimit(n)
553 }
554 t.initialWindowSize = int32(n)
555 t.mu.Unlock()
556 t.controlBuf.put(&outgoingWindowUpdate{
557 streamID: 0,
558 increment: t.fc.newLimit(n),
559 })
560 t.controlBuf.put(&outgoingSettings{
561 ss: []http2.Setting{
562 {
563 ID: http2.SettingInitialWindowSize,
564 Val: n,
565 },
566 },
567 })
568
569}
570
571func (t *http2Server) handleData(f *http2.DataFrame) {
572 size := f.Header().Length
573 var sendBDPPing bool
574 if t.bdpEst != nil {
575 sendBDPPing = t.bdpEst.add(size)
576 }
577 // Decouple connection's flow control from application's read.
578 // An update on connection's flow control should not depend on
579 // whether user application has read the data or not. Such a
580 // restriction is already imposed on the stream's flow control,
581 // and therefore the sender will be blocked anyways.
582 // Decoupling the connection flow control will prevent other
583 // active(fast) streams from starving in presence of slow or
584 // inactive streams.
585 if w := t.fc.onData(size); w > 0 {
586 t.controlBuf.put(&outgoingWindowUpdate{
587 streamID: 0,
588 increment: w,
589 })
590 }
591 if sendBDPPing {
592 // Avoid excessive ping detection (e.g. in an L7 proxy)
593 // by sending a window update prior to the BDP ping.
594 if w := t.fc.reset(); w > 0 {
595 t.controlBuf.put(&outgoingWindowUpdate{
596 streamID: 0,
597 increment: w,
598 })
599 }
600 t.controlBuf.put(bdpPing)
601 }
602 // Select the right stream to dispatch.
603 s, ok := t.getStream(f)
604 if !ok {
605 return
606 }
607 if size > 0 {
608 if err := s.fc.onData(size); err != nil {
609 t.closeStream(s, true, http2.ErrCodeFlowControl, false)
610 return
611 }
612 if f.Header().Flags.Has(http2.FlagDataPadded) {
613 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
614 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
615 }
616 }
617 // TODO(bradfitz, zhaoq): A copy is required here because there is no
618 // guarantee f.Data() is consumed before the arrival of next frame.
619 // Can this copy be eliminated?
620 if len(f.Data()) > 0 {
621 buffer := t.bufferPool.get()
622 buffer.Reset()
623 buffer.Write(f.Data())
624 s.write(recvMsg{buffer: buffer})
625 }
626 }
627 if f.Header().Flags.Has(http2.FlagDataEndStream) {
628 // Received the end of stream from the client.
629 s.compareAndSwapState(streamActive, streamReadDone)
630 s.write(recvMsg{err: io.EOF})
631 }
632}
633
634func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
635 // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
636 if s, ok := t.getStream(f); ok {
637 t.closeStream(s, false, 0, false)
638 return
639 }
640 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
641 t.controlBuf.put(&cleanupStream{
642 streamID: f.Header().StreamID,
643 rst: false,
644 rstCode: 0,
645 onWrite: func() {},
646 })
647}
648
649func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
650 if f.IsAck() {
651 return
652 }
653 var ss []http2.Setting
654 var updateFuncs []func()
655 f.ForeachSetting(func(s http2.Setting) error {
656 switch s.ID {
657 case http2.SettingMaxHeaderListSize:
658 updateFuncs = append(updateFuncs, func() {
659 t.maxSendHeaderListSize = new(uint32)
660 *t.maxSendHeaderListSize = s.Val
661 })
662 default:
663 ss = append(ss, s)
664 }
665 return nil
666 })
667 t.controlBuf.executeAndPut(func(interface{}) bool {
668 for _, f := range updateFuncs {
669 f()
670 }
671 return true
672 }, &incomingSettings{
673 ss: ss,
674 })
675}
676
// maxPingStrikes is the number of keepalive policy violations tolerated;
// handlePing sends GOAWAY(ENHANCE_YOUR_CALM) and closes the connection once
// pingStrikes exceeds it.
const maxPingStrikes = 2

// defaultPingTimeout is the minimum spacing handlePing expects between client
// pings when there are no active streams and the enforcement policy does not
// permit pings without streams.
const defaultPingTimeout = 2 * time.Hour
681
682func (t *http2Server) handlePing(f *http2.PingFrame) {
683 if f.IsAck() {
684 if f.Data == goAwayPing.data && t.drainChan != nil {
685 close(t.drainChan)
686 return
687 }
688 // Maybe it's a BDP ping.
689 if t.bdpEst != nil {
690 t.bdpEst.calculate(f.Data)
691 }
692 return
693 }
694 pingAck := &ping{ack: true}
695 copy(pingAck.data[:], f.Data[:])
696 t.controlBuf.put(pingAck)
697
698 now := time.Now()
699 defer func() {
700 t.lastPingAt = now
701 }()
702 // A reset ping strikes means that we don't need to check for policy
703 // violation for this ping and the pingStrikes counter should be set
704 // to 0.
705 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
706 t.pingStrikes = 0
707 return
708 }
709 t.mu.Lock()
710 ns := len(t.activeStreams)
711 t.mu.Unlock()
712 if ns < 1 && !t.kep.PermitWithoutStream {
713 // Keepalive shouldn't be active thus, this new ping should
714 // have come after at least defaultPingTimeout.
715 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
716 t.pingStrikes++
717 }
718 } else {
719 // Check if keepalive policy is respected.
720 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
721 t.pingStrikes++
722 }
723 }
724
725 if t.pingStrikes > maxPingStrikes {
726 // Send goaway and close the connection.
727 errorf("transport: Got too many pings from the client, closing the connection.")
728 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
729 }
730}
731
732func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
733 t.controlBuf.put(&incomingWindowUpdate{
734 streamID: f.Header().StreamID,
735 increment: f.Increment,
736 })
737}
738
739func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
740 for k, vv := range md {
741 if isReservedHeader(k) {
742 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
743 continue
744 }
745 for _, v := range vv {
746 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
747 }
748 }
749 return headerFields
750}
751
752func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
753 if t.maxSendHeaderListSize == nil {
754 return true
755 }
756 hdrFrame := it.(*headerFrame)
757 var sz int64
758 for _, f := range hdrFrame.hf {
759 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
760 errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
761 return false
762 }
763 }
764 return true
765}
766
767// WriteHeader sends the header metadata md back to the client.
768func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
769 if s.updateHeaderSent() || s.getState() == streamDone {
770 return ErrIllegalHeaderWrite
771 }
772 s.hdrMu.Lock()
773 if md.Len() > 0 {
774 if s.header.Len() > 0 {
775 s.header = metadata.Join(s.header, md)
776 } else {
777 s.header = md
778 }
779 }
780 if err := t.writeHeaderLocked(s); err != nil {
781 s.hdrMu.Unlock()
782 return err
783 }
784 s.hdrMu.Unlock()
785 return nil
786}
787
788func (t *http2Server) setResetPingStrikes() {
789 atomic.StoreUint32(&t.resetPingStrikes, 1)
790}
791
792func (t *http2Server) writeHeaderLocked(s *Stream) error {
793 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
794 // first and create a slice of that exact size.
795 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
796 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
797 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
798 if s.sendCompress != "" {
799 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
800 }
801 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
802 success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
803 streamID: s.id,
804 hf: headerFields,
805 endStream: false,
806 onWrite: t.setResetPingStrikes,
807 })
808 if !success {
809 if err != nil {
810 return err
811 }
812 t.closeStream(s, true, http2.ErrCodeInternal, false)
813 return ErrHeaderListSizeLimitViolation
814 }
815 if t.stats != nil {
816 // Note: WireLength is not set in outHeader.
817 // TODO(mmukhi): Revisit this later, if needed.
818 outHeader := &stats.OutHeader{
819 Header: s.header.Copy(),
820 }
821 t.stats.HandleRPC(s.Context(), outHeader)
822 }
823 return nil
824}
825
// WriteStatus sends stream status to the client and terminates the stream.
// There is no further I/O operations being able to perform on this stream.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
//
// The trailers carry, in order:
//   - ":status" and "content-type", only for a trailers-only response (no
//     headers sent and no header metadata set);
//   - grpc-status and grpc-message;
//   - grpc-status-details-bin, when st has details and they marshal;
//   - s.trailer's metadata.
//
// If headers were never sent but header metadata exists, a separate HEADERS
// frame is written first. It returns nil without sending anything when the
// stream is already done.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
	if s.getState() == streamDone {
		return nil
	}
	s.hdrMu.Lock()
	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() { // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				s.hdrMu.Unlock()
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	// Rich error details travel as a binary header holding the marshaled
	// status proto. A marshal failure is only logged; the trailers are still
	// sent without it.
	if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}
	s.hdrMu.Unlock()
	// execute only runs the size check here; the frame itself is handed to
	// finishStream below.
	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	if t.stats != nil {
		// NOTE(review): s.trailer is read here after s.hdrMu was released.
		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
			Trailer: s.trailer.Copy(),
		})
	}
	return nil
}
889
890// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
891// is returns if it fails (e.g., framing error, transport error).
892func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
893 if !s.isHeaderSent() { // Headers haven't been written yet.
894 if err := t.WriteHeader(s, nil); err != nil {
895 if _, ok := err.(ConnectionError); ok {
896 return err
897 }
898 // TODO(mmukhi, dfawley): Make sure this is the right code to return.
899 return status.Errorf(codes.Internal, "transport: %v", err)
900 }
901 } else {
902 // Writing headers checks for this condition.
903 if s.getState() == streamDone {
904 // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
905 s.cancel()
906 select {
907 case <-t.done:
908 return ErrConnClosing
909 default:
910 }
911 return ContextErr(s.ctx.Err())
912 }
913 }
914 // Add some data to header frame so that we can equally distribute bytes across frames.
915 emptyLen := http2MaxFrameLen - len(hdr)
916 if emptyLen > len(data) {
917 emptyLen = len(data)
918 }
919 hdr = append(hdr, data[:emptyLen]...)
920 data = data[emptyLen:]
921 df := &dataFrame{
922 streamID: s.id,
923 h: hdr,
924 d: data,
925 onEachWrite: t.setResetPingStrikes,
926 }
927 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
928 select {
929 case <-t.done:
930 return ErrConnClosing
931 default:
932 }
933 return ContextErr(s.ctx.Err())
934 }
935 return t.controlBuf.put(df)
936}
937
// keepalive runs in a separate goroutine and enforces the server-side
// keepalive policy configured in t.kp:
//  1. Gracefully closes (drains) a connection that has been idle, i.e. had no
//     active streams, for keepalive.MaxConnectionIdle.
//  2. Gracefully closes (drains) any connection after keepalive.MaxConnectionAge.
//  3. Forcibly closes the connection keepalive.MaxConnectionAgeGrace after the
//     age-triggered drain started, if it is still open by then.
//  4. Checks liveness: after keepalive.Time with no read activity it sends a
//     ping, and if nothing is read within a further keepalive.Timeout it
//     closes the connection.
//
// It returns once it has drained or closed the connection, or when t.done is
// closed.
func (t *http2Server) keepalive() {
	// The same (zero-payload) ping frame is reused for every keepalive ping.
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			t.mu.Lock()
			// t.idle is the time the last active stream went away (see
			// deleteStream); it is zero while streams are active.
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			// Remaining idle budget before the connection should be drained.
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				return
			}
			idleTimer.Reset(val)
		case <-ageTimer.C:
			t.drain(http2.ErrCodeNo, []byte{})
			// Reusing ageTimer for the grace period is safe: its channel was
			// just drained by the receive above.
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				infof("transport: closing server transport due to maximum connection age.")
				t.Close()
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			// t.lastRead holds a UnixNano timestamp of the latest read.
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Set up the timer to fire kp.Time after the lastRead
				// time and continue.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			// No reads since the ping was sent and its timeout budget is
			// used up: treat the peer as unresponsive.
			if outstandingPing && kpTimeoutLeft <= 0 {
				infof("transport: closing server transport due to idleness.")
				t.Close()
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// kpTimeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}
1033
1034// Close starts shutting down the http2Server transport.
1035// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1036// could cause some resource issue. Revisit this later.
1037func (t *http2Server) Close() error {
1038 t.mu.Lock()
1039 if t.state == closing {
1040 t.mu.Unlock()
1041 return errors.New("transport: Close() was already called")
1042 }
1043 t.state = closing
1044 streams := t.activeStreams
1045 t.activeStreams = nil
1046 t.mu.Unlock()
1047 t.controlBuf.finish()
1048 close(t.done)
1049 err := t.conn.Close()
1050 if channelz.IsOn() {
1051 channelz.RemoveEntry(t.channelzID)
1052 }
1053 // Cancel all active streams.
1054 for _, s := range streams {
1055 s.cancel()
1056 }
1057 if t.stats != nil {
1058 connEnd := &stats.ConnEnd{}
1059 t.stats.HandleConn(t.ctx, connEnd)
1060 }
1061 return err
1062}
1063
1064// deleteStream deletes the stream s from transport's active streams.
1065func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1066 // In case stream sending and receiving are invoked in separate
1067 // goroutines (e.g., bi-directional streaming), cancel needs to be
1068 // called to interrupt the potential blocking on other goroutines.
1069 s.cancel()
1070
1071 t.mu.Lock()
1072 if _, ok := t.activeStreams[s.id]; ok {
1073 delete(t.activeStreams, s.id)
1074 if len(t.activeStreams) == 0 {
1075 t.idle = time.Now()
1076 }
1077 }
1078 t.mu.Unlock()
1079
1080 if channelz.IsOn() {
1081 if eosReceived {
1082 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1083 } else {
1084 atomic.AddInt64(&t.czData.streamsFailed, 1)
1085 }
1086 }
1087}
1088
1089// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1090func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1091 oldState := s.swapState(streamDone)
1092 if oldState == streamDone {
1093 // If the stream was already done, return.
1094 return
1095 }
1096
1097 hdr.cleanup = &cleanupStream{
1098 streamID: s.id,
1099 rst: rst,
1100 rstCode: rstCode,
1101 onWrite: func() {
1102 t.deleteStream(s, eosReceived)
1103 },
1104 }
1105 t.controlBuf.put(hdr)
1106}
1107
1108// closeStream clears the footprint of a stream when the stream is not needed any more.
1109func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1110 s.swapState(streamDone)
1111 t.deleteStream(s, eosReceived)
1112
1113 t.controlBuf.put(&cleanupStream{
1114 streamID: s.id,
1115 rst: rst,
1116 rstCode: rstCode,
1117 onWrite: func() {},
1118 })
1119}
1120
1121func (t *http2Server) RemoteAddr() net.Addr {
1122 return t.remoteAddr
1123}
1124
1125func (t *http2Server) Drain() {
1126 t.drain(http2.ErrCodeNo, []byte{})
1127}
1128
1129func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1130 t.mu.Lock()
1131 defer t.mu.Unlock()
1132 if t.drainChan != nil {
1133 return
1134 }
1135 t.drainChan = make(chan struct{})
1136 t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1137}
1138
// goAwayPing is the PING written right after the first (heads-up) GOAWAY of a
// graceful shutdown in outgoingGoAwayHandler. Its payload is a fixed 8-byte
// value; presumably the ACK handler elsewhere in this file matches on it to
// end the drain's first phase — NOTE(review): confirm against handlePing.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1140
// outgoingGoAwayHandler handles an outgoing goAway and returns true if loopy
// needs to put itself in draining mode.
//
// A goAway with headsUp set (as queued by drain) is the first phase of a
// graceful shutdown. It writes a GOAWAY with stream ID math.MaxUint32 and then
// goAwayPing, and starts a goroutine. That goroutine waits up to one minute
// for t.drainChan to become ready, then queues the second, non-headsUp
// goAway. It gives up if t.done is closed first.
//
// A goAway without headsUp writes the real GOAWAY carrying t.maxStreamID and
// moves the transport to the draining state. If no streams are active (or
// closeConn was already set), it flushes the framer and returns an error so
// the connection is torn down.
//
// It returns ErrConnClosing if the transport is already closing, and any
// error from writing the frames.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	// Read under t.mu: this is the last stream ID the server will process.
	sid := t.maxStreamID
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		if len(t.activeStreams) == 0 {
			g.closeConn = true
		}
		t.mu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if g.closeConn {
			// Abruptly close the connection following the GoAway (via
			// loopywriter). But flush out what's inside the buffer first.
			// The Flush error is not checked: an error is returned below
			// regardless, and the connection is being torn down.
			t.framer.writer.Flush()
			return false, fmt.Errorf("transport: Connection closing")
		}
		return true, nil
	}
	t.mu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration send out another GoAway this
	// time with an ID of the max stream server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		select {
		case <-t.drainChan:
		case <-timer.C:
		case <-t.done:
			return
		}
		// The second GOAWAY reuses the code and debug data of the heads-up one.
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
1195
1196func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1197 s := channelz.SocketInternalMetric{
1198 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1199 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1200 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1201 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1202 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1203 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1204 LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1205 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1206 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1207 LocalFlowControlWindow: int64(t.fc.getSize()),
1208 SocketOptions: channelz.GetSocketOption(t.conn),
1209 LocalAddr: t.localAddr,
1210 RemoteAddr: t.remoteAddr,
1211 // RemoteName :
1212 }
1213 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1214 s.Security = au.GetSecurityValue()
1215 }
1216 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1217 return &s
1218}
1219
1220func (t *http2Server) IncrMsgSent() {
1221 atomic.AddInt64(&t.czData.msgSent, 1)
1222 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1223}
1224
1225func (t *http2Server) IncrMsgRecv() {
1226 atomic.AddInt64(&t.czData.msgRecv, 1)
1227 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1228}
1229
1230func (t *http2Server) getOutFlowWindow() int64 {
1231 resp := make(chan uint32, 1)
1232 timer := time.NewTimer(time.Second)
1233 defer timer.Stop()
1234 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1235 select {
1236 case sz := <-resp:
1237 return int64(sz)
1238 case <-t.done:
1239 return -1
1240 case <-timer.C:
1241 return -2
1242 }
1243}
1244
1245func getJitter(v time.Duration) time.Duration {
1246 if v == infinity {
1247 return 0
1248 }
1249 // Generate a jitter between +/- 10% of the value.
1250 r := int64(v / 10)
1251 j := grpcrand.Int63n(2*r) - r
1252 return time.Duration(j)
1253}