blob: 4e26f6a1d6b519f206b45d9122041f6852a9ae34 [file] [log] [blame]
Don Newton98fd8812019-09-23 15:15:02 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 "net"
29 "strconv"
30 "sync"
31 "sync/atomic"
32 "time"
33
34 "github.com/golang/protobuf/proto"
35 "golang.org/x/net/http2"
36 "golang.org/x/net/http2/hpack"
37
38 spb "google.golang.org/genproto/googleapis/rpc/status"
39 "google.golang.org/grpc/codes"
40 "google.golang.org/grpc/credentials"
41 "google.golang.org/grpc/grpclog"
42 "google.golang.org/grpc/internal"
43 "google.golang.org/grpc/internal/channelz"
44 "google.golang.org/grpc/internal/grpcrand"
45 "google.golang.org/grpc/keepalive"
46 "google.golang.org/grpc/metadata"
47 "google.golang.org/grpc/peer"
48 "google.golang.org/grpc/stats"
49 "google.golang.org/grpc/status"
50 "google.golang.org/grpc/tap"
51)
52
53var (
54 // ErrIllegalHeaderWrite indicates that setting header is illegal because of
55 // the stream's state.
56 ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
57 // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
58 // than the limit set by peer.
59 ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
60 // statusRawProto is a function to get to the raw status proto wrapped in a
61 // status.Status without a proto.Clone().
62 statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
63)
64
65// http2Server implements the ServerTransport interface with HTTP2.
66type http2Server struct {
67 ctx context.Context
68 ctxDone <-chan struct{} // Cache the context.Done() chan
69 cancel context.CancelFunc
70 conn net.Conn
71 loopy *loopyWriter
72 readerDone chan struct{} // sync point to enable testing.
73 writerDone chan struct{} // sync point to enable testing.
74 remoteAddr net.Addr
75 localAddr net.Addr
76 maxStreamID uint32 // max stream ID ever seen
77 authInfo credentials.AuthInfo // auth info about the connection
78 inTapHandle tap.ServerInHandle
79 framer *framer
80 // The max number of concurrent streams.
81 maxStreams uint32
82 // controlBuf delivers all the control related tasks (e.g., window
83 // updates, reset streams, and various settings) to the controller.
84 controlBuf *controlBuffer
85 fc *trInFlow
86 stats stats.Handler
87 // Flag to keep track of reading activity on transport.
88 // 1 is true and 0 is false.
89 activity uint32 // Accessed atomically.
90 // Keepalive and max-age parameters for the server.
91 kp keepalive.ServerParameters
92
93 // Keepalive enforcement policy.
94 kep keepalive.EnforcementPolicy
95 // The time instance last ping was received.
96 lastPingAt time.Time
97 // Number of times the client has violated keepalive ping policy so far.
98 pingStrikes uint8
99 // Flag to signify that number of ping strikes should be reset to 0.
100 // This is set whenever data or header frames are sent.
101 // 1 means yes.
102 resetPingStrikes uint32 // Accessed atomically.
103 initialWindowSize int32
104 bdpEst *bdpEstimator
105 maxSendHeaderListSize *uint32
106
107 mu sync.Mutex // guard the following
108
109 // drainChan is initialized when drain(...) is called the first time.
110 // After which the server writes out the first GoAway(with ID 2^31-1) frame.
111 // Then an independent goroutine will be launched to later send the second GoAway.
112 // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
113 // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
114 // already underway.
115 drainChan chan struct{}
116 state transportState
117 activeStreams map[uint32]*Stream
118 // idle is the time instant when the connection went idle.
119 // This is either the beginning of the connection or when the number of
120 // RPCs go down to 0.
121 // When the connection is busy, this value is set to 0.
122 idle time.Time
123
124 // Fields below are for channelz metric collection.
125 channelzID int64 // channelz unique identification number
126 czData *channelzData
127 bufferPool *bufferPool
128}
129
130// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
131// returned if something goes wrong.
132func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
133 writeBufSize := config.WriteBufferSize
134 readBufSize := config.ReadBufferSize
135 maxHeaderListSize := defaultServerMaxHeaderListSize
136 if config.MaxHeaderListSize != nil {
137 maxHeaderListSize = *config.MaxHeaderListSize
138 }
139 framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
140 // Send initial settings as connection preface to client.
141 isettings := []http2.Setting{{
142 ID: http2.SettingMaxFrameSize,
143 Val: http2MaxFrameLen,
144 }}
145 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
146 // permitted in the HTTP2 spec.
147 maxStreams := config.MaxStreams
148 if maxStreams == 0 {
149 maxStreams = math.MaxUint32
150 } else {
151 isettings = append(isettings, http2.Setting{
152 ID: http2.SettingMaxConcurrentStreams,
153 Val: maxStreams,
154 })
155 }
156 dynamicWindow := true
157 iwz := int32(initialWindowSize)
158 if config.InitialWindowSize >= defaultWindowSize {
159 iwz = config.InitialWindowSize
160 dynamicWindow = false
161 }
162 icwz := int32(initialWindowSize)
163 if config.InitialConnWindowSize >= defaultWindowSize {
164 icwz = config.InitialConnWindowSize
165 dynamicWindow = false
166 }
167 if iwz != defaultWindowSize {
168 isettings = append(isettings, http2.Setting{
169 ID: http2.SettingInitialWindowSize,
170 Val: uint32(iwz)})
171 }
172 if config.MaxHeaderListSize != nil {
173 isettings = append(isettings, http2.Setting{
174 ID: http2.SettingMaxHeaderListSize,
175 Val: *config.MaxHeaderListSize,
176 })
177 }
178 if err := framer.fr.WriteSettings(isettings...); err != nil {
179 return nil, connectionErrorf(false, err, "transport: %v", err)
180 }
181 // Adjust the connection flow control window if needed.
182 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
183 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
184 return nil, connectionErrorf(false, err, "transport: %v", err)
185 }
186 }
187 kp := config.KeepaliveParams
188 if kp.MaxConnectionIdle == 0 {
189 kp.MaxConnectionIdle = defaultMaxConnectionIdle
190 }
191 if kp.MaxConnectionAge == 0 {
192 kp.MaxConnectionAge = defaultMaxConnectionAge
193 }
194 // Add a jitter to MaxConnectionAge.
195 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
196 if kp.MaxConnectionAgeGrace == 0 {
197 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
198 }
199 if kp.Time == 0 {
200 kp.Time = defaultServerKeepaliveTime
201 }
202 if kp.Timeout == 0 {
203 kp.Timeout = defaultServerKeepaliveTimeout
204 }
205 kep := config.KeepalivePolicy
206 if kep.MinTime == 0 {
207 kep.MinTime = defaultKeepalivePolicyMinTime
208 }
209 ctx, cancel := context.WithCancel(context.Background())
210 t := &http2Server{
211 ctx: ctx,
212 cancel: cancel,
213 ctxDone: ctx.Done(),
214 conn: conn,
215 remoteAddr: conn.RemoteAddr(),
216 localAddr: conn.LocalAddr(),
217 authInfo: config.AuthInfo,
218 framer: framer,
219 readerDone: make(chan struct{}),
220 writerDone: make(chan struct{}),
221 maxStreams: maxStreams,
222 inTapHandle: config.InTapHandle,
223 fc: &trInFlow{limit: uint32(icwz)},
224 state: reachable,
225 activeStreams: make(map[uint32]*Stream),
226 stats: config.StatsHandler,
227 kp: kp,
228 idle: time.Now(),
229 kep: kep,
230 initialWindowSize: iwz,
231 czData: new(channelzData),
232 bufferPool: newBufferPool(),
233 }
234 t.controlBuf = newControlBuffer(t.ctxDone)
235 if dynamicWindow {
236 t.bdpEst = &bdpEstimator{
237 bdp: initialWindowSize,
238 updateFlowControl: t.updateFlowControl,
239 }
240 }
241 if t.stats != nil {
242 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
243 RemoteAddr: t.remoteAddr,
244 LocalAddr: t.localAddr,
245 })
246 connBegin := &stats.ConnBegin{}
247 t.stats.HandleConn(t.ctx, connBegin)
248 }
249 if channelz.IsOn() {
250 t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
251 }
252 t.framer.writer.Flush()
253
254 defer func() {
255 if err != nil {
256 t.Close()
257 }
258 }()
259
260 // Check the validity of client preface.
261 preface := make([]byte, len(clientPreface))
262 if _, err := io.ReadFull(t.conn, preface); err != nil {
263 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
264 }
265 if !bytes.Equal(preface, clientPreface) {
266 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
267 }
268
269 frame, err := t.framer.fr.ReadFrame()
270 if err == io.EOF || err == io.ErrUnexpectedEOF {
271 return nil, err
272 }
273 if err != nil {
274 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
275 }
276 atomic.StoreUint32(&t.activity, 1)
277 sf, ok := frame.(*http2.SettingsFrame)
278 if !ok {
279 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
280 }
281 t.handleSettings(sf)
282
283 go func() {
284 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
285 t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
286 if err := t.loopy.run(); err != nil {
287 errorf("transport: loopyWriter.run returning. Err: %v", err)
288 }
289 t.conn.Close()
290 close(t.writerDone)
291 }()
292 go t.keepalive()
293 return t, nil
294}
295
296// operateHeader takes action on the decoded headers.
297func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
298 streamID := frame.Header().StreamID
299 state := &decodeState{
300 serverSide: true,
301 }
302 if err := state.decodeHeader(frame); err != nil {
303 if se, ok := status.FromError(err); ok {
304 t.controlBuf.put(&cleanupStream{
305 streamID: streamID,
306 rst: true,
307 rstCode: statusCodeConvTab[se.Code()],
308 onWrite: func() {},
309 })
310 }
311 return false
312 }
313
314 buf := newRecvBuffer()
315 s := &Stream{
316 id: streamID,
317 st: t,
318 buf: buf,
319 fc: &inFlow{limit: uint32(t.initialWindowSize)},
320 recvCompress: state.data.encoding,
321 method: state.data.method,
322 contentSubtype: state.data.contentSubtype,
323 }
324 if frame.StreamEnded() {
325 // s is just created by the caller. No lock needed.
326 s.state = streamReadDone
327 }
328 if state.data.timeoutSet {
329 s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
330 } else {
331 s.ctx, s.cancel = context.WithCancel(t.ctx)
332 }
333 pr := &peer.Peer{
334 Addr: t.remoteAddr,
335 }
336 // Attach Auth info if there is any.
337 if t.authInfo != nil {
338 pr.AuthInfo = t.authInfo
339 }
340 s.ctx = peer.NewContext(s.ctx, pr)
341 // Attach the received metadata to the context.
342 if len(state.data.mdata) > 0 {
343 s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
344 }
345 if state.data.statsTags != nil {
346 s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
347 }
348 if state.data.statsTrace != nil {
349 s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
350 }
351 if t.inTapHandle != nil {
352 var err error
353 info := &tap.Info{
354 FullMethodName: state.data.method,
355 }
356 s.ctx, err = t.inTapHandle(s.ctx, info)
357 if err != nil {
358 warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
359 t.controlBuf.put(&cleanupStream{
360 streamID: s.id,
361 rst: true,
362 rstCode: http2.ErrCodeRefusedStream,
363 onWrite: func() {},
364 })
365 return false
366 }
367 }
368 t.mu.Lock()
369 if t.state != reachable {
370 t.mu.Unlock()
371 return false
372 }
373 if uint32(len(t.activeStreams)) >= t.maxStreams {
374 t.mu.Unlock()
375 t.controlBuf.put(&cleanupStream{
376 streamID: streamID,
377 rst: true,
378 rstCode: http2.ErrCodeRefusedStream,
379 onWrite: func() {},
380 })
381 return false
382 }
383 if streamID%2 != 1 || streamID <= t.maxStreamID {
384 t.mu.Unlock()
385 // illegal gRPC stream id.
386 errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
387 return true
388 }
389 t.maxStreamID = streamID
390 t.activeStreams[streamID] = s
391 if len(t.activeStreams) == 1 {
392 t.idle = time.Time{}
393 }
394 t.mu.Unlock()
395 if channelz.IsOn() {
396 atomic.AddInt64(&t.czData.streamsStarted, 1)
397 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
398 }
399 s.requestRead = func(n int) {
400 t.adjustWindow(s, uint32(n))
401 }
402 s.ctx = traceCtx(s.ctx, s.method)
403 if t.stats != nil {
404 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
405 inHeader := &stats.InHeader{
406 FullMethod: s.method,
407 RemoteAddr: t.remoteAddr,
408 LocalAddr: t.localAddr,
409 Compression: s.recvCompress,
410 WireLength: int(frame.Header().Length),
411 }
412 t.stats.HandleRPC(s.ctx, inHeader)
413 }
414 s.ctxDone = s.ctx.Done()
415 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
416 s.trReader = &transportReader{
417 reader: &recvBufferReader{
418 ctx: s.ctx,
419 ctxDone: s.ctxDone,
420 recv: s.buf,
421 freeBuffer: t.bufferPool.put,
422 },
423 windowHandler: func(n int) {
424 t.updateWindow(s, uint32(n))
425 },
426 }
427 // Register the stream with loopy.
428 t.controlBuf.put(&registerStream{
429 streamID: s.id,
430 wq: s.wq,
431 })
432 handle(s)
433 return false
434}
435
436// HandleStreams receives incoming streams using the given handler. This is
437// typically run in a separate goroutine.
438// traceCtx attaches trace to ctx and returns the new context.
439func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
440 defer close(t.readerDone)
441 for {
442 t.controlBuf.throttle()
443 frame, err := t.framer.fr.ReadFrame()
444 atomic.StoreUint32(&t.activity, 1)
445 if err != nil {
446 if se, ok := err.(http2.StreamError); ok {
447 warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
448 t.mu.Lock()
449 s := t.activeStreams[se.StreamID]
450 t.mu.Unlock()
451 if s != nil {
452 t.closeStream(s, true, se.Code, false)
453 } else {
454 t.controlBuf.put(&cleanupStream{
455 streamID: se.StreamID,
456 rst: true,
457 rstCode: se.Code,
458 onWrite: func() {},
459 })
460 }
461 continue
462 }
463 if err == io.EOF || err == io.ErrUnexpectedEOF {
464 t.Close()
465 return
466 }
467 warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
468 t.Close()
469 return
470 }
471 switch frame := frame.(type) {
472 case *http2.MetaHeadersFrame:
473 if t.operateHeaders(frame, handle, traceCtx) {
474 t.Close()
475 break
476 }
477 case *http2.DataFrame:
478 t.handleData(frame)
479 case *http2.RSTStreamFrame:
480 t.handleRSTStream(frame)
481 case *http2.SettingsFrame:
482 t.handleSettings(frame)
483 case *http2.PingFrame:
484 t.handlePing(frame)
485 case *http2.WindowUpdateFrame:
486 t.handleWindowUpdate(frame)
487 case *http2.GoAwayFrame:
488 // TODO: Handle GoAway from the client appropriately.
489 default:
490 errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
491 }
492 }
493}
494
495func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
496 t.mu.Lock()
497 defer t.mu.Unlock()
498 if t.activeStreams == nil {
499 // The transport is closing.
500 return nil, false
501 }
502 s, ok := t.activeStreams[f.Header().StreamID]
503 if !ok {
504 // The stream is already done.
505 return nil, false
506 }
507 return s, true
508}
509
510// adjustWindow sends out extra window update over the initial window size
511// of stream if the application is requesting data larger in size than
512// the window.
513func (t *http2Server) adjustWindow(s *Stream, n uint32) {
514 if w := s.fc.maybeAdjust(n); w > 0 {
515 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
516 }
517
518}
519
520// updateWindow adjusts the inbound quota for the stream and the transport.
521// Window updates will deliver to the controller for sending when
522// the cumulative quota exceeds the corresponding threshold.
523func (t *http2Server) updateWindow(s *Stream, n uint32) {
524 if w := s.fc.onRead(n); w > 0 {
525 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
526 increment: w,
527 })
528 }
529}
530
531// updateFlowControl updates the incoming flow control windows
532// for the transport and the stream based on the current bdp
533// estimation.
534func (t *http2Server) updateFlowControl(n uint32) {
535 t.mu.Lock()
536 for _, s := range t.activeStreams {
537 s.fc.newLimit(n)
538 }
539 t.initialWindowSize = int32(n)
540 t.mu.Unlock()
541 t.controlBuf.put(&outgoingWindowUpdate{
542 streamID: 0,
543 increment: t.fc.newLimit(n),
544 })
545 t.controlBuf.put(&outgoingSettings{
546 ss: []http2.Setting{
547 {
548 ID: http2.SettingInitialWindowSize,
549 Val: n,
550 },
551 },
552 })
553
554}
555
556func (t *http2Server) handleData(f *http2.DataFrame) {
557 size := f.Header().Length
558 var sendBDPPing bool
559 if t.bdpEst != nil {
560 sendBDPPing = t.bdpEst.add(size)
561 }
562 // Decouple connection's flow control from application's read.
563 // An update on connection's flow control should not depend on
564 // whether user application has read the data or not. Such a
565 // restriction is already imposed on the stream's flow control,
566 // and therefore the sender will be blocked anyways.
567 // Decoupling the connection flow control will prevent other
568 // active(fast) streams from starving in presence of slow or
569 // inactive streams.
570 if w := t.fc.onData(size); w > 0 {
571 t.controlBuf.put(&outgoingWindowUpdate{
572 streamID: 0,
573 increment: w,
574 })
575 }
576 if sendBDPPing {
577 // Avoid excessive ping detection (e.g. in an L7 proxy)
578 // by sending a window update prior to the BDP ping.
579 if w := t.fc.reset(); w > 0 {
580 t.controlBuf.put(&outgoingWindowUpdate{
581 streamID: 0,
582 increment: w,
583 })
584 }
585 t.controlBuf.put(bdpPing)
586 }
587 // Select the right stream to dispatch.
588 s, ok := t.getStream(f)
589 if !ok {
590 return
591 }
592 if size > 0 {
593 if err := s.fc.onData(size); err != nil {
594 t.closeStream(s, true, http2.ErrCodeFlowControl, false)
595 return
596 }
597 if f.Header().Flags.Has(http2.FlagDataPadded) {
598 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
599 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
600 }
601 }
602 // TODO(bradfitz, zhaoq): A copy is required here because there is no
603 // guarantee f.Data() is consumed before the arrival of next frame.
604 // Can this copy be eliminated?
605 if len(f.Data()) > 0 {
606 buffer := t.bufferPool.get()
607 buffer.Reset()
608 buffer.Write(f.Data())
609 s.write(recvMsg{buffer: buffer})
610 }
611 }
612 if f.Header().Flags.Has(http2.FlagDataEndStream) {
613 // Received the end of stream from the client.
614 s.compareAndSwapState(streamActive, streamReadDone)
615 s.write(recvMsg{err: io.EOF})
616 }
617}
618
619func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
620 // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
621 if s, ok := t.getStream(f); ok {
622 t.closeStream(s, false, 0, false)
623 return
624 }
625 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
626 t.controlBuf.put(&cleanupStream{
627 streamID: f.Header().StreamID,
628 rst: false,
629 rstCode: 0,
630 onWrite: func() {},
631 })
632}
633
634func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
635 if f.IsAck() {
636 return
637 }
638 var ss []http2.Setting
639 var updateFuncs []func()
640 f.ForeachSetting(func(s http2.Setting) error {
641 switch s.ID {
642 case http2.SettingMaxHeaderListSize:
643 updateFuncs = append(updateFuncs, func() {
644 t.maxSendHeaderListSize = new(uint32)
645 *t.maxSendHeaderListSize = s.Val
646 })
647 default:
648 ss = append(ss, s)
649 }
650 return nil
651 })
652 t.controlBuf.executeAndPut(func(interface{}) bool {
653 for _, f := range updateFuncs {
654 f()
655 }
656 return true
657 }, &incomingSettings{
658 ss: ss,
659 })
660}
661
662const (
663 maxPingStrikes = 2
664 defaultPingTimeout = 2 * time.Hour
665)
666
667func (t *http2Server) handlePing(f *http2.PingFrame) {
668 if f.IsAck() {
669 if f.Data == goAwayPing.data && t.drainChan != nil {
670 close(t.drainChan)
671 return
672 }
673 // Maybe it's a BDP ping.
674 if t.bdpEst != nil {
675 t.bdpEst.calculate(f.Data)
676 }
677 return
678 }
679 pingAck := &ping{ack: true}
680 copy(pingAck.data[:], f.Data[:])
681 t.controlBuf.put(pingAck)
682
683 now := time.Now()
684 defer func() {
685 t.lastPingAt = now
686 }()
687 // A reset ping strikes means that we don't need to check for policy
688 // violation for this ping and the pingStrikes counter should be set
689 // to 0.
690 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
691 t.pingStrikes = 0
692 return
693 }
694 t.mu.Lock()
695 ns := len(t.activeStreams)
696 t.mu.Unlock()
697 if ns < 1 && !t.kep.PermitWithoutStream {
698 // Keepalive shouldn't be active thus, this new ping should
699 // have come after at least defaultPingTimeout.
700 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
701 t.pingStrikes++
702 }
703 } else {
704 // Check if keepalive policy is respected.
705 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
706 t.pingStrikes++
707 }
708 }
709
710 if t.pingStrikes > maxPingStrikes {
711 // Send goaway and close the connection.
712 errorf("transport: Got too many pings from the client, closing the connection.")
713 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
714 }
715}
716
717func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
718 t.controlBuf.put(&incomingWindowUpdate{
719 streamID: f.Header().StreamID,
720 increment: f.Increment,
721 })
722}
723
724func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
725 for k, vv := range md {
726 if isReservedHeader(k) {
727 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
728 continue
729 }
730 for _, v := range vv {
731 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
732 }
733 }
734 return headerFields
735}
736
737func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
738 if t.maxSendHeaderListSize == nil {
739 return true
740 }
741 hdrFrame := it.(*headerFrame)
742 var sz int64
743 for _, f := range hdrFrame.hf {
744 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
745 errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
746 return false
747 }
748 }
749 return true
750}
751
752// WriteHeader sends the header metedata md back to the client.
753func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
754 if s.updateHeaderSent() || s.getState() == streamDone {
755 return ErrIllegalHeaderWrite
756 }
757 s.hdrMu.Lock()
758 if md.Len() > 0 {
759 if s.header.Len() > 0 {
760 s.header = metadata.Join(s.header, md)
761 } else {
762 s.header = md
763 }
764 }
765 if err := t.writeHeaderLocked(s); err != nil {
766 s.hdrMu.Unlock()
767 return err
768 }
769 s.hdrMu.Unlock()
770 return nil
771}
772
773func (t *http2Server) setResetPingStrikes() {
774 atomic.StoreUint32(&t.resetPingStrikes, 1)
775}
776
777func (t *http2Server) writeHeaderLocked(s *Stream) error {
778 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
779 // first and create a slice of that exact size.
780 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
781 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
782 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
783 if s.sendCompress != "" {
784 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
785 }
786 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
787 success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
788 streamID: s.id,
789 hf: headerFields,
790 endStream: false,
791 onWrite: t.setResetPingStrikes,
792 })
793 if !success {
794 if err != nil {
795 return err
796 }
797 t.closeStream(s, true, http2.ErrCodeInternal, false)
798 return ErrHeaderListSizeLimitViolation
799 }
800 if t.stats != nil {
801 // Note: WireLength is not set in outHeader.
802 // TODO(mmukhi): Revisit this later, if needed.
803 outHeader := &stats.OutHeader{}
804 t.stats.HandleRPC(s.Context(), outHeader)
805 }
806 return nil
807}
808
809// WriteStatus sends stream status to the client and terminates the stream.
810// There is no further I/O operations being able to perform on this stream.
811// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
812// OK is adopted.
813func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
814 if s.getState() == streamDone {
815 return nil
816 }
817 s.hdrMu.Lock()
818 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
819 // first and create a slice of that exact size.
820 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
821 if !s.updateHeaderSent() { // No headers have been sent.
822 if len(s.header) > 0 { // Send a separate header frame.
823 if err := t.writeHeaderLocked(s); err != nil {
824 s.hdrMu.Unlock()
825 return err
826 }
827 } else { // Send a trailer only response.
828 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
829 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
830 }
831 }
832 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
833 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
834
835 if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
836 stBytes, err := proto.Marshal(p)
837 if err != nil {
838 // TODO: return error instead, when callers are able to handle it.
839 grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
840 } else {
841 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
842 }
843 }
844
845 // Attach the trailer metadata.
846 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
847 trailingHeader := &headerFrame{
848 streamID: s.id,
849 hf: headerFields,
850 endStream: true,
851 onWrite: t.setResetPingStrikes,
852 }
853 s.hdrMu.Unlock()
854 success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
855 if !success {
856 if err != nil {
857 return err
858 }
859 t.closeStream(s, true, http2.ErrCodeInternal, false)
860 return ErrHeaderListSizeLimitViolation
861 }
862 // Send a RST_STREAM after the trailers if the client has not already half-closed.
863 rst := s.getState() == streamActive
864 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
865 if t.stats != nil {
866 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
867 }
868 return nil
869}
870
871// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
872// is returns if it fails (e.g., framing error, transport error).
873func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
874 if !s.isHeaderSent() { // Headers haven't been written yet.
875 if err := t.WriteHeader(s, nil); err != nil {
876 if _, ok := err.(ConnectionError); ok {
877 return err
878 }
879 // TODO(mmukhi, dfawley): Make sure this is the right code to return.
880 return status.Errorf(codes.Internal, "transport: %v", err)
881 }
882 } else {
883 // Writing headers checks for this condition.
884 if s.getState() == streamDone {
885 // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
886 s.cancel()
887 select {
888 case <-t.ctx.Done():
889 return ErrConnClosing
890 default:
891 }
892 return ContextErr(s.ctx.Err())
893 }
894 }
895 // Add some data to header frame so that we can equally distribute bytes across frames.
896 emptyLen := http2MaxFrameLen - len(hdr)
897 if emptyLen > len(data) {
898 emptyLen = len(data)
899 }
900 hdr = append(hdr, data[:emptyLen]...)
901 data = data[emptyLen:]
902 df := &dataFrame{
903 streamID: s.id,
904 h: hdr,
905 d: data,
906 onEachWrite: t.setResetPingStrikes,
907 }
908 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
909 select {
910 case <-t.ctx.Done():
911 return ErrConnClosing
912 default:
913 }
914 return ContextErr(s.ctx.Err())
915 }
916 return t.controlBuf.put(df)
917}
918
919// keepalive running in a separate goroutine does the following:
920// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
921// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
922// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
923// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
924// after an additional duration of keepalive.Timeout.
925func (t *http2Server) keepalive() {
926 p := &ping{}
927 var pingSent bool
928 maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
929 maxAge := time.NewTimer(t.kp.MaxConnectionAge)
930 keepalive := time.NewTimer(t.kp.Time)
931 // NOTE: All exit paths of this function should reset their
932 // respective timers. A failure to do so will cause the
933 // following clean-up to deadlock and eventually leak.
934 defer func() {
935 if !maxIdle.Stop() {
936 <-maxIdle.C
937 }
938 if !maxAge.Stop() {
939 <-maxAge.C
940 }
941 if !keepalive.Stop() {
942 <-keepalive.C
943 }
944 }()
945 for {
946 select {
947 case <-maxIdle.C:
948 t.mu.Lock()
949 idle := t.idle
950 if idle.IsZero() { // The connection is non-idle.
951 t.mu.Unlock()
952 maxIdle.Reset(t.kp.MaxConnectionIdle)
953 continue
954 }
955 val := t.kp.MaxConnectionIdle - time.Since(idle)
956 t.mu.Unlock()
957 if val <= 0 {
958 // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
959 // Gracefully close the connection.
960 t.drain(http2.ErrCodeNo, []byte{})
961 // Resetting the timer so that the clean-up doesn't deadlock.
962 maxIdle.Reset(infinity)
963 return
964 }
965 maxIdle.Reset(val)
966 case <-maxAge.C:
967 t.drain(http2.ErrCodeNo, []byte{})
968 maxAge.Reset(t.kp.MaxConnectionAgeGrace)
969 select {
970 case <-maxAge.C:
971 // Close the connection after grace period.
972 infof("transport: closing server transport due to maximum connection age.")
973 t.Close()
974 // Resetting the timer so that the clean-up doesn't deadlock.
975 maxAge.Reset(infinity)
976 case <-t.ctx.Done():
977 }
978 return
979 case <-keepalive.C:
980 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
981 pingSent = false
982 keepalive.Reset(t.kp.Time)
983 continue
984 }
985 if pingSent {
986 infof("transport: closing server transport due to idleness.")
987 t.Close()
988 // Resetting the timer so that the clean-up doesn't deadlock.
989 keepalive.Reset(infinity)
990 return
991 }
992 pingSent = true
993 if channelz.IsOn() {
994 atomic.AddInt64(&t.czData.kpCount, 1)
995 }
996 t.controlBuf.put(p)
997 keepalive.Reset(t.kp.Timeout)
998 case <-t.ctx.Done():
999 return
1000 }
1001 }
1002}
1003
1004// Close starts shutting down the http2Server transport.
1005// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1006// could cause some resource issue. Revisit this later.
1007func (t *http2Server) Close() error {
1008 t.mu.Lock()
1009 if t.state == closing {
1010 t.mu.Unlock()
1011 return errors.New("transport: Close() was already called")
1012 }
1013 t.state = closing
1014 streams := t.activeStreams
1015 t.activeStreams = nil
1016 t.mu.Unlock()
1017 t.controlBuf.finish()
1018 t.cancel()
1019 err := t.conn.Close()
1020 if channelz.IsOn() {
1021 channelz.RemoveEntry(t.channelzID)
1022 }
1023 // Cancel all active streams.
1024 for _, s := range streams {
1025 s.cancel()
1026 }
1027 if t.stats != nil {
1028 connEnd := &stats.ConnEnd{}
1029 t.stats.HandleConn(t.ctx, connEnd)
1030 }
1031 return err
1032}
1033
1034// deleteStream deletes the stream s from transport's active streams.
1035func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1036 // In case stream sending and receiving are invoked in separate
1037 // goroutines (e.g., bi-directional streaming), cancel needs to be
1038 // called to interrupt the potential blocking on other goroutines.
1039 s.cancel()
1040
1041 t.mu.Lock()
1042 if _, ok := t.activeStreams[s.id]; ok {
1043 delete(t.activeStreams, s.id)
1044 if len(t.activeStreams) == 0 {
1045 t.idle = time.Now()
1046 }
1047 }
1048 t.mu.Unlock()
1049
1050 if channelz.IsOn() {
1051 if eosReceived {
1052 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1053 } else {
1054 atomic.AddInt64(&t.czData.streamsFailed, 1)
1055 }
1056 }
1057}
1058
1059// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1060func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1061 oldState := s.swapState(streamDone)
1062 if oldState == streamDone {
1063 // If the stream was already done, return.
1064 return
1065 }
1066
1067 hdr.cleanup = &cleanupStream{
1068 streamID: s.id,
1069 rst: rst,
1070 rstCode: rstCode,
1071 onWrite: func() {
1072 t.deleteStream(s, eosReceived)
1073 },
1074 }
1075 t.controlBuf.put(hdr)
1076}
1077
1078// closeStream clears the footprint of a stream when the stream is not needed any more.
1079func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1080 s.swapState(streamDone)
1081 t.deleteStream(s, eosReceived)
1082
1083 t.controlBuf.put(&cleanupStream{
1084 streamID: s.id,
1085 rst: rst,
1086 rstCode: rstCode,
1087 onWrite: func() {},
1088 })
1089}
1090
1091func (t *http2Server) RemoteAddr() net.Addr {
1092 return t.remoteAddr
1093}
1094
1095func (t *http2Server) Drain() {
1096 t.drain(http2.ErrCodeNo, []byte{})
1097}
1098
1099func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1100 t.mu.Lock()
1101 defer t.mu.Unlock()
1102 if t.drainChan != nil {
1103 return
1104 }
1105 t.drainChan = make(chan struct{})
1106 t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1107}
1108
1109var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1110
1111// Handles outgoing GoAway and returns true if loopy needs to put itself
1112// in draining mode.
1113func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1114 t.mu.Lock()
1115 if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1116 t.mu.Unlock()
1117 // The transport is closing.
1118 return false, ErrConnClosing
1119 }
1120 sid := t.maxStreamID
1121 if !g.headsUp {
1122 // Stop accepting more streams now.
1123 t.state = draining
1124 if len(t.activeStreams) == 0 {
1125 g.closeConn = true
1126 }
1127 t.mu.Unlock()
1128 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1129 return false, err
1130 }
1131 if g.closeConn {
1132 // Abruptly close the connection following the GoAway (via
1133 // loopywriter). But flush out what's inside the buffer first.
1134 t.framer.writer.Flush()
1135 return false, fmt.Errorf("transport: Connection closing")
1136 }
1137 return true, nil
1138 }
1139 t.mu.Unlock()
1140 // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1141 // Follow that with a ping and wait for the ack to come back or a timer
1142 // to expire. During this time accept new streams since they might have
1143 // originated before the GoAway reaches the client.
1144 // After getting the ack or timer expiration send out another GoAway this
1145 // time with an ID of the max stream server intends to process.
1146 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1147 return false, err
1148 }
1149 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1150 return false, err
1151 }
1152 go func() {
1153 timer := time.NewTimer(time.Minute)
1154 defer timer.Stop()
1155 select {
1156 case <-t.drainChan:
1157 case <-timer.C:
1158 case <-t.ctx.Done():
1159 return
1160 }
1161 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1162 }()
1163 return false, nil
1164}
1165
1166func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1167 s := channelz.SocketInternalMetric{
1168 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1169 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1170 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1171 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1172 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1173 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1174 LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1175 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1176 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1177 LocalFlowControlWindow: int64(t.fc.getSize()),
1178 SocketOptions: channelz.GetSocketOption(t.conn),
1179 LocalAddr: t.localAddr,
1180 RemoteAddr: t.remoteAddr,
1181 // RemoteName :
1182 }
1183 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1184 s.Security = au.GetSecurityValue()
1185 }
1186 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1187 return &s
1188}
1189
1190func (t *http2Server) IncrMsgSent() {
1191 atomic.AddInt64(&t.czData.msgSent, 1)
1192 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1193}
1194
1195func (t *http2Server) IncrMsgRecv() {
1196 atomic.AddInt64(&t.czData.msgRecv, 1)
1197 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1198}
1199
1200func (t *http2Server) getOutFlowWindow() int64 {
1201 resp := make(chan uint32, 1)
1202 timer := time.NewTimer(time.Second)
1203 defer timer.Stop()
1204 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1205 select {
1206 case sz := <-resp:
1207 return int64(sz)
1208 case <-t.ctxDone:
1209 return -1
1210 case <-timer.C:
1211 return -2
1212 }
1213}
1214
1215func getJitter(v time.Duration) time.Duration {
1216 if v == infinity {
1217 return 0
1218 }
1219 // Generate a jitter between +/- 10% of the value.
1220 r := int64(v / 10)
1221 j := grpcrand.Int63n(2*r) - r
1222 return time.Duration(j)
1223}