blob: 435092e5c853a3fae3623011626cb7e633cb81b3 [file] [log] [blame]
Zack Williamse940c7a2019-08-21 14:25:39 -07001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 "net"
29 "strconv"
30 "sync"
31 "sync/atomic"
32 "time"
33
34 "github.com/golang/protobuf/proto"
35 "golang.org/x/net/http2"
36 "golang.org/x/net/http2/hpack"
37
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/grpclog"
41 "google.golang.org/grpc/internal/channelz"
42 "google.golang.org/grpc/internal/grpcrand"
43 "google.golang.org/grpc/keepalive"
44 "google.golang.org/grpc/metadata"
45 "google.golang.org/grpc/peer"
46 "google.golang.org/grpc/stats"
47 "google.golang.org/grpc/status"
48 "google.golang.org/grpc/tap"
49)
50
// ErrIllegalHeaderWrite is returned when a header write is attempted on a
// stream that is already done or whose headers were already written.
var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")

// ErrHeaderListSizeLimitViolation is returned when the header list about to be
// sent is larger than the limit advertised by the peer.
var ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
59
60// http2Server implements the ServerTransport interface with HTTP2.
// Fields declared after mu are guarded by mu; activity and resetPingStrikes
// are accessed with sync/atomic.
61type http2Server struct {
 // ctx is the transport-level context; per-stream contexts are derived from
 // it in operateHeaders and it is cancelled by Close.
62 ctx context.Context
63 ctxDone <-chan struct{} // Cache the context.Done() chan
64 cancel context.CancelFunc
 // conn is the underlying connection; it is closed by Close and by the
 // writer goroutine once loopy returns.
65 conn net.Conn
 // loopy is created by the writer goroutine started in newHTTP2Server.
66 loopy *loopyWriter
67 readerDone chan struct{} // sync point to enable testing.
68 writerDone chan struct{} // sync point to enable testing.
69 remoteAddr net.Addr
70 localAddr net.Addr
71 maxStreamID uint32 // max stream ID ever seen
72 authInfo credentials.AuthInfo // auth info about the connection
 // inTapHandle, when non-nil, is invoked for every new stream before it is
 // registered; an error from it causes the stream to be refused.
73 inTapHandle tap.ServerInHandle
74 framer *framer
75 // The max number of concurrent streams.
76 maxStreams uint32
77 // controlBuf delivers all the control related tasks (e.g., window
78 // updates, reset streams, and various settings) to the controller.
79 controlBuf *controlBuffer
 // fc tracks the connection-level inbound flow control window.
80 fc *trInFlow
 // stats, when non-nil, receives connection and RPC stats events.
81 stats stats.Handler
82 // Flag to keep track of reading activity on transport.
83 // 1 is true and 0 is false.
84 activity uint32 // Accessed atomically.
85 // Keepalive and max-age parameters for the server.
86 kp keepalive.ServerParameters
87
88 // Keepalive enforcement policy.
89 kep keepalive.EnforcementPolicy
90 // The time instance last ping was received.
91 lastPingAt time.Time
92 // Number of times the client has violated keepalive ping policy so far.
93 pingStrikes uint8
94 // Flag to signify that number of ping strikes should be reset to 0.
95 // This is set whenever data or header frames are sent.
96 // 1 means yes.
97 resetPingStrikes uint32 // Accessed atomically.
 // initialWindowSize is the inbound window given to each new stream; it is
 // rewritten by updateFlowControl.
98 initialWindowSize int32
 // bdpEst is only set when neither window size was pinned by configuration.
99 bdpEst *bdpEstimator
 // maxSendHeaderListSize is set from the peer's SETTINGS_MAX_HEADER_LIST_SIZE
 // in handleSettings; nil means checkForHeaderListSize enforces no limit.
100 maxSendHeaderListSize *uint32
101
102 mu sync.Mutex // guard the following
103
104 // drainChan is initialized when drain(...) is called the first time.
105 // After which the server writes out the first GoAway(with ID 2^31-1) frame.
106 // Then an independent goroutine will be launched to later send the second GoAway.
107 // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
108 // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
109 // already underway.
110 drainChan chan struct{}
 // state starts as reachable and becomes closing in Close.
111 state transportState
 // activeStreams maps stream ID to live streams; Close sets it to nil.
112 activeStreams map[uint32]*Stream
113 // idle is the time instant when the connection went idle.
114 // This is either the beginning of the connection or when the number of
115 // RPCs go down to 0.
116 // When the connection is busy, this value is set to 0.
117 idle time.Time
118
119 // Fields below are for channelz metric collection.
120 channelzID int64 // channelz unique identification number
121 czData *channelzData
122}
123
124// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
125// returned if something goes wrong.
//
// It writes the server SETTINGS (and, if needed, a connection WINDOW_UPDATE),
// builds the transport, validates the client preface and first SETTINGS frame,
// and then starts the writer (loopy) and keepalive goroutines.
126func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
127 writeBufSize := config.WriteBufferSize
128 readBufSize := config.ReadBufferSize
129 maxHeaderListSize := defaultServerMaxHeaderListSize
130 if config.MaxHeaderListSize != nil {
131 maxHeaderListSize = *config.MaxHeaderListSize
132 }
133 framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
134 // Send initial settings as connection preface to client.
135 var isettings []http2.Setting
136 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
137 // permitted in the HTTP2 spec.
 // MaxStreams == 0 is treated as "unlimited": no setting is advertised and
 // the local cap becomes math.MaxUint32.
138 maxStreams := config.MaxStreams
139 if maxStreams == 0 {
140 maxStreams = math.MaxUint32
141 } else {
142 isettings = append(isettings, http2.Setting{
143 ID: http2.SettingMaxConcurrentStreams,
144 Val: maxStreams,
145 })
146 }
 // A configured stream or connection window of at least defaultWindowSize
 // pins that window and turns off dynamic (BDP-estimator driven) sizing.
147 dynamicWindow := true
148 iwz := int32(initialWindowSize)
149 if config.InitialWindowSize >= defaultWindowSize {
150 iwz = config.InitialWindowSize
151 dynamicWindow = false
152 }
153 icwz := int32(initialWindowSize)
154 if config.InitialConnWindowSize >= defaultWindowSize {
155 icwz = config.InitialConnWindowSize
156 dynamicWindow = false
157 }
158 if iwz != defaultWindowSize {
159 isettings = append(isettings, http2.Setting{
160 ID: http2.SettingInitialWindowSize,
161 Val: uint32(iwz)})
162 }
163 if config.MaxHeaderListSize != nil {
164 isettings = append(isettings, http2.Setting{
165 ID: http2.SettingMaxHeaderListSize,
166 Val: *config.MaxHeaderListSize,
167 })
168 }
169 if err := framer.fr.WriteSettings(isettings...); err != nil {
170 return nil, connectionErrorf(false, err, "transport: %v", err)
171 }
172 // Adjust the connection flow control window if needed.
173 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
174 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
175 return nil, connectionErrorf(false, err, "transport: %v", err)
176 }
177 }
 // Zero-valued keepalive parameters fall back to the package defaults.
178 kp := config.KeepaliveParams
179 if kp.MaxConnectionIdle == 0 {
180 kp.MaxConnectionIdle = defaultMaxConnectionIdle
181 }
182 if kp.MaxConnectionAge == 0 {
183 kp.MaxConnectionAge = defaultMaxConnectionAge
184 }
185 // Add a jitter to MaxConnectionAge.
186 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
187 if kp.MaxConnectionAgeGrace == 0 {
188 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
189 }
190 if kp.Time == 0 {
191 kp.Time = defaultServerKeepaliveTime
192 }
193 if kp.Timeout == 0 {
194 kp.Timeout = defaultServerKeepaliveTimeout
195 }
196 kep := config.KeepalivePolicy
197 if kep.MinTime == 0 {
198 kep.MinTime = defaultKeepalivePolicyMinTime
199 }
200 ctx, cancel := context.WithCancel(context.Background())
201 t := &http2Server{
202 ctx: ctx,
203 cancel: cancel,
204 ctxDone: ctx.Done(),
205 conn: conn,
206 remoteAddr: conn.RemoteAddr(),
207 localAddr: conn.LocalAddr(),
208 authInfo: config.AuthInfo,
209 framer: framer,
210 readerDone: make(chan struct{}),
211 writerDone: make(chan struct{}),
212 maxStreams: maxStreams,
213 inTapHandle: config.InTapHandle,
214 fc: &trInFlow{limit: uint32(icwz)},
215 state: reachable,
216 activeStreams: make(map[uint32]*Stream),
217 stats: config.StatsHandler,
218 kp: kp,
219 idle: time.Now(),
220 kep: kep,
221 initialWindowSize: iwz,
222 czData: new(channelzData),
223 }
224 t.controlBuf = newControlBuffer(t.ctxDone)
 // The BDP estimator is only installed when no window size was pinned above.
225 if dynamicWindow {
226 t.bdpEst = &bdpEstimator{
227 bdp: initialWindowSize,
228 updateFlowControl: t.updateFlowControl,
229 }
230 }
231 if t.stats != nil {
232 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
233 RemoteAddr: t.remoteAddr,
234 LocalAddr: t.localAddr,
235 })
236 connBegin := &stats.ConnBegin{}
237 t.stats.HandleConn(t.ctx, connBegin)
238 }
239 if channelz.IsOn() {
240 t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
241 }
 // Flush the frames written above; the flush error is not inspected here.
242 t.framer.writer.Flush()
243
 // From this point on, any non-nil value of the named result err closes the
 // transport before returning.
244 defer func() {
245 if err != nil {
246 t.Close()
247 }
248 }()
249
250 // Check the validity of client preface.
251 preface := make([]byte, len(clientPreface))
252 if _, err := io.ReadFull(t.conn, preface); err != nil {
253 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
254 }
255 if !bytes.Equal(preface, clientPreface) {
256 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
257 }
258
 // The first frame after the preface must be a SETTINGS frame. EOF errors
 // are returned unwrapped; other read errors become a ConnectionError.
259 frame, err := t.framer.fr.ReadFrame()
260 if err == io.EOF || err == io.ErrUnexpectedEOF {
261 return nil, err
262 }
263 if err != nil {
264 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
265 }
266 atomic.StoreUint32(&t.activity, 1)
267 sf, ok := frame.(*http2.SettingsFrame)
268 if !ok {
269 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
270 }
271 t.handleSettings(sf)
272
 // Writer goroutine: runs loopy until it returns, then closes the
 // connection and signals writerDone.
273 go func() {
274 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
275 t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
276 if err := t.loopy.run(); err != nil {
277 errorf("transport: loopyWriter.run returning. Err: %v", err)
278 }
279 t.conn.Close()
280 close(t.writerDone)
281 }()
282 go t.keepalive()
283 return t, nil
284}
285
286// operateHeader takes action on the decoded headers.
287func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
288 streamID := frame.Header().StreamID
289 state := &decodeState{
290 serverSide: true,
291 }
292 if err := state.decodeHeader(frame); err != nil {
293 if se, ok := status.FromError(err); ok {
294 t.controlBuf.put(&cleanupStream{
295 streamID: streamID,
296 rst: true,
297 rstCode: statusCodeConvTab[se.Code()],
298 onWrite: func() {},
299 })
300 }
301 return false
302 }
303
304 buf := newRecvBuffer()
305 s := &Stream{
306 id: streamID,
307 st: t,
308 buf: buf,
309 fc: &inFlow{limit: uint32(t.initialWindowSize)},
310 recvCompress: state.data.encoding,
311 method: state.data.method,
312 contentSubtype: state.data.contentSubtype,
313 }
314 if frame.StreamEnded() {
315 // s is just created by the caller. No lock needed.
316 s.state = streamReadDone
317 }
318 if state.data.timeoutSet {
319 s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
320 } else {
321 s.ctx, s.cancel = context.WithCancel(t.ctx)
322 }
323 pr := &peer.Peer{
324 Addr: t.remoteAddr,
325 }
326 // Attach Auth info if there is any.
327 if t.authInfo != nil {
328 pr.AuthInfo = t.authInfo
329 }
330 s.ctx = peer.NewContext(s.ctx, pr)
331 // Attach the received metadata to the context.
332 if len(state.data.mdata) > 0 {
333 s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
334 }
335 if state.data.statsTags != nil {
336 s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
337 }
338 if state.data.statsTrace != nil {
339 s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
340 }
341 if t.inTapHandle != nil {
342 var err error
343 info := &tap.Info{
344 FullMethodName: state.data.method,
345 }
346 s.ctx, err = t.inTapHandle(s.ctx, info)
347 if err != nil {
348 warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
349 t.controlBuf.put(&cleanupStream{
350 streamID: s.id,
351 rst: true,
352 rstCode: http2.ErrCodeRefusedStream,
353 onWrite: func() {},
354 })
355 return false
356 }
357 }
358 t.mu.Lock()
359 if t.state != reachable {
360 t.mu.Unlock()
361 return false
362 }
363 if uint32(len(t.activeStreams)) >= t.maxStreams {
364 t.mu.Unlock()
365 t.controlBuf.put(&cleanupStream{
366 streamID: streamID,
367 rst: true,
368 rstCode: http2.ErrCodeRefusedStream,
369 onWrite: func() {},
370 })
371 return false
372 }
373 if streamID%2 != 1 || streamID <= t.maxStreamID {
374 t.mu.Unlock()
375 // illegal gRPC stream id.
376 errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
377 return true
378 }
379 t.maxStreamID = streamID
380 t.activeStreams[streamID] = s
381 if len(t.activeStreams) == 1 {
382 t.idle = time.Time{}
383 }
384 t.mu.Unlock()
385 if channelz.IsOn() {
386 atomic.AddInt64(&t.czData.streamsStarted, 1)
387 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
388 }
389 s.requestRead = func(n int) {
390 t.adjustWindow(s, uint32(n))
391 }
392 s.ctx = traceCtx(s.ctx, s.method)
393 if t.stats != nil {
394 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
395 inHeader := &stats.InHeader{
396 FullMethod: s.method,
397 RemoteAddr: t.remoteAddr,
398 LocalAddr: t.localAddr,
399 Compression: s.recvCompress,
400 WireLength: int(frame.Header().Length),
401 }
402 t.stats.HandleRPC(s.ctx, inHeader)
403 }
404 s.ctxDone = s.ctx.Done()
405 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
406 s.trReader = &transportReader{
407 reader: &recvBufferReader{
408 ctx: s.ctx,
409 ctxDone: s.ctxDone,
410 recv: s.buf,
411 },
412 windowHandler: func(n int) {
413 t.updateWindow(s, uint32(n))
414 },
415 }
416 // Register the stream with loopy.
417 t.controlBuf.put(&registerStream{
418 streamID: s.id,
419 wq: s.wq,
420 })
421 handle(s)
422 return false
423}
424
425// HandleStreams receives incoming streams using the given handler. This is
426// typically run in a separate goroutine.
427// traceCtx attaches trace to ctx and returns the new context.
428func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
429 defer close(t.readerDone)
430 for {
431 frame, err := t.framer.fr.ReadFrame()
432 atomic.StoreUint32(&t.activity, 1)
433 if err != nil {
434 if se, ok := err.(http2.StreamError); ok {
435 warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
436 t.mu.Lock()
437 s := t.activeStreams[se.StreamID]
438 t.mu.Unlock()
439 if s != nil {
440 t.closeStream(s, true, se.Code, false)
441 } else {
442 t.controlBuf.put(&cleanupStream{
443 streamID: se.StreamID,
444 rst: true,
445 rstCode: se.Code,
446 onWrite: func() {},
447 })
448 }
449 continue
450 }
451 if err == io.EOF || err == io.ErrUnexpectedEOF {
452 t.Close()
453 return
454 }
455 warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
456 t.Close()
457 return
458 }
459 switch frame := frame.(type) {
460 case *http2.MetaHeadersFrame:
461 if t.operateHeaders(frame, handle, traceCtx) {
462 t.Close()
463 break
464 }
465 case *http2.DataFrame:
466 t.handleData(frame)
467 case *http2.RSTStreamFrame:
468 t.handleRSTStream(frame)
469 case *http2.SettingsFrame:
470 t.handleSettings(frame)
471 case *http2.PingFrame:
472 t.handlePing(frame)
473 case *http2.WindowUpdateFrame:
474 t.handleWindowUpdate(frame)
475 case *http2.GoAwayFrame:
476 // TODO: Handle GoAway from the client appropriately.
477 default:
478 errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
479 }
480 }
481}
482
483func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
484 t.mu.Lock()
485 defer t.mu.Unlock()
486 if t.activeStreams == nil {
487 // The transport is closing.
488 return nil, false
489 }
490 s, ok := t.activeStreams[f.Header().StreamID]
491 if !ok {
492 // The stream is already done.
493 return nil, false
494 }
495 return s, true
496}
497
498// adjustWindow sends out extra window update over the initial window size
499// of stream if the application is requesting data larger in size than
500// the window.
501func (t *http2Server) adjustWindow(s *Stream, n uint32) {
502 if w := s.fc.maybeAdjust(n); w > 0 {
503 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
504 }
505
506}
507
508// updateWindow adjusts the inbound quota for the stream and the transport.
509// Window updates will deliver to the controller for sending when
510// the cumulative quota exceeds the corresponding threshold.
511func (t *http2Server) updateWindow(s *Stream, n uint32) {
512 if w := s.fc.onRead(n); w > 0 {
513 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
514 increment: w,
515 })
516 }
517}
518
519// updateFlowControl updates the incoming flow control windows
520// for the transport and the stream based on the current bdp
521// estimation.
522func (t *http2Server) updateFlowControl(n uint32) {
523 t.mu.Lock()
524 for _, s := range t.activeStreams {
525 s.fc.newLimit(n)
526 }
527 t.initialWindowSize = int32(n)
528 t.mu.Unlock()
529 t.controlBuf.put(&outgoingWindowUpdate{
530 streamID: 0,
531 increment: t.fc.newLimit(n),
532 })
533 t.controlBuf.put(&outgoingSettings{
534 ss: []http2.Setting{
535 {
536 ID: http2.SettingInitialWindowSize,
537 Val: n,
538 },
539 },
540 })
541
542}
543
544func (t *http2Server) handleData(f *http2.DataFrame) {
545 size := f.Header().Length
546 var sendBDPPing bool
547 if t.bdpEst != nil {
548 sendBDPPing = t.bdpEst.add(size)
549 }
550 // Decouple connection's flow control from application's read.
551 // An update on connection's flow control should not depend on
552 // whether user application has read the data or not. Such a
553 // restriction is already imposed on the stream's flow control,
554 // and therefore the sender will be blocked anyways.
555 // Decoupling the connection flow control will prevent other
556 // active(fast) streams from starving in presence of slow or
557 // inactive streams.
558 if w := t.fc.onData(size); w > 0 {
559 t.controlBuf.put(&outgoingWindowUpdate{
560 streamID: 0,
561 increment: w,
562 })
563 }
564 if sendBDPPing {
565 // Avoid excessive ping detection (e.g. in an L7 proxy)
566 // by sending a window update prior to the BDP ping.
567 if w := t.fc.reset(); w > 0 {
568 t.controlBuf.put(&outgoingWindowUpdate{
569 streamID: 0,
570 increment: w,
571 })
572 }
573 t.controlBuf.put(bdpPing)
574 }
575 // Select the right stream to dispatch.
576 s, ok := t.getStream(f)
577 if !ok {
578 return
579 }
580 if size > 0 {
581 if err := s.fc.onData(size); err != nil {
582 t.closeStream(s, true, http2.ErrCodeFlowControl, false)
583 return
584 }
585 if f.Header().Flags.Has(http2.FlagDataPadded) {
586 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
587 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
588 }
589 }
590 // TODO(bradfitz, zhaoq): A copy is required here because there is no
591 // guarantee f.Data() is consumed before the arrival of next frame.
592 // Can this copy be eliminated?
593 if len(f.Data()) > 0 {
594 data := make([]byte, len(f.Data()))
595 copy(data, f.Data())
596 s.write(recvMsg{data: data})
597 }
598 }
599 if f.Header().Flags.Has(http2.FlagDataEndStream) {
600 // Received the end of stream from the client.
601 s.compareAndSwapState(streamActive, streamReadDone)
602 s.write(recvMsg{err: io.EOF})
603 }
604}
605
606func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
607 // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
608 if s, ok := t.getStream(f); ok {
609 t.closeStream(s, false, 0, false)
610 return
611 }
612 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
613 t.controlBuf.put(&cleanupStream{
614 streamID: f.Header().StreamID,
615 rst: false,
616 rstCode: 0,
617 onWrite: func() {},
618 })
619}
620
621func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
622 if f.IsAck() {
623 return
624 }
625 var ss []http2.Setting
626 var updateFuncs []func()
627 f.ForeachSetting(func(s http2.Setting) error {
628 switch s.ID {
629 case http2.SettingMaxHeaderListSize:
630 updateFuncs = append(updateFuncs, func() {
631 t.maxSendHeaderListSize = new(uint32)
632 *t.maxSendHeaderListSize = s.Val
633 })
634 default:
635 ss = append(ss, s)
636 }
637 return nil
638 })
639 t.controlBuf.executeAndPut(func(interface{}) bool {
640 for _, f := range updateFuncs {
641 f()
642 }
643 return true
644 }, &incomingSettings{
645 ss: ss,
646 })
647}
648
649const (
650 maxPingStrikes = 2
651 defaultPingTimeout = 2 * time.Hour
652)
653
654func (t *http2Server) handlePing(f *http2.PingFrame) {
655 if f.IsAck() {
656 if f.Data == goAwayPing.data && t.drainChan != nil {
657 close(t.drainChan)
658 return
659 }
660 // Maybe it's a BDP ping.
661 if t.bdpEst != nil {
662 t.bdpEst.calculate(f.Data)
663 }
664 return
665 }
666 pingAck := &ping{ack: true}
667 copy(pingAck.data[:], f.Data[:])
668 t.controlBuf.put(pingAck)
669
670 now := time.Now()
671 defer func() {
672 t.lastPingAt = now
673 }()
674 // A reset ping strikes means that we don't need to check for policy
675 // violation for this ping and the pingStrikes counter should be set
676 // to 0.
677 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
678 t.pingStrikes = 0
679 return
680 }
681 t.mu.Lock()
682 ns := len(t.activeStreams)
683 t.mu.Unlock()
684 if ns < 1 && !t.kep.PermitWithoutStream {
685 // Keepalive shouldn't be active thus, this new ping should
686 // have come after at least defaultPingTimeout.
687 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
688 t.pingStrikes++
689 }
690 } else {
691 // Check if keepalive policy is respected.
692 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
693 t.pingStrikes++
694 }
695 }
696
697 if t.pingStrikes > maxPingStrikes {
698 // Send goaway and close the connection.
699 errorf("transport: Got too many pings from the client, closing the connection.")
700 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
701 }
702}
703
704func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
705 t.controlBuf.put(&incomingWindowUpdate{
706 streamID: f.Header().StreamID,
707 increment: f.Increment,
708 })
709}
710
711func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
712 for k, vv := range md {
713 if isReservedHeader(k) {
714 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
715 continue
716 }
717 for _, v := range vv {
718 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
719 }
720 }
721 return headerFields
722}
723
724func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
725 if t.maxSendHeaderListSize == nil {
726 return true
727 }
728 hdrFrame := it.(*headerFrame)
729 var sz int64
730 for _, f := range hdrFrame.hf {
731 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
732 errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
733 return false
734 }
735 }
736 return true
737}
738
739// WriteHeader sends the header metedata md back to the client.
740func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
741 if s.updateHeaderSent() || s.getState() == streamDone {
742 return ErrIllegalHeaderWrite
743 }
744 s.hdrMu.Lock()
745 if md.Len() > 0 {
746 if s.header.Len() > 0 {
747 s.header = metadata.Join(s.header, md)
748 } else {
749 s.header = md
750 }
751 }
752 if err := t.writeHeaderLocked(s); err != nil {
753 s.hdrMu.Unlock()
754 return err
755 }
756 s.hdrMu.Unlock()
757 return nil
758}
759
760func (t *http2Server) writeHeaderLocked(s *Stream) error {
761 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
762 // first and create a slice of that exact size.
763 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
764 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
765 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
766 if s.sendCompress != "" {
767 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
768 }
769 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
770 success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
771 streamID: s.id,
772 hf: headerFields,
773 endStream: false,
774 onWrite: func() {
775 atomic.StoreUint32(&t.resetPingStrikes, 1)
776 },
777 })
778 if !success {
779 if err != nil {
780 return err
781 }
782 t.closeStream(s, true, http2.ErrCodeInternal, false)
783 return ErrHeaderListSizeLimitViolation
784 }
785 if t.stats != nil {
786 // Note: WireLength is not set in outHeader.
787 // TODO(mmukhi): Revisit this later, if needed.
788 outHeader := &stats.OutHeader{}
789 t.stats.HandleRPC(s.Context(), outHeader)
790 }
791 return nil
792}
793
794// WriteStatus sends stream status to the client and terminates the stream.
795// There is no further I/O operations being able to perform on this stream.
796// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
797// OK is adopted.
798func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
799 if s.getState() == streamDone {
800 return nil
801 }
802 s.hdrMu.Lock()
803 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
804 // first and create a slice of that exact size.
805 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
806 if !s.updateHeaderSent() { // No headers have been sent.
807 if len(s.header) > 0 { // Send a separate header frame.
808 if err := t.writeHeaderLocked(s); err != nil {
809 s.hdrMu.Unlock()
810 return err
811 }
812 } else { // Send a trailer only response.
813 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
814 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
815 }
816 }
817 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
818 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
819
820 if p := st.Proto(); p != nil && len(p.Details) > 0 {
821 stBytes, err := proto.Marshal(p)
822 if err != nil {
823 // TODO: return error instead, when callers are able to handle it.
824 grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
825 } else {
826 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
827 }
828 }
829
830 // Attach the trailer metadata.
831 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
832 trailingHeader := &headerFrame{
833 streamID: s.id,
834 hf: headerFields,
835 endStream: true,
836 onWrite: func() {
837 atomic.StoreUint32(&t.resetPingStrikes, 1)
838 },
839 }
840 s.hdrMu.Unlock()
841 success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
842 if !success {
843 if err != nil {
844 return err
845 }
846 t.closeStream(s, true, http2.ErrCodeInternal, false)
847 return ErrHeaderListSizeLimitViolation
848 }
849 // Send a RST_STREAM after the trailers if the client has not already half-closed.
850 rst := s.getState() == streamActive
851 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
852 if t.stats != nil {
853 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
854 }
855 return nil
856}
857
858// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
859// is returns if it fails (e.g., framing error, transport error).
860func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
861 if !s.isHeaderSent() { // Headers haven't been written yet.
862 if err := t.WriteHeader(s, nil); err != nil {
863 if _, ok := err.(ConnectionError); ok {
864 return err
865 }
866 // TODO(mmukhi, dfawley): Make sure this is the right code to return.
867 return status.Errorf(codes.Internal, "transport: %v", err)
868 }
869 } else {
870 // Writing headers checks for this condition.
871 if s.getState() == streamDone {
872 // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
873 s.cancel()
874 select {
875 case <-t.ctx.Done():
876 return ErrConnClosing
877 default:
878 }
879 return ContextErr(s.ctx.Err())
880 }
881 }
882 // Add some data to header frame so that we can equally distribute bytes across frames.
883 emptyLen := http2MaxFrameLen - len(hdr)
884 if emptyLen > len(data) {
885 emptyLen = len(data)
886 }
887 hdr = append(hdr, data[:emptyLen]...)
888 data = data[emptyLen:]
889 df := &dataFrame{
890 streamID: s.id,
891 h: hdr,
892 d: data,
893 onEachWrite: func() {
894 atomic.StoreUint32(&t.resetPingStrikes, 1)
895 },
896 }
897 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
898 select {
899 case <-t.ctx.Done():
900 return ErrConnClosing
901 default:
902 }
903 return ContextErr(s.ctx.Err())
904 }
905 return t.controlBuf.put(df)
906}
907
908// keepalive running in a separate goroutine does the following:
909// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
910// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
911// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
912// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
913// after an additional duration of keepalive.Timeout.
914func (t *http2Server) keepalive() {
915 p := &ping{}
916 var pingSent bool
917 maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
918 maxAge := time.NewTimer(t.kp.MaxConnectionAge)
919 keepalive := time.NewTimer(t.kp.Time)
920 // NOTE: All exit paths of this function should reset their
921 // respective timers. A failure to do so will cause the
922 // following clean-up to deadlock and eventually leak.
923 defer func() {
924 if !maxIdle.Stop() {
925 <-maxIdle.C
926 }
927 if !maxAge.Stop() {
928 <-maxAge.C
929 }
930 if !keepalive.Stop() {
931 <-keepalive.C
932 }
933 }()
934 for {
935 select {
936 case <-maxIdle.C:
937 t.mu.Lock()
938 idle := t.idle
939 if idle.IsZero() { // The connection is non-idle.
940 t.mu.Unlock()
941 maxIdle.Reset(t.kp.MaxConnectionIdle)
942 continue
943 }
944 val := t.kp.MaxConnectionIdle - time.Since(idle)
945 t.mu.Unlock()
946 if val <= 0 {
947 // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
948 // Gracefully close the connection.
949 t.drain(http2.ErrCodeNo, []byte{})
950 // Resetting the timer so that the clean-up doesn't deadlock.
951 maxIdle.Reset(infinity)
952 return
953 }
954 maxIdle.Reset(val)
955 case <-maxAge.C:
956 t.drain(http2.ErrCodeNo, []byte{})
957 maxAge.Reset(t.kp.MaxConnectionAgeGrace)
958 select {
959 case <-maxAge.C:
960 // Close the connection after grace period.
961 t.Close()
962 // Resetting the timer so that the clean-up doesn't deadlock.
963 maxAge.Reset(infinity)
964 case <-t.ctx.Done():
965 }
966 return
967 case <-keepalive.C:
968 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
969 pingSent = false
970 keepalive.Reset(t.kp.Time)
971 continue
972 }
973 if pingSent {
974 t.Close()
975 // Resetting the timer so that the clean-up doesn't deadlock.
976 keepalive.Reset(infinity)
977 return
978 }
979 pingSent = true
980 if channelz.IsOn() {
981 atomic.AddInt64(&t.czData.kpCount, 1)
982 }
983 t.controlBuf.put(p)
984 keepalive.Reset(t.kp.Timeout)
985 case <-t.ctx.Done():
986 return
987 }
988 }
989}
990
991// Close starts shutting down the http2Server transport.
992// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
993// could cause some resource issue. Revisit this later.
994func (t *http2Server) Close() error {
995 t.mu.Lock()
996 if t.state == closing {
997 t.mu.Unlock()
998 return errors.New("transport: Close() was already called")
999 }
1000 t.state = closing
1001 streams := t.activeStreams
1002 t.activeStreams = nil
1003 t.mu.Unlock()
1004 t.controlBuf.finish()
1005 t.cancel()
1006 err := t.conn.Close()
1007 if channelz.IsOn() {
1008 channelz.RemoveEntry(t.channelzID)
1009 }
1010 // Cancel all active streams.
1011 for _, s := range streams {
1012 s.cancel()
1013 }
1014 if t.stats != nil {
1015 connEnd := &stats.ConnEnd{}
1016 t.stats.HandleConn(t.ctx, connEnd)
1017 }
1018 return err
1019}
1020
1021// deleteStream deletes the stream s from transport's active streams.
1022func (t *http2Server) deleteStream(s *Stream, eosReceived bool) (oldState streamState) {
1023 oldState = s.swapState(streamDone)
1024 if oldState == streamDone {
1025 // If the stream was already done, return.
1026 return oldState
1027 }
1028
1029 // In case stream sending and receiving are invoked in separate
1030 // goroutines (e.g., bi-directional streaming), cancel needs to be
1031 // called to interrupt the potential blocking on other goroutines.
1032 s.cancel()
1033
1034 t.mu.Lock()
1035 if _, ok := t.activeStreams[s.id]; ok {
1036 delete(t.activeStreams, s.id)
1037 if len(t.activeStreams) == 0 {
1038 t.idle = time.Now()
1039 }
1040 }
1041 t.mu.Unlock()
1042
1043 if channelz.IsOn() {
1044 if eosReceived {
1045 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1046 } else {
1047 atomic.AddInt64(&t.czData.streamsFailed, 1)
1048 }
1049 }
1050
1051 return oldState
1052}
1053
1054// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1055func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1056 oldState := t.deleteStream(s, eosReceived)
1057 // If the stream is already closed, then don't put trailing header to controlbuf.
1058 if oldState == streamDone {
1059 return
1060 }
1061
1062 hdr.cleanup = &cleanupStream{
1063 streamID: s.id,
1064 rst: rst,
1065 rstCode: rstCode,
1066 onWrite: func() {},
1067 }
1068 t.controlBuf.put(hdr)
1069}
1070
1071// closeStream clears the footprint of a stream when the stream is not needed any more.
1072func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1073 t.deleteStream(s, eosReceived)
1074 t.controlBuf.put(&cleanupStream{
1075 streamID: s.id,
1076 rst: rst,
1077 rstCode: rstCode,
1078 onWrite: func() {},
1079 })
1080}
1081
1082func (t *http2Server) RemoteAddr() net.Addr {
1083 return t.remoteAddr
1084}
1085
1086func (t *http2Server) Drain() {
1087 t.drain(http2.ErrCodeNo, []byte{})
1088}
1089
1090func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1091 t.mu.Lock()
1092 defer t.mu.Unlock()
1093 if t.drainChan != nil {
1094 return
1095 }
1096 t.drainChan = make(chan struct{})
1097 t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1098}
1099
1100var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1101
1102// Handles outgoing GoAway and returns true if loopy needs to put itself
1103// in draining mode.
1104func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1105 t.mu.Lock()
1106 if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1107 t.mu.Unlock()
1108 // The transport is closing.
1109 return false, ErrConnClosing
1110 }
1111 sid := t.maxStreamID
1112 if !g.headsUp {
1113 // Stop accepting more streams now.
1114 t.state = draining
1115 if len(t.activeStreams) == 0 {
1116 g.closeConn = true
1117 }
1118 t.mu.Unlock()
1119 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1120 return false, err
1121 }
1122 if g.closeConn {
1123 // Abruptly close the connection following the GoAway (via
1124 // loopywriter). But flush out what's inside the buffer first.
1125 t.framer.writer.Flush()
1126 return false, fmt.Errorf("transport: Connection closing")
1127 }
1128 return true, nil
1129 }
1130 t.mu.Unlock()
1131 // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1132 // Follow that with a ping and wait for the ack to come back or a timer
1133 // to expire. During this time accept new streams since they might have
1134 // originated before the GoAway reaches the client.
1135 // After getting the ack or timer expiration send out another GoAway this
1136 // time with an ID of the max stream server intends to process.
1137 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1138 return false, err
1139 }
1140 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1141 return false, err
1142 }
1143 go func() {
1144 timer := time.NewTimer(time.Minute)
1145 defer timer.Stop()
1146 select {
1147 case <-t.drainChan:
1148 case <-timer.C:
1149 case <-t.ctx.Done():
1150 return
1151 }
1152 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1153 }()
1154 return false, nil
1155}
1156
1157func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1158 s := channelz.SocketInternalMetric{
1159 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1160 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1161 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1162 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1163 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1164 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1165 LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1166 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1167 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1168 LocalFlowControlWindow: int64(t.fc.getSize()),
1169 SocketOptions: channelz.GetSocketOption(t.conn),
1170 LocalAddr: t.localAddr,
1171 RemoteAddr: t.remoteAddr,
1172 // RemoteName :
1173 }
1174 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1175 s.Security = au.GetSecurityValue()
1176 }
1177 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1178 return &s
1179}
1180
1181func (t *http2Server) IncrMsgSent() {
1182 atomic.AddInt64(&t.czData.msgSent, 1)
1183 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1184}
1185
1186func (t *http2Server) IncrMsgRecv() {
1187 atomic.AddInt64(&t.czData.msgRecv, 1)
1188 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1189}
1190
1191func (t *http2Server) getOutFlowWindow() int64 {
1192 resp := make(chan uint32, 1)
1193 timer := time.NewTimer(time.Second)
1194 defer timer.Stop()
1195 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1196 select {
1197 case sz := <-resp:
1198 return int64(sz)
1199 case <-t.ctxDone:
1200 return -1
1201 case <-timer.C:
1202 return -2
1203 }
1204}
1205
1206func getJitter(v time.Duration) time.Duration {
1207 if v == infinity {
1208 return 0
1209 }
1210 // Generate a jitter between +/- 10% of the value.
1211 r := int64(v / 10)
1212 j := grpcrand.Int63n(2*r) - r
1213 return time.Duration(j)
1214}