blob: 07603836468f60b298c37cacab1c8a5a0639ab4c [file] [log] [blame]
Don Newton98fd8812019-09-23 15:15:02 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 "net"
29 "strconv"
30 "sync"
31 "sync/atomic"
32 "time"
33
34 "github.com/golang/protobuf/proto"
35 "golang.org/x/net/http2"
36 "golang.org/x/net/http2/hpack"
37
38 spb "google.golang.org/genproto/googleapis/rpc/status"
39 "google.golang.org/grpc/codes"
40 "google.golang.org/grpc/credentials"
41 "google.golang.org/grpc/grpclog"
42 "google.golang.org/grpc/internal"
43 "google.golang.org/grpc/internal/channelz"
44 "google.golang.org/grpc/internal/grpcrand"
45 "google.golang.org/grpc/keepalive"
46 "google.golang.org/grpc/metadata"
47 "google.golang.org/grpc/peer"
48 "google.golang.org/grpc/stats"
49 "google.golang.org/grpc/status"
50 "google.golang.org/grpc/tap"
51)
52
53var (
54 // ErrIllegalHeaderWrite indicates that setting header is illegal because of
55 // the stream's state.
56 ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
57 // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
58 // than the limit set by peer.
59 ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
60 // statusRawProto is a function to get to the raw status proto wrapped in a
61 // status.Status without a proto.Clone().
62 statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
63)
64
65// http2Server implements the ServerTransport interface with HTTP2.
66type http2Server struct {
67 ctx context.Context
Don Newtone0d34a82019-11-14 10:58:06 -050068 done chan struct{}
Don Newton98fd8812019-09-23 15:15:02 -040069 conn net.Conn
70 loopy *loopyWriter
71 readerDone chan struct{} // sync point to enable testing.
72 writerDone chan struct{} // sync point to enable testing.
73 remoteAddr net.Addr
74 localAddr net.Addr
75 maxStreamID uint32 // max stream ID ever seen
76 authInfo credentials.AuthInfo // auth info about the connection
77 inTapHandle tap.ServerInHandle
78 framer *framer
79 // The max number of concurrent streams.
80 maxStreams uint32
81 // controlBuf delivers all the control related tasks (e.g., window
82 // updates, reset streams, and various settings) to the controller.
83 controlBuf *controlBuffer
84 fc *trInFlow
85 stats stats.Handler
86 // Flag to keep track of reading activity on transport.
87 // 1 is true and 0 is false.
88 activity uint32 // Accessed atomically.
89 // Keepalive and max-age parameters for the server.
90 kp keepalive.ServerParameters
91
92 // Keepalive enforcement policy.
93 kep keepalive.EnforcementPolicy
94 // The time instance last ping was received.
95 lastPingAt time.Time
96 // Number of times the client has violated keepalive ping policy so far.
97 pingStrikes uint8
98 // Flag to signify that number of ping strikes should be reset to 0.
99 // This is set whenever data or header frames are sent.
100 // 1 means yes.
101 resetPingStrikes uint32 // Accessed atomically.
102 initialWindowSize int32
103 bdpEst *bdpEstimator
104 maxSendHeaderListSize *uint32
105
106 mu sync.Mutex // guard the following
107
108 // drainChan is initialized when drain(...) is called the first time.
109 // After which the server writes out the first GoAway(with ID 2^31-1) frame.
110 // Then an independent goroutine will be launched to later send the second GoAway.
111 // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
112 // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
113 // already underway.
114 drainChan chan struct{}
115 state transportState
116 activeStreams map[uint32]*Stream
117 // idle is the time instant when the connection went idle.
118 // This is either the beginning of the connection or when the number of
119 // RPCs go down to 0.
120 // When the connection is busy, this value is set to 0.
121 idle time.Time
122
123 // Fields below are for channelz metric collection.
124 channelzID int64 // channelz unique identification number
125 czData *channelzData
126 bufferPool *bufferPool
127}
128
129// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
130// returned if something goes wrong.
131func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
132 writeBufSize := config.WriteBufferSize
133 readBufSize := config.ReadBufferSize
134 maxHeaderListSize := defaultServerMaxHeaderListSize
135 if config.MaxHeaderListSize != nil {
136 maxHeaderListSize = *config.MaxHeaderListSize
137 }
138 framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
139 // Send initial settings as connection preface to client.
140 isettings := []http2.Setting{{
141 ID: http2.SettingMaxFrameSize,
142 Val: http2MaxFrameLen,
143 }}
144 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
145 // permitted in the HTTP2 spec.
146 maxStreams := config.MaxStreams
147 if maxStreams == 0 {
148 maxStreams = math.MaxUint32
149 } else {
150 isettings = append(isettings, http2.Setting{
151 ID: http2.SettingMaxConcurrentStreams,
152 Val: maxStreams,
153 })
154 }
155 dynamicWindow := true
156 iwz := int32(initialWindowSize)
157 if config.InitialWindowSize >= defaultWindowSize {
158 iwz = config.InitialWindowSize
159 dynamicWindow = false
160 }
161 icwz := int32(initialWindowSize)
162 if config.InitialConnWindowSize >= defaultWindowSize {
163 icwz = config.InitialConnWindowSize
164 dynamicWindow = false
165 }
166 if iwz != defaultWindowSize {
167 isettings = append(isettings, http2.Setting{
168 ID: http2.SettingInitialWindowSize,
169 Val: uint32(iwz)})
170 }
171 if config.MaxHeaderListSize != nil {
172 isettings = append(isettings, http2.Setting{
173 ID: http2.SettingMaxHeaderListSize,
174 Val: *config.MaxHeaderListSize,
175 })
176 }
Don Newtone0d34a82019-11-14 10:58:06 -0500177 if config.HeaderTableSize != nil {
178 isettings = append(isettings, http2.Setting{
179 ID: http2.SettingHeaderTableSize,
180 Val: *config.HeaderTableSize,
181 })
182 }
Don Newton98fd8812019-09-23 15:15:02 -0400183 if err := framer.fr.WriteSettings(isettings...); err != nil {
184 return nil, connectionErrorf(false, err, "transport: %v", err)
185 }
186 // Adjust the connection flow control window if needed.
187 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
188 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
189 return nil, connectionErrorf(false, err, "transport: %v", err)
190 }
191 }
192 kp := config.KeepaliveParams
193 if kp.MaxConnectionIdle == 0 {
194 kp.MaxConnectionIdle = defaultMaxConnectionIdle
195 }
196 if kp.MaxConnectionAge == 0 {
197 kp.MaxConnectionAge = defaultMaxConnectionAge
198 }
199 // Add a jitter to MaxConnectionAge.
200 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
201 if kp.MaxConnectionAgeGrace == 0 {
202 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
203 }
204 if kp.Time == 0 {
205 kp.Time = defaultServerKeepaliveTime
206 }
207 if kp.Timeout == 0 {
208 kp.Timeout = defaultServerKeepaliveTimeout
209 }
210 kep := config.KeepalivePolicy
211 if kep.MinTime == 0 {
212 kep.MinTime = defaultKeepalivePolicyMinTime
213 }
Don Newtone0d34a82019-11-14 10:58:06 -0500214 done := make(chan struct{})
Don Newton98fd8812019-09-23 15:15:02 -0400215 t := &http2Server{
Don Newtone0d34a82019-11-14 10:58:06 -0500216 ctx: context.Background(),
217 done: done,
Don Newton98fd8812019-09-23 15:15:02 -0400218 conn: conn,
219 remoteAddr: conn.RemoteAddr(),
220 localAddr: conn.LocalAddr(),
221 authInfo: config.AuthInfo,
222 framer: framer,
223 readerDone: make(chan struct{}),
224 writerDone: make(chan struct{}),
225 maxStreams: maxStreams,
226 inTapHandle: config.InTapHandle,
227 fc: &trInFlow{limit: uint32(icwz)},
228 state: reachable,
229 activeStreams: make(map[uint32]*Stream),
230 stats: config.StatsHandler,
231 kp: kp,
232 idle: time.Now(),
233 kep: kep,
234 initialWindowSize: iwz,
235 czData: new(channelzData),
236 bufferPool: newBufferPool(),
237 }
Don Newtone0d34a82019-11-14 10:58:06 -0500238 t.controlBuf = newControlBuffer(t.done)
Don Newton98fd8812019-09-23 15:15:02 -0400239 if dynamicWindow {
240 t.bdpEst = &bdpEstimator{
241 bdp: initialWindowSize,
242 updateFlowControl: t.updateFlowControl,
243 }
244 }
245 if t.stats != nil {
246 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
247 RemoteAddr: t.remoteAddr,
248 LocalAddr: t.localAddr,
249 })
250 connBegin := &stats.ConnBegin{}
251 t.stats.HandleConn(t.ctx, connBegin)
252 }
253 if channelz.IsOn() {
254 t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
255 }
256 t.framer.writer.Flush()
257
258 defer func() {
259 if err != nil {
260 t.Close()
261 }
262 }()
263
264 // Check the validity of client preface.
265 preface := make([]byte, len(clientPreface))
266 if _, err := io.ReadFull(t.conn, preface); err != nil {
267 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
268 }
269 if !bytes.Equal(preface, clientPreface) {
270 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
271 }
272
273 frame, err := t.framer.fr.ReadFrame()
274 if err == io.EOF || err == io.ErrUnexpectedEOF {
275 return nil, err
276 }
277 if err != nil {
278 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
279 }
280 atomic.StoreUint32(&t.activity, 1)
281 sf, ok := frame.(*http2.SettingsFrame)
282 if !ok {
283 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
284 }
285 t.handleSettings(sf)
286
287 go func() {
288 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
289 t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
290 if err := t.loopy.run(); err != nil {
291 errorf("transport: loopyWriter.run returning. Err: %v", err)
292 }
293 t.conn.Close()
294 close(t.writerDone)
295 }()
296 go t.keepalive()
297 return t, nil
298}
299
300// operateHeader takes action on the decoded headers.
301func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
302 streamID := frame.Header().StreamID
303 state := &decodeState{
304 serverSide: true,
305 }
306 if err := state.decodeHeader(frame); err != nil {
307 if se, ok := status.FromError(err); ok {
308 t.controlBuf.put(&cleanupStream{
309 streamID: streamID,
310 rst: true,
311 rstCode: statusCodeConvTab[se.Code()],
312 onWrite: func() {},
313 })
314 }
315 return false
316 }
317
318 buf := newRecvBuffer()
319 s := &Stream{
320 id: streamID,
321 st: t,
322 buf: buf,
323 fc: &inFlow{limit: uint32(t.initialWindowSize)},
324 recvCompress: state.data.encoding,
325 method: state.data.method,
326 contentSubtype: state.data.contentSubtype,
327 }
328 if frame.StreamEnded() {
329 // s is just created by the caller. No lock needed.
330 s.state = streamReadDone
331 }
332 if state.data.timeoutSet {
333 s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
334 } else {
335 s.ctx, s.cancel = context.WithCancel(t.ctx)
336 }
337 pr := &peer.Peer{
338 Addr: t.remoteAddr,
339 }
340 // Attach Auth info if there is any.
341 if t.authInfo != nil {
342 pr.AuthInfo = t.authInfo
343 }
344 s.ctx = peer.NewContext(s.ctx, pr)
345 // Attach the received metadata to the context.
346 if len(state.data.mdata) > 0 {
347 s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
348 }
349 if state.data.statsTags != nil {
350 s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
351 }
352 if state.data.statsTrace != nil {
353 s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
354 }
355 if t.inTapHandle != nil {
356 var err error
357 info := &tap.Info{
358 FullMethodName: state.data.method,
359 }
360 s.ctx, err = t.inTapHandle(s.ctx, info)
361 if err != nil {
362 warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
363 t.controlBuf.put(&cleanupStream{
364 streamID: s.id,
365 rst: true,
366 rstCode: http2.ErrCodeRefusedStream,
367 onWrite: func() {},
368 })
Don Newtone0d34a82019-11-14 10:58:06 -0500369 s.cancel()
Don Newton98fd8812019-09-23 15:15:02 -0400370 return false
371 }
372 }
373 t.mu.Lock()
374 if t.state != reachable {
375 t.mu.Unlock()
Don Newtone0d34a82019-11-14 10:58:06 -0500376 s.cancel()
Don Newton98fd8812019-09-23 15:15:02 -0400377 return false
378 }
379 if uint32(len(t.activeStreams)) >= t.maxStreams {
380 t.mu.Unlock()
381 t.controlBuf.put(&cleanupStream{
382 streamID: streamID,
383 rst: true,
384 rstCode: http2.ErrCodeRefusedStream,
385 onWrite: func() {},
386 })
Don Newtone0d34a82019-11-14 10:58:06 -0500387 s.cancel()
Don Newton98fd8812019-09-23 15:15:02 -0400388 return false
389 }
390 if streamID%2 != 1 || streamID <= t.maxStreamID {
391 t.mu.Unlock()
392 // illegal gRPC stream id.
393 errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
Don Newtone0d34a82019-11-14 10:58:06 -0500394 s.cancel()
Don Newton98fd8812019-09-23 15:15:02 -0400395 return true
396 }
397 t.maxStreamID = streamID
398 t.activeStreams[streamID] = s
399 if len(t.activeStreams) == 1 {
400 t.idle = time.Time{}
401 }
402 t.mu.Unlock()
403 if channelz.IsOn() {
404 atomic.AddInt64(&t.czData.streamsStarted, 1)
405 atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
406 }
407 s.requestRead = func(n int) {
408 t.adjustWindow(s, uint32(n))
409 }
410 s.ctx = traceCtx(s.ctx, s.method)
411 if t.stats != nil {
412 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
413 inHeader := &stats.InHeader{
414 FullMethod: s.method,
415 RemoteAddr: t.remoteAddr,
416 LocalAddr: t.localAddr,
417 Compression: s.recvCompress,
418 WireLength: int(frame.Header().Length),
419 }
420 t.stats.HandleRPC(s.ctx, inHeader)
421 }
422 s.ctxDone = s.ctx.Done()
423 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
424 s.trReader = &transportReader{
425 reader: &recvBufferReader{
426 ctx: s.ctx,
427 ctxDone: s.ctxDone,
428 recv: s.buf,
429 freeBuffer: t.bufferPool.put,
430 },
431 windowHandler: func(n int) {
432 t.updateWindow(s, uint32(n))
433 },
434 }
435 // Register the stream with loopy.
436 t.controlBuf.put(&registerStream{
437 streamID: s.id,
438 wq: s.wq,
439 })
440 handle(s)
441 return false
442}
443
444// HandleStreams receives incoming streams using the given handler. This is
445// typically run in a separate goroutine.
446// traceCtx attaches trace to ctx and returns the new context.
447func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
448 defer close(t.readerDone)
449 for {
450 t.controlBuf.throttle()
451 frame, err := t.framer.fr.ReadFrame()
452 atomic.StoreUint32(&t.activity, 1)
453 if err != nil {
454 if se, ok := err.(http2.StreamError); ok {
455 warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
456 t.mu.Lock()
457 s := t.activeStreams[se.StreamID]
458 t.mu.Unlock()
459 if s != nil {
460 t.closeStream(s, true, se.Code, false)
461 } else {
462 t.controlBuf.put(&cleanupStream{
463 streamID: se.StreamID,
464 rst: true,
465 rstCode: se.Code,
466 onWrite: func() {},
467 })
468 }
469 continue
470 }
471 if err == io.EOF || err == io.ErrUnexpectedEOF {
472 t.Close()
473 return
474 }
475 warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
476 t.Close()
477 return
478 }
479 switch frame := frame.(type) {
480 case *http2.MetaHeadersFrame:
481 if t.operateHeaders(frame, handle, traceCtx) {
482 t.Close()
483 break
484 }
485 case *http2.DataFrame:
486 t.handleData(frame)
487 case *http2.RSTStreamFrame:
488 t.handleRSTStream(frame)
489 case *http2.SettingsFrame:
490 t.handleSettings(frame)
491 case *http2.PingFrame:
492 t.handlePing(frame)
493 case *http2.WindowUpdateFrame:
494 t.handleWindowUpdate(frame)
495 case *http2.GoAwayFrame:
496 // TODO: Handle GoAway from the client appropriately.
497 default:
498 errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
499 }
500 }
501}
502
503func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
504 t.mu.Lock()
505 defer t.mu.Unlock()
506 if t.activeStreams == nil {
507 // The transport is closing.
508 return nil, false
509 }
510 s, ok := t.activeStreams[f.Header().StreamID]
511 if !ok {
512 // The stream is already done.
513 return nil, false
514 }
515 return s, true
516}
517
518// adjustWindow sends out extra window update over the initial window size
519// of stream if the application is requesting data larger in size than
520// the window.
521func (t *http2Server) adjustWindow(s *Stream, n uint32) {
522 if w := s.fc.maybeAdjust(n); w > 0 {
523 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
524 }
525
526}
527
528// updateWindow adjusts the inbound quota for the stream and the transport.
529// Window updates will deliver to the controller for sending when
530// the cumulative quota exceeds the corresponding threshold.
531func (t *http2Server) updateWindow(s *Stream, n uint32) {
532 if w := s.fc.onRead(n); w > 0 {
533 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
534 increment: w,
535 })
536 }
537}
538
539// updateFlowControl updates the incoming flow control windows
540// for the transport and the stream based on the current bdp
541// estimation.
542func (t *http2Server) updateFlowControl(n uint32) {
543 t.mu.Lock()
544 for _, s := range t.activeStreams {
545 s.fc.newLimit(n)
546 }
547 t.initialWindowSize = int32(n)
548 t.mu.Unlock()
549 t.controlBuf.put(&outgoingWindowUpdate{
550 streamID: 0,
551 increment: t.fc.newLimit(n),
552 })
553 t.controlBuf.put(&outgoingSettings{
554 ss: []http2.Setting{
555 {
556 ID: http2.SettingInitialWindowSize,
557 Val: n,
558 },
559 },
560 })
561
562}
563
564func (t *http2Server) handleData(f *http2.DataFrame) {
565 size := f.Header().Length
566 var sendBDPPing bool
567 if t.bdpEst != nil {
568 sendBDPPing = t.bdpEst.add(size)
569 }
570 // Decouple connection's flow control from application's read.
571 // An update on connection's flow control should not depend on
572 // whether user application has read the data or not. Such a
573 // restriction is already imposed on the stream's flow control,
574 // and therefore the sender will be blocked anyways.
575 // Decoupling the connection flow control will prevent other
576 // active(fast) streams from starving in presence of slow or
577 // inactive streams.
578 if w := t.fc.onData(size); w > 0 {
579 t.controlBuf.put(&outgoingWindowUpdate{
580 streamID: 0,
581 increment: w,
582 })
583 }
584 if sendBDPPing {
585 // Avoid excessive ping detection (e.g. in an L7 proxy)
586 // by sending a window update prior to the BDP ping.
587 if w := t.fc.reset(); w > 0 {
588 t.controlBuf.put(&outgoingWindowUpdate{
589 streamID: 0,
590 increment: w,
591 })
592 }
593 t.controlBuf.put(bdpPing)
594 }
595 // Select the right stream to dispatch.
596 s, ok := t.getStream(f)
597 if !ok {
598 return
599 }
600 if size > 0 {
601 if err := s.fc.onData(size); err != nil {
602 t.closeStream(s, true, http2.ErrCodeFlowControl, false)
603 return
604 }
605 if f.Header().Flags.Has(http2.FlagDataPadded) {
606 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
607 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
608 }
609 }
610 // TODO(bradfitz, zhaoq): A copy is required here because there is no
611 // guarantee f.Data() is consumed before the arrival of next frame.
612 // Can this copy be eliminated?
613 if len(f.Data()) > 0 {
614 buffer := t.bufferPool.get()
615 buffer.Reset()
616 buffer.Write(f.Data())
617 s.write(recvMsg{buffer: buffer})
618 }
619 }
620 if f.Header().Flags.Has(http2.FlagDataEndStream) {
621 // Received the end of stream from the client.
622 s.compareAndSwapState(streamActive, streamReadDone)
623 s.write(recvMsg{err: io.EOF})
624 }
625}
626
627func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
628 // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
629 if s, ok := t.getStream(f); ok {
630 t.closeStream(s, false, 0, false)
631 return
632 }
633 // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
634 t.controlBuf.put(&cleanupStream{
635 streamID: f.Header().StreamID,
636 rst: false,
637 rstCode: 0,
638 onWrite: func() {},
639 })
640}
641
642func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
643 if f.IsAck() {
644 return
645 }
646 var ss []http2.Setting
647 var updateFuncs []func()
648 f.ForeachSetting(func(s http2.Setting) error {
649 switch s.ID {
650 case http2.SettingMaxHeaderListSize:
651 updateFuncs = append(updateFuncs, func() {
652 t.maxSendHeaderListSize = new(uint32)
653 *t.maxSendHeaderListSize = s.Val
654 })
655 default:
656 ss = append(ss, s)
657 }
658 return nil
659 })
660 t.controlBuf.executeAndPut(func(interface{}) bool {
661 for _, f := range updateFuncs {
662 f()
663 }
664 return true
665 }, &incomingSettings{
666 ss: ss,
667 })
668}
669
// maxPingStrikes is the number of keepalive-policy violations tolerated from
// a client; handlePing sends GOAWAY once pingStrikes exceeds it.
const maxPingStrikes = 2

// defaultPingTimeout is the minimum spacing handlePing expects between pings
// from a client that has no active streams and is not permitted to ping
// without them.
const defaultPingTimeout = 2 * time.Hour
674
675func (t *http2Server) handlePing(f *http2.PingFrame) {
676 if f.IsAck() {
677 if f.Data == goAwayPing.data && t.drainChan != nil {
678 close(t.drainChan)
679 return
680 }
681 // Maybe it's a BDP ping.
682 if t.bdpEst != nil {
683 t.bdpEst.calculate(f.Data)
684 }
685 return
686 }
687 pingAck := &ping{ack: true}
688 copy(pingAck.data[:], f.Data[:])
689 t.controlBuf.put(pingAck)
690
691 now := time.Now()
692 defer func() {
693 t.lastPingAt = now
694 }()
695 // A reset ping strikes means that we don't need to check for policy
696 // violation for this ping and the pingStrikes counter should be set
697 // to 0.
698 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
699 t.pingStrikes = 0
700 return
701 }
702 t.mu.Lock()
703 ns := len(t.activeStreams)
704 t.mu.Unlock()
705 if ns < 1 && !t.kep.PermitWithoutStream {
706 // Keepalive shouldn't be active thus, this new ping should
707 // have come after at least defaultPingTimeout.
708 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
709 t.pingStrikes++
710 }
711 } else {
712 // Check if keepalive policy is respected.
713 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
714 t.pingStrikes++
715 }
716 }
717
718 if t.pingStrikes > maxPingStrikes {
719 // Send goaway and close the connection.
720 errorf("transport: Got too many pings from the client, closing the connection.")
721 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
722 }
723}
724
725func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
726 t.controlBuf.put(&incomingWindowUpdate{
727 streamID: f.Header().StreamID,
728 increment: f.Increment,
729 })
730}
731
732func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
733 for k, vv := range md {
734 if isReservedHeader(k) {
735 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
736 continue
737 }
738 for _, v := range vv {
739 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
740 }
741 }
742 return headerFields
743}
744
745func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
746 if t.maxSendHeaderListSize == nil {
747 return true
748 }
749 hdrFrame := it.(*headerFrame)
750 var sz int64
751 for _, f := range hdrFrame.hf {
752 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
753 errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
754 return false
755 }
756 }
757 return true
758}
759
Don Newtone0d34a82019-11-14 10:58:06 -0500760// WriteHeader sends the header metadata md back to the client.
Don Newton98fd8812019-09-23 15:15:02 -0400761func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
762 if s.updateHeaderSent() || s.getState() == streamDone {
763 return ErrIllegalHeaderWrite
764 }
765 s.hdrMu.Lock()
766 if md.Len() > 0 {
767 if s.header.Len() > 0 {
768 s.header = metadata.Join(s.header, md)
769 } else {
770 s.header = md
771 }
772 }
773 if err := t.writeHeaderLocked(s); err != nil {
774 s.hdrMu.Unlock()
775 return err
776 }
777 s.hdrMu.Unlock()
778 return nil
779}
780
781func (t *http2Server) setResetPingStrikes() {
782 atomic.StoreUint32(&t.resetPingStrikes, 1)
783}
784
785func (t *http2Server) writeHeaderLocked(s *Stream) error {
786 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
787 // first and create a slice of that exact size.
788 headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
789 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
790 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
791 if s.sendCompress != "" {
792 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
793 }
794 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
795 success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
796 streamID: s.id,
797 hf: headerFields,
798 endStream: false,
799 onWrite: t.setResetPingStrikes,
800 })
801 if !success {
802 if err != nil {
803 return err
804 }
805 t.closeStream(s, true, http2.ErrCodeInternal, false)
806 return ErrHeaderListSizeLimitViolation
807 }
808 if t.stats != nil {
809 // Note: WireLength is not set in outHeader.
810 // TODO(mmukhi): Revisit this later, if needed.
811 outHeader := &stats.OutHeader{}
812 t.stats.HandleRPC(s.Context(), outHeader)
813 }
814 return nil
815}
816
817// WriteStatus sends stream status to the client and terminates the stream.
818// There is no further I/O operations being able to perform on this stream.
819// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
820// OK is adopted.
821func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
822 if s.getState() == streamDone {
823 return nil
824 }
825 s.hdrMu.Lock()
826 // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
827 // first and create a slice of that exact size.
828 headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
829 if !s.updateHeaderSent() { // No headers have been sent.
830 if len(s.header) > 0 { // Send a separate header frame.
831 if err := t.writeHeaderLocked(s); err != nil {
832 s.hdrMu.Unlock()
833 return err
834 }
835 } else { // Send a trailer only response.
836 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
837 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
838 }
839 }
840 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
841 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
842
843 if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
844 stBytes, err := proto.Marshal(p)
845 if err != nil {
846 // TODO: return error instead, when callers are able to handle it.
847 grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
848 } else {
849 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
850 }
851 }
852
853 // Attach the trailer metadata.
854 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
855 trailingHeader := &headerFrame{
856 streamID: s.id,
857 hf: headerFields,
858 endStream: true,
859 onWrite: t.setResetPingStrikes,
860 }
861 s.hdrMu.Unlock()
862 success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
863 if !success {
864 if err != nil {
865 return err
866 }
867 t.closeStream(s, true, http2.ErrCodeInternal, false)
868 return ErrHeaderListSizeLimitViolation
869 }
870 // Send a RST_STREAM after the trailers if the client has not already half-closed.
871 rst := s.getState() == streamActive
872 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
873 if t.stats != nil {
874 t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
875 }
876 return nil
877}
878
879// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
880// is returns if it fails (e.g., framing error, transport error).
881func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
882 if !s.isHeaderSent() { // Headers haven't been written yet.
883 if err := t.WriteHeader(s, nil); err != nil {
884 if _, ok := err.(ConnectionError); ok {
885 return err
886 }
887 // TODO(mmukhi, dfawley): Make sure this is the right code to return.
888 return status.Errorf(codes.Internal, "transport: %v", err)
889 }
890 } else {
891 // Writing headers checks for this condition.
892 if s.getState() == streamDone {
893 // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
894 s.cancel()
895 select {
Don Newtone0d34a82019-11-14 10:58:06 -0500896 case <-t.done:
Don Newton98fd8812019-09-23 15:15:02 -0400897 return ErrConnClosing
898 default:
899 }
900 return ContextErr(s.ctx.Err())
901 }
902 }
903 // Add some data to header frame so that we can equally distribute bytes across frames.
904 emptyLen := http2MaxFrameLen - len(hdr)
905 if emptyLen > len(data) {
906 emptyLen = len(data)
907 }
908 hdr = append(hdr, data[:emptyLen]...)
909 data = data[emptyLen:]
910 df := &dataFrame{
911 streamID: s.id,
912 h: hdr,
913 d: data,
914 onEachWrite: t.setResetPingStrikes,
915 }
916 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
917 select {
Don Newtone0d34a82019-11-14 10:58:06 -0500918 case <-t.done:
Don Newton98fd8812019-09-23 15:15:02 -0400919 return ErrConnClosing
920 default:
921 }
922 return ContextErr(s.ctx.Err())
923 }
924 return t.controlBuf.put(df)
925}
926
927// keepalive running in a separate goroutine does the following:
928// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
929// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
930// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
931// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
932// after an additional duration of keepalive.Timeout.
933func (t *http2Server) keepalive() {
934 p := &ping{}
935 var pingSent bool
936 maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
937 maxAge := time.NewTimer(t.kp.MaxConnectionAge)
938 keepalive := time.NewTimer(t.kp.Time)
939 // NOTE: All exit paths of this function should reset their
940 // respective timers. A failure to do so will cause the
941 // following clean-up to deadlock and eventually leak.
942 defer func() {
943 if !maxIdle.Stop() {
944 <-maxIdle.C
945 }
946 if !maxAge.Stop() {
947 <-maxAge.C
948 }
949 if !keepalive.Stop() {
950 <-keepalive.C
951 }
952 }()
953 for {
954 select {
955 case <-maxIdle.C:
956 t.mu.Lock()
957 idle := t.idle
958 if idle.IsZero() { // The connection is non-idle.
959 t.mu.Unlock()
960 maxIdle.Reset(t.kp.MaxConnectionIdle)
961 continue
962 }
963 val := t.kp.MaxConnectionIdle - time.Since(idle)
964 t.mu.Unlock()
965 if val <= 0 {
966 // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
967 // Gracefully close the connection.
968 t.drain(http2.ErrCodeNo, []byte{})
969 // Resetting the timer so that the clean-up doesn't deadlock.
970 maxIdle.Reset(infinity)
971 return
972 }
973 maxIdle.Reset(val)
974 case <-maxAge.C:
975 t.drain(http2.ErrCodeNo, []byte{})
976 maxAge.Reset(t.kp.MaxConnectionAgeGrace)
977 select {
978 case <-maxAge.C:
979 // Close the connection after grace period.
980 infof("transport: closing server transport due to maximum connection age.")
981 t.Close()
982 // Resetting the timer so that the clean-up doesn't deadlock.
983 maxAge.Reset(infinity)
Don Newtone0d34a82019-11-14 10:58:06 -0500984 case <-t.done:
Don Newton98fd8812019-09-23 15:15:02 -0400985 }
986 return
987 case <-keepalive.C:
988 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
989 pingSent = false
990 keepalive.Reset(t.kp.Time)
991 continue
992 }
993 if pingSent {
994 infof("transport: closing server transport due to idleness.")
995 t.Close()
996 // Resetting the timer so that the clean-up doesn't deadlock.
997 keepalive.Reset(infinity)
998 return
999 }
1000 pingSent = true
1001 if channelz.IsOn() {
1002 atomic.AddInt64(&t.czData.kpCount, 1)
1003 }
1004 t.controlBuf.put(p)
1005 keepalive.Reset(t.kp.Timeout)
Don Newtone0d34a82019-11-14 10:58:06 -05001006 case <-t.done:
Don Newton98fd8812019-09-23 15:15:02 -04001007 return
1008 }
1009 }
1010}
1011
1012// Close starts shutting down the http2Server transport.
1013// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1014// could cause some resource issue. Revisit this later.
1015func (t *http2Server) Close() error {
1016 t.mu.Lock()
1017 if t.state == closing {
1018 t.mu.Unlock()
1019 return errors.New("transport: Close() was already called")
1020 }
1021 t.state = closing
1022 streams := t.activeStreams
1023 t.activeStreams = nil
1024 t.mu.Unlock()
1025 t.controlBuf.finish()
Don Newtone0d34a82019-11-14 10:58:06 -05001026 close(t.done)
Don Newton98fd8812019-09-23 15:15:02 -04001027 err := t.conn.Close()
1028 if channelz.IsOn() {
1029 channelz.RemoveEntry(t.channelzID)
1030 }
1031 // Cancel all active streams.
1032 for _, s := range streams {
1033 s.cancel()
1034 }
1035 if t.stats != nil {
1036 connEnd := &stats.ConnEnd{}
1037 t.stats.HandleConn(t.ctx, connEnd)
1038 }
1039 return err
1040}
1041
1042// deleteStream deletes the stream s from transport's active streams.
1043func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1044 // In case stream sending and receiving are invoked in separate
1045 // goroutines (e.g., bi-directional streaming), cancel needs to be
1046 // called to interrupt the potential blocking on other goroutines.
1047 s.cancel()
1048
1049 t.mu.Lock()
1050 if _, ok := t.activeStreams[s.id]; ok {
1051 delete(t.activeStreams, s.id)
1052 if len(t.activeStreams) == 0 {
1053 t.idle = time.Now()
1054 }
1055 }
1056 t.mu.Unlock()
1057
1058 if channelz.IsOn() {
1059 if eosReceived {
1060 atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1061 } else {
1062 atomic.AddInt64(&t.czData.streamsFailed, 1)
1063 }
1064 }
1065}
1066
1067// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1068func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1069 oldState := s.swapState(streamDone)
1070 if oldState == streamDone {
1071 // If the stream was already done, return.
1072 return
1073 }
1074
1075 hdr.cleanup = &cleanupStream{
1076 streamID: s.id,
1077 rst: rst,
1078 rstCode: rstCode,
1079 onWrite: func() {
1080 t.deleteStream(s, eosReceived)
1081 },
1082 }
1083 t.controlBuf.put(hdr)
1084}
1085
1086// closeStream clears the footprint of a stream when the stream is not needed any more.
1087func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1088 s.swapState(streamDone)
1089 t.deleteStream(s, eosReceived)
1090
1091 t.controlBuf.put(&cleanupStream{
1092 streamID: s.id,
1093 rst: rst,
1094 rstCode: rstCode,
1095 onWrite: func() {},
1096 })
1097}
1098
1099func (t *http2Server) RemoteAddr() net.Addr {
1100 return t.remoteAddr
1101}
1102
1103func (t *http2Server) Drain() {
1104 t.drain(http2.ErrCodeNo, []byte{})
1105}
1106
1107func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1108 t.mu.Lock()
1109 defer t.mu.Unlock()
1110 if t.drainChan != nil {
1111 return
1112 }
1113 t.drainChan = make(chan struct{})
1114 t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1115}
1116
1117var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1118
1119// Handles outgoing GoAway and returns true if loopy needs to put itself
1120// in draining mode.
1121func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1122 t.mu.Lock()
1123 if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1124 t.mu.Unlock()
1125 // The transport is closing.
1126 return false, ErrConnClosing
1127 }
1128 sid := t.maxStreamID
1129 if !g.headsUp {
1130 // Stop accepting more streams now.
1131 t.state = draining
1132 if len(t.activeStreams) == 0 {
1133 g.closeConn = true
1134 }
1135 t.mu.Unlock()
1136 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1137 return false, err
1138 }
1139 if g.closeConn {
1140 // Abruptly close the connection following the GoAway (via
1141 // loopywriter). But flush out what's inside the buffer first.
1142 t.framer.writer.Flush()
1143 return false, fmt.Errorf("transport: Connection closing")
1144 }
1145 return true, nil
1146 }
1147 t.mu.Unlock()
1148 // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1149 // Follow that with a ping and wait for the ack to come back or a timer
1150 // to expire. During this time accept new streams since they might have
1151 // originated before the GoAway reaches the client.
1152 // After getting the ack or timer expiration send out another GoAway this
1153 // time with an ID of the max stream server intends to process.
1154 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1155 return false, err
1156 }
1157 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1158 return false, err
1159 }
1160 go func() {
1161 timer := time.NewTimer(time.Minute)
1162 defer timer.Stop()
1163 select {
1164 case <-t.drainChan:
1165 case <-timer.C:
Don Newtone0d34a82019-11-14 10:58:06 -05001166 case <-t.done:
Don Newton98fd8812019-09-23 15:15:02 -04001167 return
1168 }
1169 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1170 }()
1171 return false, nil
1172}
1173
1174func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1175 s := channelz.SocketInternalMetric{
1176 StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
1177 StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
1178 StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
1179 MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
1180 MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
1181 KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
1182 LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1183 LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1184 LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1185 LocalFlowControlWindow: int64(t.fc.getSize()),
1186 SocketOptions: channelz.GetSocketOption(t.conn),
1187 LocalAddr: t.localAddr,
1188 RemoteAddr: t.remoteAddr,
1189 // RemoteName :
1190 }
1191 if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1192 s.Security = au.GetSecurityValue()
1193 }
1194 s.RemoteFlowControlWindow = t.getOutFlowWindow()
1195 return &s
1196}
1197
1198func (t *http2Server) IncrMsgSent() {
1199 atomic.AddInt64(&t.czData.msgSent, 1)
1200 atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1201}
1202
1203func (t *http2Server) IncrMsgRecv() {
1204 atomic.AddInt64(&t.czData.msgRecv, 1)
1205 atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1206}
1207
1208func (t *http2Server) getOutFlowWindow() int64 {
1209 resp := make(chan uint32, 1)
1210 timer := time.NewTimer(time.Second)
1211 defer timer.Stop()
1212 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1213 select {
1214 case sz := <-resp:
1215 return int64(sz)
Don Newtone0d34a82019-11-14 10:58:06 -05001216 case <-t.done:
Don Newton98fd8812019-09-23 15:15:02 -04001217 return -1
1218 case <-timer.C:
1219 return -2
1220 }
1221}
1222
1223func getJitter(v time.Duration) time.Duration {
1224 if v == infinity {
1225 return 0
1226 }
1227 // Generate a jitter between +/- 10% of the value.
1228 r := int64(v / 10)
1229 j := grpcrand.Int63n(2*r) - r
1230 return time.Duration(j)
1231}