blob: fa33ffb18856f2ad2f1b0a90081781d2d5f179de [file] [log] [blame]
Scott Baker105df152020-04-13 15:55:14 -07001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 "net"
29 "strconv"
30 "sync"
31 "sync/atomic"
32 "time"
33
34 "github.com/golang/protobuf/proto"
35 "golang.org/x/net/http2"
36 "golang.org/x/net/http2/hpack"
37
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/grpclog"
41 "google.golang.org/grpc/internal/channelz"
42 "google.golang.org/grpc/internal/grpcrand"
43 "google.golang.org/grpc/keepalive"
44 "google.golang.org/grpc/metadata"
45 "google.golang.org/grpc/peer"
46 "google.golang.org/grpc/stats"
47 "google.golang.org/grpc/status"
48 "google.golang.org/grpc/tap"
49)
50
// Sentinel errors returned by the server transport when a response header
// or trailer frame cannot be written.
var (
	// ErrIllegalHeaderWrite is returned by WriteHeader when the headers have
	// already been sent for the stream or the stream is already done.
	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
	// ErrHeaderListSizeLimitViolation is returned when a header frame would
	// exceed the SETTINGS_MAX_HEADER_LIST_SIZE value the client sent.
	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
)
59
// serverConnectionCounter is a process-wide tally of the server transports
// that newHTTP2Server has built. Each transport takes the incremented value
// as its connectionID. Only touch it through sync/atomic.
var serverConnectionCounter uint64
63
64// http2Server implements the ServerTransport interface with HTTP2.
65type http2Server struct {
66 lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
67 ctx context.Context
68 done chan struct{}
69 conn net.Conn
70 loopy *loopyWriter
71 readerDone chan struct{} // sync point to enable testing.
72 writerDone chan struct{} // sync point to enable testing.
73 remoteAddr net.Addr
74 localAddr net.Addr
75 maxStreamID uint32 // max stream ID ever seen
76 authInfo credentials.AuthInfo // auth info about the connection
77 inTapHandle tap.ServerInHandle
78 framer *framer
79 // The max number of concurrent streams.
80 maxStreams uint32
81 // controlBuf delivers all the control related tasks (e.g., window
82 // updates, reset streams, and various settings) to the controller.
83 controlBuf *controlBuffer
84 fc *trInFlow
85 stats stats.Handler
86 // Keepalive and max-age parameters for the server.
87 kp keepalive.ServerParameters
88 // Keepalive enforcement policy.
89 kep keepalive.EnforcementPolicy
90 // The time instance last ping was received.
91 lastPingAt time.Time
92 // Number of times the client has violated keepalive ping policy so far.
93 pingStrikes uint8
94 // Flag to signify that number of ping strikes should be reset to 0.
95 // This is set whenever data or header frames are sent.
96 // 1 means yes.
97 resetPingStrikes uint32 // Accessed atomically.
98 initialWindowSize int32
99 bdpEst *bdpEstimator
100 maxSendHeaderListSize *uint32
101
102 mu sync.Mutex // guard the following
103
104 // drainChan is initialized when drain(...) is called the first time.
105 // After which the server writes out the first GoAway(with ID 2^31-1) frame.
106 // Then an independent goroutine will be launched to later send the second GoAway.
107 // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
108 // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
109 // already underway.
110 drainChan chan struct{}
111 state transportState
112 activeStreams map[uint32]*Stream
113 // idle is the time instant when the connection went idle.
114 // This is either the beginning of the connection or when the number of
115 // RPCs go down to 0.
116 // When the connection is busy, this value is set to 0.
117 idle time.Time
118
119 // Fields below are for channelz metric collection.
120 channelzID int64 // channelz unique identification number
121 czData *channelzData
122 bufferPool *bufferPool
123
124 connectionID uint64
125}
126
127// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
128// returned if something goes wrong.
129func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
130 writeBufSize := config.WriteBufferSize
131 readBufSize := config.ReadBufferSize
132 maxHeaderListSize := defaultServerMaxHeaderListSize
133 if config.MaxHeaderListSize != nil {
134 maxHeaderListSize = *config.MaxHeaderListSize
135 }
136 framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
137 // Send initial settings as connection preface to client.
138 isettings := []http2.Setting{{
139 ID: http2.SettingMaxFrameSize,
140 Val: http2MaxFrameLen,
141 }}
142 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
143 // permitted in the HTTP2 spec.
144 maxStreams := config.MaxStreams
145 if maxStreams == 0 {
146 maxStreams = math.MaxUint32
147 } else {
148 isettings = append(isettings, http2.Setting{
149 ID: http2.SettingMaxConcurrentStreams,
150 Val: maxStreams,
151 })
152 }
153 dynamicWindow := true
154 iwz := int32(initialWindowSize)
155 if config.InitialWindowSize >= defaultWindowSize {
156 iwz = config.InitialWindowSize
157 dynamicWindow = false
158 }
159 icwz := int32(initialWindowSize)
160 if config.InitialConnWindowSize >= defaultWindowSize {
161 icwz = config.InitialConnWindowSize
162 dynamicWindow = false
163 }
164 if iwz != defaultWindowSize {
165 isettings = append(isettings, http2.Setting{
166 ID: http2.SettingInitialWindowSize,
167 Val: uint32(iwz)})
168 }
169 if config.MaxHeaderListSize != nil {
170 isettings = append(isettings, http2.Setting{
171 ID: http2.SettingMaxHeaderListSize,
172 Val: *config.MaxHeaderListSize,
173 })
174 }
175 if config.HeaderTableSize != nil {
176 isettings = append(isettings, http2.Setting{
177 ID: http2.SettingHeaderTableSize,
178 Val: *config.HeaderTableSize,
179 })
180 }
181 if err := framer.fr.WriteSettings(isettings...); err != nil {
182 return nil, connectionErrorf(false, err, "transport: %v", err)
183 }
184 // Adjust the connection flow control window if needed.
185 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
186 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
187 return nil, connectionErrorf(false, err, "transport: %v", err)
188 }
189 }
190 kp := config.KeepaliveParams
191 if kp.MaxConnectionIdle == 0 {
192 kp.MaxConnectionIdle = defaultMaxConnectionIdle
193 }
194 if kp.MaxConnectionAge == 0 {
195 kp.MaxConnectionAge = defaultMaxConnectionAge
196 }
197 // Add a jitter to MaxConnectionAge.
198 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
199 if kp.MaxConnectionAgeGrace == 0 {
200 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
201 }
202 if kp.Time == 0 {
203 kp.Time = defaultServerKeepaliveTime
204 }
205 if kp.Timeout == 0 {
206 kp.Timeout = defaultServerKeepaliveTimeout
207 }
208 kep := config.KeepalivePolicy
209 if kep.MinTime == 0 {
210 kep.MinTime = defaultKeepalivePolicyMinTime
211 }
212 done := make(chan struct{})
213 t := &http2Server{
214 ctx: context.Background(),
215 done: done,
216 conn: conn,
217 remoteAddr: conn.RemoteAddr(),
218 localAddr: conn.LocalAddr(),
219 authInfo: config.AuthInfo,
220 framer: framer,
221 readerDone: make(chan struct{}),
222 writerDone: make(chan struct{}),
223 maxStreams: maxStreams,
224 inTapHandle: config.InTapHandle,
225 fc: &trInFlow{limit: uint32(icwz)},
226 state: reachable,
227 activeStreams: make(map[uint32]*Stream),
228 stats: config.StatsHandler,
229 kp: kp,
230 idle: time.Now(),
231 kep: kep,
232 initialWindowSize: iwz,
233 czData: new(channelzData),
234 bufferPool: newBufferPool(),
235 }
236 t.controlBuf = newControlBuffer(t.done)
237 if dynamicWindow {
238 t.bdpEst = &bdpEstimator{
239 bdp: initialWindowSize,
240 updateFlowControl: t.updateFlowControl,
241 }
242 }
243 if t.stats != nil {
244 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
245 RemoteAddr: t.remoteAddr,
246 LocalAddr: t.localAddr,
247 })
248 connBegin := &stats.ConnBegin{}
249 t.stats.HandleConn(t.ctx, connBegin)
250 }
251 if channelz.IsOn() {
252 t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
253 }
254
255 t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
256
257 t.framer.writer.Flush()
258
259 defer func() {
260 if err != nil {
261 t.Close()
262 }
263 }()
264
265 // Check the validity of client preface.
266 preface := make([]byte, len(clientPreface))
267 if _, err := io.ReadFull(t.conn, preface); err != nil {
268 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
269 }
270 if !bytes.Equal(preface, clientPreface) {
271 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
272 }
273
274 frame, err := t.framer.fr.ReadFrame()
275 if err == io.EOF || err == io.ErrUnexpectedEOF {
276 return nil, err
277 }
278 if err != nil {
279 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
280 }
281 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
282 sf, ok := frame.(*http2.SettingsFrame)
283 if !ok {
284 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
285 }
286 t.handleSettings(sf)
287
288 go func() {
289 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
290 t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
291 if err := t.loopy.run(); err != nil {
292 errorf("transport: loopyWriter.run returning. Err: %v", err)
293 }
294 t.conn.Close()
295 close(t.writerDone)
296 }()
297 go t.keepalive()
298 return t, nil
299}
300
301// operateHeader takes action on the decoded headers.
302func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
303 streamID := frame.Header().StreamID
304 state := &decodeState{
305 serverSide: true,
306 }
307 if err := state.decodeHeader(frame); err != nil {
308 if se, ok := status.FromError(err); ok {
309 t.controlBuf.put(&cleanupStream{
310 streamID: streamID,
311 rst: true,
312 rstCode: statusCodeConvTab[se.Code()],
313 onWrite: func() {},
314 })
315 }
316 return false
317 }
318
319 buf := newRecvBuffer()
320 s := &Stream{
321 id: streamID,
322 st: t,
323 buf: buf,
324 fc: &inFlow{limit: uint32(t.initialWindowSize)},
325 recvCompress: state.data.encoding,
326 method: state.data.method,
327 contentSubtype: state.data.contentSubtype,
328 }
329 if frame.StreamEnded() {
330 // s is just created by the caller. No lock needed.
331 s.state = streamReadDone
332 }
333 if state.data.timeoutSet {
334 s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
335 } else {
336 s.ctx, s.cancel = context.WithCancel(t.ctx)
337 }
338 pr := &peer.Peer{
339 Addr: t.remoteAddr,
340 }
341 // Attach Auth info if there is any.
342 if t.authInfo != nil {
343 pr.AuthInfo = t.authInfo
344 }
345 s.ctx = peer.NewContext(s.ctx, pr)
346 // Attach the received metadata to the context.
347 if len(state.data.mdata) > 0 {
348 s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
349 }
350 if state.data.statsTags != nil {
351 s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
352 }
353 if state.data.statsTrace != nil {
354 s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
355 }
356 if t.inTapHandle != nil {
357 var err error
358 info := &tap.Info{
359 FullMethodName: state.data.method,
360 }
361 s.ctx, err = t.inTapHandle(s.ctx, info)
362 if err != nil {
363 warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
364 t.controlBuf.put(&cleanupStream{
365 streamID: s.id,
366 rst: true,
367 rstCode: http2.ErrCodeRefusedStream,
368 onWrite: func() {},
369 })
370 s.cancel()
371 return false
372 }
373 }
374 t.mu.Lock()
375 if t.state != reachable {
376 t.mu.Unlock()
377 s.cancel()
378 return false
379 }
380 if uint32(len(t.activeStreams)) >= t.maxStreams {
381 t.mu.Unlock()
382 t.controlBuf.put(&cleanupStream{
383 streamID: streamID,
384 rst: true,
385 rstCode: http2.ErrCodeRefusedStream,
386 onWrite: func() {},
387 })
388 s.cancel()
389 return false
390 }
391 if streamID%2 != 1 || streamID <= t.maxStreamID {
392 t.mu.Unlock()
393 // illegal gRPC stream id.
394 errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
395 s.cancel()
396 return true
397 }
398 t.maxStreamID = streamID
399 t.activeStreams[streamID] = s
400 if len(t.activeStreams) == 1 {
401 t.idle = time.Time{}
402 }
403 t.mu.Unlock()
404 if channelz.IsOn() {
405 atomic.AddInt64(&t.czData.streamsStarted, 1)
406 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
407 }
408 s.requestRead = func(n int) {
409 t.adjustWindow(s, uint32(n))
410 }
411 s.ctx = traceCtx(s.ctx, s.method)
412 if t.stats != nil {
413 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
414 inHeader := &stats.InHeader{
415 FullMethod: s.method,
416 RemoteAddr: t.remoteAddr,
417 LocalAddr: t.localAddr,
418 Compression: s.recvCompress,
419 WireLength: int(frame.Header().Length),
420 Header: metadata.MD(state.data.mdata).Copy(),
421 }
422 t.stats.HandleRPC(s.ctx, inHeader)
423 }
424 s.ctxDone = s.ctx.Done()
425 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
426 s.trReader = &transportReader{
427 reader: &recvBufferReader{
428 ctx: s.ctx,
429 ctxDone: s.ctxDone,
430 recv: s.buf,
431 freeBuffer: t.bufferPool.put,
432 },
433 windowHandler: func(n int) {
434 t.updateWindow(s, uint32(n))
435 },
436 }
437 // Register the stream with loopy.
438 t.controlBuf.put(&registerStream{
439 streamID: s.id,
440 wq: s.wq,
441 })
442 handle(s)
443 return false
444}
445
446// HandleStreams receives incoming streams using the given handler. This is
447// typically run in a separate goroutine.
448// traceCtx attaches trace to ctx and returns the new context.
449func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
450 defer close(t.readerDone)
451 for {
452 t.controlBuf.throttle()
453 frame, err := t.framer.fr.ReadFrame()
454 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
455 if err != nil {
456 if se, ok := err.(http2.StreamError); ok {
457 warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
458 t.mu.Lock()
459 s := t.activeStreams[se.StreamID]
460 t.mu.Unlock()
461 if s != nil {
462 t.closeStream(s, true, se.Code, false)
463 } else {
464 t.controlBuf.put(&cleanupStream{
465 streamID: se.StreamID,
466 rst: true,
467 rstCode: se.Code,
468 onWrite: func() {},
469 })
470 }
471 continue
472 }
473 if err == io.EOF || err == io.ErrUnexpectedEOF {
474 t.Close()
475 return
476 }
477 warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
478 t.Close()
479 return
480 }
481 switch frame := frame.(type) {
482 case *http2.MetaHeadersFrame:
483 if t.operateHeaders(frame, handle, traceCtx) {
484 t.Close()
485 break
486 }
487 case *http2.DataFrame:
488 t.handleData(frame)
489 case *http2.RSTStreamFrame:
490 t.handleRSTStream(frame)
491 case *http2.SettingsFrame:
492 t.handleSettings(frame)
493 case *http2.PingFrame:
494 t.handlePing(frame)
495 case *http2.WindowUpdateFrame:
496 t.handleWindowUpdate(frame)
497 case *http2.GoAwayFrame:
498 // TODO: Handle GoAway from the client appropriately.
499 default:
500 errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
501 }
502 }
503}
504
505func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
506 t.mu.Lock()
507 defer t.mu.Unlock()
508 if t.activeStreams == nil {
509 // The transport is closing.
510 return nil, false
511 }
512 s, ok := t.activeStreams[f.Header().StreamID]
513 if !ok {
514 // The stream is already done.
515 return nil, false
516 }
517 return s, true
518}
519
520// adjustWindow sends out extra window update over the initial window size
521// of stream if the application is requesting data larger in size than
522// the window.
523func (t *http2Server) adjustWindow(s *Stream, n uint32) {
524 if w := s.fc.maybeAdjust(n); w > 0 {
525 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
526 }
527
528}
529
530// updateWindow adjusts the inbound quota for the stream and the transport.
531// Window updates will deliver to the controller for sending when
532// the cumulative quota exceeds the corresponding threshold.
533func (t *http2Server) updateWindow(s *Stream, n uint32) {
534 if w := s.fc.onRead(n); w > 0 {
535 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
536 increment: w,
537 })
538 }
539}
540
541// updateFlowControl updates the incoming flow control windows
542// for the transport and the stream based on the current bdp
543// estimation.
544func (t *http2Server) updateFlowControl(n uint32) {
545 t.mu.Lock()
546 for _, s := range t.activeStreams {
547 s.fc.newLimit(n)
548 }
549 t.initialWindowSize = int32(n)
550 t.mu.Unlock()
551 t.controlBuf.put(&outgoingWindowUpdate{
552 streamID: 0,
553 increment: t.fc.newLimit(n),
554 })
555 t.controlBuf.put(&outgoingSettings{
556 ss: []http2.Setting{
557 {
558 ID: http2.SettingInitialWindowSize,
559 Val: n,
560 },
561 },
562 })
563
564}
565
566func (t *http2Server) handleData(f *http2.DataFrame) {
567 size := f.Header().Length
568 var sendBDPPing bool
569 if t.bdpEst != nil {
570 sendBDPPing = t.bdpEst.add(size)
571 }
572 // Decouple connection's flow control from application's read.
573 // An update on connection's flow control should not depend on
574 // whether user application has read the data or not. Such a
575 // restriction is already imposed on the stream's flow control,
576 // and therefore the sender will be blocked anyways.
577 // Decoupling the connection flow control will prevent other
578 // active(fast) streams from starving in presence of slow or
579 // inactive streams.
580 if w := t.fc.onData(size); w > 0 {
581 t.controlBuf.put(&outgoingWindowUpdate{
582 streamID: 0,
583 increment: w,
584 })
585 }
586 if sendBDPPing {
587 // Avoid excessive ping detection (e.g. in an L7 proxy)
588 // by sending a window update prior to the BDP ping.
589 if w := t.fc.reset(); w > 0 {
590 t.controlBuf.put(&outgoingWindowUpdate{
591 streamID: 0,
592 increment: w,
593 })
594 }
595 t.controlBuf.put(bdpPing)
596 }
597 // Select the right stream to dispatch.
598 s, ok := t.getStream(f)
599 if !ok {
600 return
601 }
602 if size > 0 {
603 if err := s.fc.onData(size); err != nil {
604 t.closeStream(s, true, http2.ErrCodeFlowControl, false)
605 return
606 }
607 if f.Header().Flags.Has(http2.FlagDataPadded) {
608 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
609 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
610 }
611 }
612 // TODO(bradfitz, zhaoq): A copy is required here because there is no
613 // guarantee f.Data() is consumed before the arrival of next frame.
614 // Can this copy be eliminated?
615 if len(f.Data()) > 0 {
616 buffer := t.bufferPool.get()
617 buffer.Reset()
618 buffer.Write(f.Data())
619 s.write(recvMsg{buffer: buffer})
620 }
621 }
622 if f.Header().Flags.Has(http2.FlagDataEndStream) {
623 // Received the end of stream from the client.
624 s.compareAndSwapState(streamActive, streamReadDone)
625 s.write(recvMsg{err: io.EOF})
626 }
627}
628
629func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
630 // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
631 if s, ok := t.getStream(f); ok {
632 t.closeStream(s, false, 0, false)
633 return
634 }
635 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
636 t.controlBuf.put(&cleanupStream{
637 streamID: f.Header().StreamID,
638 rst: false,
639 rstCode: 0,
640 onWrite: func() {},
641 })
642}
643
644func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
645 if f.IsAck() {
646 return
647 }
648 var ss []http2.Setting
649 var updateFuncs []func()
650 f.ForeachSetting(func(s http2.Setting) error {
651 switch s.ID {
652 case http2.SettingMaxHeaderListSize:
653 updateFuncs = append(updateFuncs, func() {
654 t.maxSendHeaderListSize = new(uint32)
655 *t.maxSendHeaderListSize = s.Val
656 })
657 default:
658 ss = append(ss, s)
659 }
660 return nil
661 })
662 t.controlBuf.executeAndPut(func(interface{}) bool {
663 for _, f := range updateFuncs {
664 f()
665 }
666 return true
667 }, &incomingSettings{
668 ss: ss,
669 })
670}
671
const (
	// maxPingStrikes is how many keepalive-policy violations handlePing
	// tolerates. Going past it queues a GOAWAY (ENHANCE_YOUR_CALM) that closes
	// the connection.
	maxPingStrikes = 2
	// defaultPingTimeout is the minimum spacing expected between client pings
	// when no streams are active and the policy does not permit pings without
	// streams.
	defaultPingTimeout = 2 * time.Hour
)
676
677func (t *http2Server) handlePing(f *http2.PingFrame) {
678 if f.IsAck() {
679 if f.Data == goAwayPing.data && t.drainChan != nil {
680 close(t.drainChan)
681 return
682 }
683 // Maybe it's a BDP ping.
684 if t.bdpEst != nil {
685 t.bdpEst.calculate(f.Data)
686 }
687 return
688 }
689 pingAck := &ping{ack: true}
690 copy(pingAck.data[:], f.Data[:])
691 t.controlBuf.put(pingAck)
692
693 now := time.Now()
694 defer func() {
695 t.lastPingAt = now
696 }()
697 // A reset ping strikes means that we don't need to check for policy
698 // violation for this ping and the pingStrikes counter should be set
699 // to 0.
700 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
701 t.pingStrikes = 0
702 return
703 }
704 t.mu.Lock()
705 ns := len(t.activeStreams)
706 t.mu.Unlock()
707 if ns < 1 && !t.kep.PermitWithoutStream {
708 // Keepalive shouldn't be active thus, this new ping should
709 // have come after at least defaultPingTimeout.
710 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
711 t.pingStrikes++
712 }
713 } else {
714 // Check if keepalive policy is respected.
715 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
716 t.pingStrikes++
717 }
718 }
719
720 if t.pingStrikes > maxPingStrikes {
721 // Send goaway and close the connection.
722 errorf("transport: Got too many pings from the client, closing the connection.")
723 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
724 }
725}
726
727func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
728 t.controlBuf.put(&incomingWindowUpdate{
729 streamID: f.Header().StreamID,
730 increment: f.Increment,
731 })
732}
733
734func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
735 for k, vv := range md {
736 if isReservedHeader(k) {
737 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
738 continue
739 }
740 for _, v := range vv {
741 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
742 }
743 }
744 return headerFields
745}
746
747func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
748 if t.maxSendHeaderListSize == nil {
749 return true
750 }
751 hdrFrame := it.(*headerFrame)
752 var sz int64
753 for _, f := range hdrFrame.hf {
754 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
755 errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
756 return false
757 }
758 }
759 return true
760}
761
762// WriteHeader sends the header metadata md back to the client.
763func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
764 if s.updateHeaderSent() || s.getState() == streamDone {
765 return ErrIllegalHeaderWrite
766 }
767 s.hdrMu.Lock()
768 if md.Len() > 0 {
769 if s.header.Len() > 0 {
770 s.header = metadata.Join(s.header, md)
771 } else {
772 s.header = md
773 }
774 }
775 if err := t.writeHeaderLocked(s); err != nil {
776 s.hdrMu.Unlock()
777 return err
778 }
779 s.hdrMu.Unlock()
780 return nil
781}
782
783func (t *http2Server) setResetPingStrikes() {
784 atomic.StoreUint32(&t.resetPingStrikes, 1)
785}
786
787func (t *http2Server) writeHeaderLocked(s *Stream) error {
788 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
789 // first and create a slice of that exact size.
790 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
791 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
792 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
793 if s.sendCompress != "" {
794 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
795 }
796 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
797 success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
798 streamID: s.id,
799 hf: headerFields,
800 endStream: false,
801 onWrite: t.setResetPingStrikes,
802 })
803 if !success {
804 if err != nil {
805 return err
806 }
807 t.closeStream(s, true, http2.ErrCodeInternal, false)
808 return ErrHeaderListSizeLimitViolation
809 }
810 if t.stats != nil {
811 // Note: Headers are compressed with hpack after this call returns.
812 // No WireLength field is set here.
813 outHeader := &stats.OutHeader{
814 Header: s.header.Copy(),
815 Compression: s.sendCompress,
816 }
817 t.stats.HandleRPC(s.Context(), outHeader)
818 }
819 return nil
820}
821
822// WriteStatus sends stream status to the client and terminates the stream.
823// There is no further I/O operations being able to perform on this stream.
824// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
825// OK is adopted.
826func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
827 if s.getState() == streamDone {
828 return nil
829 }
830 s.hdrMu.Lock()
831 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
832 // first and create a slice of that exact size.
833 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
834 if !s.updateHeaderSent() { // No headers have been sent.
835 if len(s.header) > 0 { // Send a separate header frame.
836 if err := t.writeHeaderLocked(s); err != nil {
837 s.hdrMu.Unlock()
838 return err
839 }
840 } else { // Send a trailer only response.
841 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
842 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
843 }
844 }
845 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
846 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
847
848 if p := st.Proto(); p != nil && len(p.Details) > 0 {
849 stBytes, err := proto.Marshal(p)
850 if err != nil {
851 // TODO: return error instead, when callers are able to handle it.
852 grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
853 } else {
854 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
855 }
856 }
857
858 // Attach the trailer metadata.
859 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
860 trailingHeader := &headerFrame{
861 streamID: s.id,
862 hf: headerFields,
863 endStream: true,
864 onWrite: t.setResetPingStrikes,
865 }
866 s.hdrMu.Unlock()
867 success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
868 if !success {
869 if err != nil {
870 return err
871 }
872 t.closeStream(s, true, http2.ErrCodeInternal, false)
873 return ErrHeaderListSizeLimitViolation
874 }
875 // Send a RST_STREAM after the trailers if the client has not already half-closed.
876 rst := s.getState() == streamActive
877 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
878 if t.stats != nil {
879 // Note: The trailer fields are compressed with hpack after this call returns.
880 // No WireLength field is set here.
881 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
882 Trailer: s.trailer.Copy(),
883 })
884 }
885 return nil
886}
887
888// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
889// is returns if it fails (e.g., framing error, transport error).
890func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
891 if !s.isHeaderSent() { // Headers haven't been written yet.
892 if err := t.WriteHeader(s, nil); err != nil {
893 if _, ok := err.(ConnectionError); ok {
894 return err
895 }
896 // TODO(mmukhi, dfawley): Make sure this is the right code to return.
897 return status.Errorf(codes.Internal, "transport: %v", err)
898 }
899 } else {
900 // Writing headers checks for this condition.
901 if s.getState() == streamDone {
902 // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
903 s.cancel()
904 select {
905 case <-t.done:
906 return ErrConnClosing
907 default:
908 }
909 return ContextErr(s.ctx.Err())
910 }
911 }
912 // Add some data to header frame so that we can equally distribute bytes across frames.
913 emptyLen := http2MaxFrameLen - len(hdr)
914 if emptyLen > len(data) {
915 emptyLen = len(data)
916 }
917 hdr = append(hdr, data[:emptyLen]...)
918 data = data[emptyLen:]
919 df := &dataFrame{
920 streamID: s.id,
921 h: hdr,
922 d: data,
923 onEachWrite: t.setResetPingStrikes,
924 }
925 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
926 select {
927 case <-t.done:
928 return ErrConnClosing
929 default:
930 }
931 return ContextErr(s.ctx.Err())
932 }
933 return t.controlBuf.put(df)
934}
935
// keepalive runs in a separate goroutine and enforces the server keepalive
// policy configured in t.kp:
//  1. Gracefully closes (drains) a connection that has had no active streams
//     for keepalive.MaxConnectionIdle.
//  2. Gracefully closes (drains) any connection once it reaches
//     keepalive.MaxConnectionAge.
//  3. Forcibly closes that connection if it is still open
//     keepalive.MaxConnectionAgeGrace after the drain in (2) started.
//  4. Checks the peer is alive: when nothing has been read for keepalive.Time
//     a ping is sent, and the connection is closed if there is still no read
//     activity after a further keepalive.Timeout.
//
// It returns when the transport is closed (t.done) or after it has initiated
// one of the closes above.
func (t *http2Server) keepalive() {
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then. (t.lastRead is
	// compared against UnixNano values, so it is assumed to hold one too.)
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				return
			}
			// Idle, but not for long enough yet: re-arm for the remainder.
			idleTimer.Reset(val)
		case <-ageTimer.C:
			// Start a graceful drain, then reuse ageTimer (its channel was just
			// consumed, so Reset is safe) to time the grace period.
			t.drain(http2.ErrCodeNo, []byte{})
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			// This blocks the keepalive goroutine for the whole grace period;
			// idle and ping checks are not serviced in the meantime.
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				infof("transport: closing server transport due to maximum connection age.")
				t.Close()
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Set up the timer to fire kp.Time after the lastRead
				// time and continue.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			if outstandingPing && kpTimeoutLeft <= 0 {
				// A ping is outstanding and the full kp.Timeout has elapsed
				// with no read activity: give up on the peer.
				infof("transport: closing server transport due to idleness.")
				t.Close()
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// kpTimeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}
1031
1032// Close starts shutting down the http2Server transport.
1033// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1034// could cause some resource issue. Revisit this later.
1035func (t *http2Server) Close() error {
1036 t.mu.Lock()
1037 if t.state == closing {
1038 t.mu.Unlock()
1039 return errors.New("transport: Close() was already called")
1040 }
1041 t.state = closing
1042 streams := t.activeStreams
1043 t.activeStreams = nil
1044 t.mu.Unlock()
1045 t.controlBuf.finish()
1046 close(t.done)
1047 err := t.conn.Close()
1048 if channelz.IsOn() {
1049 channelz.RemoveEntry(t.channelzID)
1050 }
1051 // Cancel all active streams.
1052 for _, s := range streams {
1053 s.cancel()
1054 }
1055 if t.stats != nil {
1056 connEnd := &stats.ConnEnd{}
1057 t.stats.HandleConn(t.ctx, connEnd)
1058 }
1059 return err
1060}
1061
1062// deleteStream deletes the stream s from transport's active streams.
1063func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1064 // In case stream sending and receiving are invoked in separate
1065 // goroutines (e.g., bi-directional streaming), cancel needs to be
1066 // called to interrupt the potential blocking on other goroutines.
1067 s.cancel()
1068
1069 t.mu.Lock()
1070 if _, ok := t.activeStreams[s.id]; ok {
1071 delete(t.activeStreams, s.id)
1072 if len(t.activeStreams) == 0 {
1073 t.idle = time.Now()
1074 }
1075 }
1076 t.mu.Unlock()
1077
1078 if channelz.IsOn() {
1079 if eosReceived {
1080 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1081 } else {
1082 atomic.AddInt64(&t.czData.streamsFailed, 1)
1083 }
1084 }
1085}
1086
1087// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1088func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1089 oldState := s.swapState(streamDone)
1090 if oldState == streamDone {
1091 // If the stream was already done, return.
1092 return
1093 }
1094
1095 hdr.cleanup = &cleanupStream{
1096 streamID: s.id,
1097 rst: rst,
1098 rstCode: rstCode,
1099 onWrite: func() {
1100 t.deleteStream(s, eosReceived)
1101 },
1102 }
1103 t.controlBuf.put(hdr)
1104}
1105
1106// closeStream clears the footprint of a stream when the stream is not needed any more.
1107func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1108 s.swapState(streamDone)
1109 t.deleteStream(s, eosReceived)
1110
1111 t.controlBuf.put(&cleanupStream{
1112 streamID: s.id,
1113 rst: rst,
1114 rstCode: rstCode,
1115 onWrite: func() {},
1116 })
1117}
1118
1119func (t *http2Server) RemoteAddr() net.Addr {
1120 return t.remoteAddr
1121}
1122
1123func (t *http2Server) Drain() {
1124 t.drain(http2.ErrCodeNo, []byte{})
1125}
1126
1127func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1128 t.mu.Lock()
1129 defer t.mu.Unlock()
1130 if t.drainChan != nil {
1131 return
1132 }
1133 t.drainChan = make(chan struct{})
1134 t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1135}
1136
1137var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1138
// outgoingGoAwayHandler writes an outgoing GoAway. It returns true if loopy
// needs to put itself in draining mode; a non-nil error tells the caller to
// stop (ErrConnClosing if the transport is already closing).
//
// Two kinds of goAway are handled:
//   - headsUp: writes a GoAway with stream ID MaxUint32 followed by
//     goAwayPing, then starts a goroutine that queues the final goAway (same
//     code and debug data) once t.drainChan is closed or one minute passes,
//     unless the transport closes first.
//   - final (!headsUp): sets state to draining so no new streams are accepted
//     and writes a GoAway with t.maxStreamID. If no streams are active, the
//     buffer is flushed and an error is returned so the connection is closed
//     right away.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	sid := t.maxStreamID
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		// With no streams in flight there is nothing to wait for, so the
		// connection can be closed right after the GoAway.
		if len(t.activeStreams) == 0 {
			g.closeConn = true
		}
		t.mu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if g.closeConn {
			// Abruptly close the connection following the GoAway (via
			// loopywriter). But flush out what's inside the buffer first.
			// The Flush error is not checked; an error is returned either way.
			t.framer.writer.Flush()
			return false, fmt.Errorf("transport: Connection closing")
		}
		return true, nil
	}
	t.mu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration send out another GoAway this
	// time with an ID of the max stream server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	// Wait in a separate goroutine so this handler does not block.
	// t.drainChan is closed elsewhere (presumably when goAwayPing is acked).
	go func() {
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		select {
		case <-t.drainChan:
		case <-timer.C:
		case <-t.done:
			// Transport closed: no final GoAway is needed.
			return
		}
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
1193
1194func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1195 s := channelz.SocketInternalMetric{
1196 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1197 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1198 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1199 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1200 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1201 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1202 LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1203 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1204 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1205 LocalFlowControlWindow: int64(t.fc.getSize()),
1206 SocketOptions: channelz.GetSocketOption(t.conn),
1207 LocalAddr: t.localAddr,
1208 RemoteAddr: t.remoteAddr,
1209 // RemoteName :
1210 }
1211 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1212 s.Security = au.GetSecurityValue()
1213 }
1214 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1215 return &s
1216}
1217
1218func (t *http2Server) IncrMsgSent() {
1219 atomic.AddInt64(&t.czData.msgSent, 1)
1220 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1221}
1222
1223func (t *http2Server) IncrMsgRecv() {
1224 atomic.AddInt64(&t.czData.msgRecv, 1)
1225 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1226}
1227
1228func (t *http2Server) getOutFlowWindow() int64 {
1229 resp := make(chan uint32, 1)
1230 timer := time.NewTimer(time.Second)
1231 defer timer.Stop()
1232 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1233 select {
1234 case sz := <-resp:
1235 return int64(sz)
1236 case <-t.done:
1237 return -1
1238 case <-timer.C:
1239 return -2
1240 }
1241}
1242
1243func getJitter(v time.Duration) time.Duration {
1244 if v == infinity {
1245 return 0
1246 }
1247 // Generate a jitter between +/- 10% of the value.
1248 r := int64(v / 10)
1249 j := grpcrand.Int63n(2*r) - r
1250 return time.Duration(j)
1251}