khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame] | 1 | /* |
| 2 | * |
| 3 | * Copyright 2014 gRPC authors. |
| 4 | * |
| 5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | * you may not use this file except in compliance with the License. |
| 7 | * You may obtain a copy of the License at |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | * See the License for the specific language governing permissions and |
| 15 | * limitations under the License. |
| 16 | * |
| 17 | */ |
| 18 | |
| 19 | package transport |
| 20 | |
| 21 | import ( |
| 22 | "bytes" |
| 23 | "context" |
| 24 | "errors" |
| 25 | "fmt" |
| 26 | "io" |
| 27 | "math" |
| 28 | "net" |
| 29 | "strconv" |
| 30 | "sync" |
| 31 | "sync/atomic" |
| 32 | "time" |
| 33 | |
| 34 | "github.com/golang/protobuf/proto" |
| 35 | "golang.org/x/net/http2" |
| 36 | "golang.org/x/net/http2/hpack" |
| 37 | |
| 38 | "google.golang.org/grpc/codes" |
| 39 | "google.golang.org/grpc/credentials" |
| 40 | "google.golang.org/grpc/grpclog" |
| 41 | "google.golang.org/grpc/internal/channelz" |
| 42 | "google.golang.org/grpc/internal/grpcrand" |
| 43 | "google.golang.org/grpc/keepalive" |
| 44 | "google.golang.org/grpc/metadata" |
| 45 | "google.golang.org/grpc/peer" |
| 46 | "google.golang.org/grpc/stats" |
| 47 | "google.golang.org/grpc/status" |
| 48 | "google.golang.org/grpc/tap" |
| 49 | ) |
| 50 | |
// Sentinel errors reported by the server transport when writing headers.
var (
	// ErrIllegalHeaderWrite is returned when a header write is attempted on a
	// stream that is already done or whose header has already been written.
	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
	// ErrHeaderListSizeLimitViolation is returned when the header list about
	// to be sent is larger than the limit advertised by the peer.
	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
)
| 59 | |
| 60 | // http2Server implements the ServerTransport interface with HTTP2. |
| 61 | type http2Server struct { |
| 62 | ctx context.Context |
| 63 | ctxDone <-chan struct{} // Cache the context.Done() chan |
| 64 | cancel context.CancelFunc |
| 65 | conn net.Conn |
| 66 | loopy *loopyWriter |
| 67 | readerDone chan struct{} // sync point to enable testing. |
| 68 | writerDone chan struct{} // sync point to enable testing. |
| 69 | remoteAddr net.Addr |
| 70 | localAddr net.Addr |
| 71 | maxStreamID uint32 // max stream ID ever seen |
| 72 | authInfo credentials.AuthInfo // auth info about the connection |
| 73 | inTapHandle tap.ServerInHandle |
| 74 | framer *framer |
| 75 | // The max number of concurrent streams. |
| 76 | maxStreams uint32 |
| 77 | // controlBuf delivers all the control related tasks (e.g., window |
| 78 | // updates, reset streams, and various settings) to the controller. |
| 79 | controlBuf *controlBuffer |
| 80 | fc *trInFlow |
| 81 | stats stats.Handler |
| 82 | // Flag to keep track of reading activity on transport. |
| 83 | // 1 is true and 0 is false. |
| 84 | activity uint32 // Accessed atomically. |
| 85 | // Keepalive and max-age parameters for the server. |
| 86 | kp keepalive.ServerParameters |
| 87 | |
| 88 | // Keepalive enforcement policy. |
| 89 | kep keepalive.EnforcementPolicy |
| 90 | // The time instance last ping was received. |
| 91 | lastPingAt time.Time |
| 92 | // Number of times the client has violated keepalive ping policy so far. |
| 93 | pingStrikes uint8 |
| 94 | // Flag to signify that number of ping strikes should be reset to 0. |
| 95 | // This is set whenever data or header frames are sent. |
| 96 | // 1 means yes. |
| 97 | resetPingStrikes uint32 // Accessed atomically. |
| 98 | initialWindowSize int32 |
| 99 | bdpEst *bdpEstimator |
| 100 | maxSendHeaderListSize *uint32 |
| 101 | |
| 102 | mu sync.Mutex // guard the following |
| 103 | |
| 104 | // drainChan is initialized when drain(...) is called the first time. |
| 105 | // After which the server writes out the first GoAway(with ID 2^31-1) frame. |
| 106 | // Then an independent goroutine will be launched to later send the second GoAway. |
| 107 | // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame. |
| 108 | // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is |
| 109 | // already underway. |
| 110 | drainChan chan struct{} |
| 111 | state transportState |
| 112 | activeStreams map[uint32]*Stream |
| 113 | // idle is the time instant when the connection went idle. |
| 114 | // This is either the beginning of the connection or when the number of |
| 115 | // RPCs go down to 0. |
| 116 | // When the connection is busy, this value is set to 0. |
| 117 | idle time.Time |
| 118 | |
| 119 | // Fields below are for channelz metric collection. |
| 120 | channelzID int64 // channelz unique identification number |
| 121 | czData *channelzData |
| 122 | } |
| 123 | |
| 124 | // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is |
| 125 | // returned if something goes wrong. |
| 126 | func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { |
| 127 | writeBufSize := config.WriteBufferSize |
| 128 | readBufSize := config.ReadBufferSize |
| 129 | maxHeaderListSize := defaultServerMaxHeaderListSize |
| 130 | if config.MaxHeaderListSize != nil { |
| 131 | maxHeaderListSize = *config.MaxHeaderListSize |
| 132 | } |
| 133 | framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize) |
| 134 | // Send initial settings as connection preface to client. |
| 135 | var isettings []http2.Setting |
| 136 | // TODO(zhaoq): Have a better way to signal "no limit" because 0 is |
| 137 | // permitted in the HTTP2 spec. |
| 138 | maxStreams := config.MaxStreams |
| 139 | if maxStreams == 0 { |
| 140 | maxStreams = math.MaxUint32 |
| 141 | } else { |
| 142 | isettings = append(isettings, http2.Setting{ |
| 143 | ID: http2.SettingMaxConcurrentStreams, |
| 144 | Val: maxStreams, |
| 145 | }) |
| 146 | } |
| 147 | dynamicWindow := true |
| 148 | iwz := int32(initialWindowSize) |
| 149 | if config.InitialWindowSize >= defaultWindowSize { |
| 150 | iwz = config.InitialWindowSize |
| 151 | dynamicWindow = false |
| 152 | } |
| 153 | icwz := int32(initialWindowSize) |
| 154 | if config.InitialConnWindowSize >= defaultWindowSize { |
| 155 | icwz = config.InitialConnWindowSize |
| 156 | dynamicWindow = false |
| 157 | } |
| 158 | if iwz != defaultWindowSize { |
| 159 | isettings = append(isettings, http2.Setting{ |
| 160 | ID: http2.SettingInitialWindowSize, |
| 161 | Val: uint32(iwz)}) |
| 162 | } |
| 163 | if config.MaxHeaderListSize != nil { |
| 164 | isettings = append(isettings, http2.Setting{ |
| 165 | ID: http2.SettingMaxHeaderListSize, |
| 166 | Val: *config.MaxHeaderListSize, |
| 167 | }) |
| 168 | } |
| 169 | if err := framer.fr.WriteSettings(isettings...); err != nil { |
| 170 | return nil, connectionErrorf(false, err, "transport: %v", err) |
| 171 | } |
| 172 | // Adjust the connection flow control window if needed. |
| 173 | if delta := uint32(icwz - defaultWindowSize); delta > 0 { |
| 174 | if err := framer.fr.WriteWindowUpdate(0, delta); err != nil { |
| 175 | return nil, connectionErrorf(false, err, "transport: %v", err) |
| 176 | } |
| 177 | } |
| 178 | kp := config.KeepaliveParams |
| 179 | if kp.MaxConnectionIdle == 0 { |
| 180 | kp.MaxConnectionIdle = defaultMaxConnectionIdle |
| 181 | } |
| 182 | if kp.MaxConnectionAge == 0 { |
| 183 | kp.MaxConnectionAge = defaultMaxConnectionAge |
| 184 | } |
| 185 | // Add a jitter to MaxConnectionAge. |
| 186 | kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge) |
| 187 | if kp.MaxConnectionAgeGrace == 0 { |
| 188 | kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace |
| 189 | } |
| 190 | if kp.Time == 0 { |
| 191 | kp.Time = defaultServerKeepaliveTime |
| 192 | } |
| 193 | if kp.Timeout == 0 { |
| 194 | kp.Timeout = defaultServerKeepaliveTimeout |
| 195 | } |
| 196 | kep := config.KeepalivePolicy |
| 197 | if kep.MinTime == 0 { |
| 198 | kep.MinTime = defaultKeepalivePolicyMinTime |
| 199 | } |
| 200 | ctx, cancel := context.WithCancel(context.Background()) |
| 201 | t := &http2Server{ |
| 202 | ctx: ctx, |
| 203 | cancel: cancel, |
| 204 | ctxDone: ctx.Done(), |
| 205 | conn: conn, |
| 206 | remoteAddr: conn.RemoteAddr(), |
| 207 | localAddr: conn.LocalAddr(), |
| 208 | authInfo: config.AuthInfo, |
| 209 | framer: framer, |
| 210 | readerDone: make(chan struct{}), |
| 211 | writerDone: make(chan struct{}), |
| 212 | maxStreams: maxStreams, |
| 213 | inTapHandle: config.InTapHandle, |
| 214 | fc: &trInFlow{limit: uint32(icwz)}, |
| 215 | state: reachable, |
| 216 | activeStreams: make(map[uint32]*Stream), |
| 217 | stats: config.StatsHandler, |
| 218 | kp: kp, |
| 219 | idle: time.Now(), |
| 220 | kep: kep, |
| 221 | initialWindowSize: iwz, |
| 222 | czData: new(channelzData), |
| 223 | } |
| 224 | t.controlBuf = newControlBuffer(t.ctxDone) |
| 225 | if dynamicWindow { |
| 226 | t.bdpEst = &bdpEstimator{ |
| 227 | bdp: initialWindowSize, |
| 228 | updateFlowControl: t.updateFlowControl, |
| 229 | } |
| 230 | } |
| 231 | if t.stats != nil { |
| 232 | t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ |
| 233 | RemoteAddr: t.remoteAddr, |
| 234 | LocalAddr: t.localAddr, |
| 235 | }) |
| 236 | connBegin := &stats.ConnBegin{} |
| 237 | t.stats.HandleConn(t.ctx, connBegin) |
| 238 | } |
| 239 | if channelz.IsOn() { |
| 240 | t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) |
| 241 | } |
| 242 | t.framer.writer.Flush() |
| 243 | |
| 244 | defer func() { |
| 245 | if err != nil { |
| 246 | t.Close() |
| 247 | } |
| 248 | }() |
| 249 | |
| 250 | // Check the validity of client preface. |
| 251 | preface := make([]byte, len(clientPreface)) |
| 252 | if _, err := io.ReadFull(t.conn, preface); err != nil { |
| 253 | return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err) |
| 254 | } |
| 255 | if !bytes.Equal(preface, clientPreface) { |
| 256 | return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface) |
| 257 | } |
| 258 | |
| 259 | frame, err := t.framer.fr.ReadFrame() |
| 260 | if err == io.EOF || err == io.ErrUnexpectedEOF { |
| 261 | return nil, err |
| 262 | } |
| 263 | if err != nil { |
| 264 | return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err) |
| 265 | } |
| 266 | atomic.StoreUint32(&t.activity, 1) |
| 267 | sf, ok := frame.(*http2.SettingsFrame) |
| 268 | if !ok { |
| 269 | return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame) |
| 270 | } |
| 271 | t.handleSettings(sf) |
| 272 | |
| 273 | go func() { |
| 274 | t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst) |
| 275 | t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler |
| 276 | if err := t.loopy.run(); err != nil { |
| 277 | errorf("transport: loopyWriter.run returning. Err: %v", err) |
| 278 | } |
| 279 | t.conn.Close() |
| 280 | close(t.writerDone) |
| 281 | }() |
| 282 | go t.keepalive() |
| 283 | return t, nil |
| 284 | } |
| 285 | |
| 286 | // operateHeader takes action on the decoded headers. |
| 287 | func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { |
| 288 | streamID := frame.Header().StreamID |
| 289 | state := decodeState{serverSide: true} |
| 290 | if err := state.decodeHeader(frame); err != nil { |
| 291 | if se, ok := status.FromError(err); ok { |
| 292 | t.controlBuf.put(&cleanupStream{ |
| 293 | streamID: streamID, |
| 294 | rst: true, |
| 295 | rstCode: statusCodeConvTab[se.Code()], |
| 296 | onWrite: func() {}, |
| 297 | }) |
| 298 | } |
| 299 | return false |
| 300 | } |
| 301 | |
| 302 | buf := newRecvBuffer() |
| 303 | s := &Stream{ |
| 304 | id: streamID, |
| 305 | st: t, |
| 306 | buf: buf, |
| 307 | fc: &inFlow{limit: uint32(t.initialWindowSize)}, |
| 308 | recvCompress: state.encoding, |
| 309 | method: state.method, |
| 310 | contentSubtype: state.contentSubtype, |
| 311 | } |
| 312 | if frame.StreamEnded() { |
| 313 | // s is just created by the caller. No lock needed. |
| 314 | s.state = streamReadDone |
| 315 | } |
| 316 | if state.timeoutSet { |
| 317 | s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout) |
| 318 | } else { |
| 319 | s.ctx, s.cancel = context.WithCancel(t.ctx) |
| 320 | } |
| 321 | pr := &peer.Peer{ |
| 322 | Addr: t.remoteAddr, |
| 323 | } |
| 324 | // Attach Auth info if there is any. |
| 325 | if t.authInfo != nil { |
| 326 | pr.AuthInfo = t.authInfo |
| 327 | } |
| 328 | s.ctx = peer.NewContext(s.ctx, pr) |
| 329 | // Attach the received metadata to the context. |
| 330 | if len(state.mdata) > 0 { |
| 331 | s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata) |
| 332 | } |
| 333 | if state.statsTags != nil { |
| 334 | s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags) |
| 335 | } |
| 336 | if state.statsTrace != nil { |
| 337 | s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace) |
| 338 | } |
| 339 | if t.inTapHandle != nil { |
| 340 | var err error |
| 341 | info := &tap.Info{ |
| 342 | FullMethodName: state.method, |
| 343 | } |
| 344 | s.ctx, err = t.inTapHandle(s.ctx, info) |
| 345 | if err != nil { |
| 346 | warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err) |
| 347 | t.controlBuf.put(&cleanupStream{ |
| 348 | streamID: s.id, |
| 349 | rst: true, |
| 350 | rstCode: http2.ErrCodeRefusedStream, |
| 351 | onWrite: func() {}, |
| 352 | }) |
| 353 | return false |
| 354 | } |
| 355 | } |
| 356 | t.mu.Lock() |
| 357 | if t.state != reachable { |
| 358 | t.mu.Unlock() |
| 359 | return false |
| 360 | } |
| 361 | if uint32(len(t.activeStreams)) >= t.maxStreams { |
| 362 | t.mu.Unlock() |
| 363 | t.controlBuf.put(&cleanupStream{ |
| 364 | streamID: streamID, |
| 365 | rst: true, |
| 366 | rstCode: http2.ErrCodeRefusedStream, |
| 367 | onWrite: func() {}, |
| 368 | }) |
| 369 | return false |
| 370 | } |
| 371 | if streamID%2 != 1 || streamID <= t.maxStreamID { |
| 372 | t.mu.Unlock() |
| 373 | // illegal gRPC stream id. |
| 374 | errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID) |
| 375 | return true |
| 376 | } |
| 377 | t.maxStreamID = streamID |
| 378 | t.activeStreams[streamID] = s |
| 379 | if len(t.activeStreams) == 1 { |
| 380 | t.idle = time.Time{} |
| 381 | } |
| 382 | t.mu.Unlock() |
| 383 | if channelz.IsOn() { |
| 384 | atomic.AddInt64(&t.czData.streamsStarted, 1) |
| 385 | atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) |
| 386 | } |
| 387 | s.requestRead = func(n int) { |
| 388 | t.adjustWindow(s, uint32(n)) |
| 389 | } |
| 390 | s.ctx = traceCtx(s.ctx, s.method) |
| 391 | if t.stats != nil { |
| 392 | s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) |
| 393 | inHeader := &stats.InHeader{ |
| 394 | FullMethod: s.method, |
| 395 | RemoteAddr: t.remoteAddr, |
| 396 | LocalAddr: t.localAddr, |
| 397 | Compression: s.recvCompress, |
| 398 | WireLength: int(frame.Header().Length), |
| 399 | } |
| 400 | t.stats.HandleRPC(s.ctx, inHeader) |
| 401 | } |
| 402 | s.ctxDone = s.ctx.Done() |
| 403 | s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone) |
| 404 | s.trReader = &transportReader{ |
| 405 | reader: &recvBufferReader{ |
| 406 | ctx: s.ctx, |
| 407 | ctxDone: s.ctxDone, |
| 408 | recv: s.buf, |
| 409 | }, |
| 410 | windowHandler: func(n int) { |
| 411 | t.updateWindow(s, uint32(n)) |
| 412 | }, |
| 413 | } |
| 414 | // Register the stream with loopy. |
| 415 | t.controlBuf.put(®isterStream{ |
| 416 | streamID: s.id, |
| 417 | wq: s.wq, |
| 418 | }) |
| 419 | handle(s) |
| 420 | return false |
| 421 | } |
| 422 | |
| 423 | // HandleStreams receives incoming streams using the given handler. This is |
| 424 | // typically run in a separate goroutine. |
| 425 | // traceCtx attaches trace to ctx and returns the new context. |
| 426 | func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { |
| 427 | defer close(t.readerDone) |
| 428 | for { |
| 429 | frame, err := t.framer.fr.ReadFrame() |
| 430 | atomic.StoreUint32(&t.activity, 1) |
| 431 | if err != nil { |
| 432 | if se, ok := err.(http2.StreamError); ok { |
| 433 | warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se) |
| 434 | t.mu.Lock() |
| 435 | s := t.activeStreams[se.StreamID] |
| 436 | t.mu.Unlock() |
| 437 | if s != nil { |
| 438 | t.closeStream(s, true, se.Code, nil, false) |
| 439 | } else { |
| 440 | t.controlBuf.put(&cleanupStream{ |
| 441 | streamID: se.StreamID, |
| 442 | rst: true, |
| 443 | rstCode: se.Code, |
| 444 | onWrite: func() {}, |
| 445 | }) |
| 446 | } |
| 447 | continue |
| 448 | } |
| 449 | if err == io.EOF || err == io.ErrUnexpectedEOF { |
| 450 | t.Close() |
| 451 | return |
| 452 | } |
| 453 | warningf("transport: http2Server.HandleStreams failed to read frame: %v", err) |
| 454 | t.Close() |
| 455 | return |
| 456 | } |
| 457 | switch frame := frame.(type) { |
| 458 | case *http2.MetaHeadersFrame: |
| 459 | if t.operateHeaders(frame, handle, traceCtx) { |
| 460 | t.Close() |
| 461 | break |
| 462 | } |
| 463 | case *http2.DataFrame: |
| 464 | t.handleData(frame) |
| 465 | case *http2.RSTStreamFrame: |
| 466 | t.handleRSTStream(frame) |
| 467 | case *http2.SettingsFrame: |
| 468 | t.handleSettings(frame) |
| 469 | case *http2.PingFrame: |
| 470 | t.handlePing(frame) |
| 471 | case *http2.WindowUpdateFrame: |
| 472 | t.handleWindowUpdate(frame) |
| 473 | case *http2.GoAwayFrame: |
| 474 | // TODO: Handle GoAway from the client appropriately. |
| 475 | default: |
| 476 | errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) |
| 477 | } |
| 478 | } |
| 479 | } |
| 480 | |
| 481 | func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { |
| 482 | t.mu.Lock() |
| 483 | defer t.mu.Unlock() |
| 484 | if t.activeStreams == nil { |
| 485 | // The transport is closing. |
| 486 | return nil, false |
| 487 | } |
| 488 | s, ok := t.activeStreams[f.Header().StreamID] |
| 489 | if !ok { |
| 490 | // The stream is already done. |
| 491 | return nil, false |
| 492 | } |
| 493 | return s, true |
| 494 | } |
| 495 | |
| 496 | // adjustWindow sends out extra window update over the initial window size |
| 497 | // of stream if the application is requesting data larger in size than |
| 498 | // the window. |
| 499 | func (t *http2Server) adjustWindow(s *Stream, n uint32) { |
| 500 | if w := s.fc.maybeAdjust(n); w > 0 { |
| 501 | t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) |
| 502 | } |
| 503 | |
| 504 | } |
| 505 | |
| 506 | // updateWindow adjusts the inbound quota for the stream and the transport. |
| 507 | // Window updates will deliver to the controller for sending when |
| 508 | // the cumulative quota exceeds the corresponding threshold. |
| 509 | func (t *http2Server) updateWindow(s *Stream, n uint32) { |
| 510 | if w := s.fc.onRead(n); w > 0 { |
| 511 | t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, |
| 512 | increment: w, |
| 513 | }) |
| 514 | } |
| 515 | } |
| 516 | |
| 517 | // updateFlowControl updates the incoming flow control windows |
| 518 | // for the transport and the stream based on the current bdp |
| 519 | // estimation. |
| 520 | func (t *http2Server) updateFlowControl(n uint32) { |
| 521 | t.mu.Lock() |
| 522 | for _, s := range t.activeStreams { |
| 523 | s.fc.newLimit(n) |
| 524 | } |
| 525 | t.initialWindowSize = int32(n) |
| 526 | t.mu.Unlock() |
| 527 | t.controlBuf.put(&outgoingWindowUpdate{ |
| 528 | streamID: 0, |
| 529 | increment: t.fc.newLimit(n), |
| 530 | }) |
| 531 | t.controlBuf.put(&outgoingSettings{ |
| 532 | ss: []http2.Setting{ |
| 533 | { |
| 534 | ID: http2.SettingInitialWindowSize, |
| 535 | Val: n, |
| 536 | }, |
| 537 | }, |
| 538 | }) |
| 539 | |
| 540 | } |
| 541 | |
| 542 | func (t *http2Server) handleData(f *http2.DataFrame) { |
| 543 | size := f.Header().Length |
| 544 | var sendBDPPing bool |
| 545 | if t.bdpEst != nil { |
| 546 | sendBDPPing = t.bdpEst.add(size) |
| 547 | } |
| 548 | // Decouple connection's flow control from application's read. |
| 549 | // An update on connection's flow control should not depend on |
| 550 | // whether user application has read the data or not. Such a |
| 551 | // restriction is already imposed on the stream's flow control, |
| 552 | // and therefore the sender will be blocked anyways. |
| 553 | // Decoupling the connection flow control will prevent other |
| 554 | // active(fast) streams from starving in presence of slow or |
| 555 | // inactive streams. |
| 556 | if w := t.fc.onData(size); w > 0 { |
| 557 | t.controlBuf.put(&outgoingWindowUpdate{ |
| 558 | streamID: 0, |
| 559 | increment: w, |
| 560 | }) |
| 561 | } |
| 562 | if sendBDPPing { |
| 563 | // Avoid excessive ping detection (e.g. in an L7 proxy) |
| 564 | // by sending a window update prior to the BDP ping. |
| 565 | if w := t.fc.reset(); w > 0 { |
| 566 | t.controlBuf.put(&outgoingWindowUpdate{ |
| 567 | streamID: 0, |
| 568 | increment: w, |
| 569 | }) |
| 570 | } |
| 571 | t.controlBuf.put(bdpPing) |
| 572 | } |
| 573 | // Select the right stream to dispatch. |
| 574 | s, ok := t.getStream(f) |
| 575 | if !ok { |
| 576 | return |
| 577 | } |
| 578 | if size > 0 { |
| 579 | if err := s.fc.onData(size); err != nil { |
| 580 | t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false) |
| 581 | return |
| 582 | } |
| 583 | if f.Header().Flags.Has(http2.FlagDataPadded) { |
| 584 | if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 { |
| 585 | t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) |
| 586 | } |
| 587 | } |
| 588 | // TODO(bradfitz, zhaoq): A copy is required here because there is no |
| 589 | // guarantee f.Data() is consumed before the arrival of next frame. |
| 590 | // Can this copy be eliminated? |
| 591 | if len(f.Data()) > 0 { |
| 592 | data := make([]byte, len(f.Data())) |
| 593 | copy(data, f.Data()) |
| 594 | s.write(recvMsg{data: data}) |
| 595 | } |
| 596 | } |
| 597 | if f.Header().Flags.Has(http2.FlagDataEndStream) { |
| 598 | // Received the end of stream from the client. |
| 599 | s.compareAndSwapState(streamActive, streamReadDone) |
| 600 | s.write(recvMsg{err: io.EOF}) |
| 601 | } |
| 602 | } |
| 603 | |
| 604 | func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { |
| 605 | s, ok := t.getStream(f) |
| 606 | if !ok { |
| 607 | return |
| 608 | } |
| 609 | t.closeStream(s, false, 0, nil, false) |
| 610 | } |
| 611 | |
| 612 | func (t *http2Server) handleSettings(f *http2.SettingsFrame) { |
| 613 | if f.IsAck() { |
| 614 | return |
| 615 | } |
| 616 | var ss []http2.Setting |
| 617 | var updateFuncs []func() |
| 618 | f.ForeachSetting(func(s http2.Setting) error { |
| 619 | switch s.ID { |
| 620 | case http2.SettingMaxHeaderListSize: |
| 621 | updateFuncs = append(updateFuncs, func() { |
| 622 | t.maxSendHeaderListSize = new(uint32) |
| 623 | *t.maxSendHeaderListSize = s.Val |
| 624 | }) |
| 625 | default: |
| 626 | ss = append(ss, s) |
| 627 | } |
| 628 | return nil |
| 629 | }) |
| 630 | t.controlBuf.executeAndPut(func(interface{}) bool { |
| 631 | for _, f := range updateFuncs { |
| 632 | f() |
| 633 | } |
| 634 | return true |
| 635 | }, &incomingSettings{ |
| 636 | ss: ss, |
| 637 | }) |
| 638 | } |
| 639 | |
// Limits used when enforcing the keepalive ping policy on incoming pings.
const (
	// maxPingStrikes is the number of policy violations tolerated; the
	// connection is closed once the strike count exceeds it.
	maxPingStrikes = 2
	// defaultPingTimeout is the minimum spacing expected between pings when
	// there are no active streams and pings without streams are not permitted.
	defaultPingTimeout = 2 * time.Hour
)
| 644 | |
| 645 | func (t *http2Server) handlePing(f *http2.PingFrame) { |
| 646 | if f.IsAck() { |
| 647 | if f.Data == goAwayPing.data && t.drainChan != nil { |
| 648 | close(t.drainChan) |
| 649 | return |
| 650 | } |
| 651 | // Maybe it's a BDP ping. |
| 652 | if t.bdpEst != nil { |
| 653 | t.bdpEst.calculate(f.Data) |
| 654 | } |
| 655 | return |
| 656 | } |
| 657 | pingAck := &ping{ack: true} |
| 658 | copy(pingAck.data[:], f.Data[:]) |
| 659 | t.controlBuf.put(pingAck) |
| 660 | |
| 661 | now := time.Now() |
| 662 | defer func() { |
| 663 | t.lastPingAt = now |
| 664 | }() |
| 665 | // A reset ping strikes means that we don't need to check for policy |
| 666 | // violation for this ping and the pingStrikes counter should be set |
| 667 | // to 0. |
| 668 | if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) { |
| 669 | t.pingStrikes = 0 |
| 670 | return |
| 671 | } |
| 672 | t.mu.Lock() |
| 673 | ns := len(t.activeStreams) |
| 674 | t.mu.Unlock() |
| 675 | if ns < 1 && !t.kep.PermitWithoutStream { |
| 676 | // Keepalive shouldn't be active thus, this new ping should |
| 677 | // have come after at least defaultPingTimeout. |
| 678 | if t.lastPingAt.Add(defaultPingTimeout).After(now) { |
| 679 | t.pingStrikes++ |
| 680 | } |
| 681 | } else { |
| 682 | // Check if keepalive policy is respected. |
| 683 | if t.lastPingAt.Add(t.kep.MinTime).After(now) { |
| 684 | t.pingStrikes++ |
| 685 | } |
| 686 | } |
| 687 | |
| 688 | if t.pingStrikes > maxPingStrikes { |
| 689 | // Send goaway and close the connection. |
| 690 | errorf("transport: Got too many pings from the client, closing the connection.") |
| 691 | t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) |
| 692 | } |
| 693 | } |
| 694 | |
| 695 | func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { |
| 696 | t.controlBuf.put(&incomingWindowUpdate{ |
| 697 | streamID: f.Header().StreamID, |
| 698 | increment: f.Increment, |
| 699 | }) |
| 700 | } |
| 701 | |
| 702 | func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField { |
| 703 | for k, vv := range md { |
| 704 | if isReservedHeader(k) { |
| 705 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. |
| 706 | continue |
| 707 | } |
| 708 | for _, v := range vv { |
| 709 | headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) |
| 710 | } |
| 711 | } |
| 712 | return headerFields |
| 713 | } |
| 714 | |
| 715 | func (t *http2Server) checkForHeaderListSize(it interface{}) bool { |
| 716 | if t.maxSendHeaderListSize == nil { |
| 717 | return true |
| 718 | } |
| 719 | hdrFrame := it.(*headerFrame) |
| 720 | var sz int64 |
| 721 | for _, f := range hdrFrame.hf { |
| 722 | if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { |
| 723 | errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize) |
| 724 | return false |
| 725 | } |
| 726 | } |
| 727 | return true |
| 728 | } |
| 729 | |
| 730 | // WriteHeader sends the header metedata md back to the client. |
| 731 | func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { |
| 732 | if s.updateHeaderSent() || s.getState() == streamDone { |
| 733 | return ErrIllegalHeaderWrite |
| 734 | } |
| 735 | s.hdrMu.Lock() |
| 736 | if md.Len() > 0 { |
| 737 | if s.header.Len() > 0 { |
| 738 | s.header = metadata.Join(s.header, md) |
| 739 | } else { |
| 740 | s.header = md |
| 741 | } |
| 742 | } |
| 743 | if err := t.writeHeaderLocked(s); err != nil { |
| 744 | s.hdrMu.Unlock() |
| 745 | return err |
| 746 | } |
| 747 | s.hdrMu.Unlock() |
| 748 | return nil |
| 749 | } |
| 750 | |
| 751 | func (t *http2Server) writeHeaderLocked(s *Stream) error { |
| 752 | // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields |
| 753 | // first and create a slice of that exact size. |
| 754 | headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. |
| 755 | headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) |
| 756 | headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) |
| 757 | if s.sendCompress != "" { |
| 758 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) |
| 759 | } |
| 760 | headerFields = appendHeaderFieldsFromMD(headerFields, s.header) |
| 761 | success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{ |
| 762 | streamID: s.id, |
| 763 | hf: headerFields, |
| 764 | endStream: false, |
| 765 | onWrite: func() { |
| 766 | atomic.StoreUint32(&t.resetPingStrikes, 1) |
| 767 | }, |
| 768 | }) |
| 769 | if !success { |
| 770 | if err != nil { |
| 771 | return err |
| 772 | } |
| 773 | t.closeStream(s, true, http2.ErrCodeInternal, nil, false) |
| 774 | return ErrHeaderListSizeLimitViolation |
| 775 | } |
| 776 | if t.stats != nil { |
| 777 | // Note: WireLength is not set in outHeader. |
| 778 | // TODO(mmukhi): Revisit this later, if needed. |
| 779 | outHeader := &stats.OutHeader{} |
| 780 | t.stats.HandleRPC(s.Context(), outHeader) |
| 781 | } |
| 782 | return nil |
| 783 | } |
| 784 | |
| 785 | // WriteStatus sends stream status to the client and terminates the stream. |
| 786 | // There is no further I/O operations being able to perform on this stream. |
| 787 | // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early |
| 788 | // OK is adopted. |
| 789 | func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { |
| 790 | if s.getState() == streamDone { |
| 791 | return nil |
| 792 | } |
| 793 | s.hdrMu.Lock() |
| 794 | // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields |
| 795 | // first and create a slice of that exact size. |
| 796 | headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else. |
| 797 | if !s.updateHeaderSent() { // No headers have been sent. |
| 798 | if len(s.header) > 0 { // Send a separate header frame. |
| 799 | if err := t.writeHeaderLocked(s); err != nil { |
| 800 | s.hdrMu.Unlock() |
| 801 | return err |
| 802 | } |
| 803 | } else { // Send a trailer only response. |
| 804 | headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) |
| 805 | headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) |
| 806 | } |
| 807 | } |
| 808 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) |
| 809 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) |
| 810 | |
| 811 | if p := st.Proto(); p != nil && len(p.Details) > 0 { |
| 812 | stBytes, err := proto.Marshal(p) |
| 813 | if err != nil { |
| 814 | // TODO: return error instead, when callers are able to handle it. |
| 815 | grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err) |
| 816 | } else { |
| 817 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)}) |
| 818 | } |
| 819 | } |
| 820 | |
| 821 | // Attach the trailer metadata. |
| 822 | headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer) |
| 823 | trailingHeader := &headerFrame{ |
| 824 | streamID: s.id, |
| 825 | hf: headerFields, |
| 826 | endStream: true, |
| 827 | onWrite: func() { |
| 828 | atomic.StoreUint32(&t.resetPingStrikes, 1) |
| 829 | }, |
| 830 | } |
| 831 | s.hdrMu.Unlock() |
| 832 | success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader) |
| 833 | if !success { |
| 834 | if err != nil { |
| 835 | return err |
| 836 | } |
| 837 | t.closeStream(s, true, http2.ErrCodeInternal, nil, false) |
| 838 | return ErrHeaderListSizeLimitViolation |
| 839 | } |
| 840 | t.closeStream(s, false, 0, trailingHeader, true) |
| 841 | if t.stats != nil { |
| 842 | t.stats.HandleRPC(s.Context(), &stats.OutTrailer{}) |
| 843 | } |
| 844 | return nil |
| 845 | } |
| 846 | |
| 847 | // Write converts the data into HTTP2 data frame and sends it out. Non-nil error |
| 848 | // is returns if it fails (e.g., framing error, transport error). |
| 849 | func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { |
| 850 | if !s.isHeaderSent() { // Headers haven't been written yet. |
| 851 | if err := t.WriteHeader(s, nil); err != nil { |
| 852 | // TODO(mmukhi, dfawley): Make sure this is the right code to return. |
| 853 | return status.Errorf(codes.Internal, "transport: %v", err) |
| 854 | } |
| 855 | } else { |
| 856 | // Writing headers checks for this condition. |
| 857 | if s.getState() == streamDone { |
| 858 | // TODO(mmukhi, dfawley): Should the server write also return io.EOF? |
| 859 | s.cancel() |
| 860 | select { |
| 861 | case <-t.ctx.Done(): |
| 862 | return ErrConnClosing |
| 863 | default: |
| 864 | } |
| 865 | return ContextErr(s.ctx.Err()) |
| 866 | } |
| 867 | } |
| 868 | // Add some data to header frame so that we can equally distribute bytes across frames. |
| 869 | emptyLen := http2MaxFrameLen - len(hdr) |
| 870 | if emptyLen > len(data) { |
| 871 | emptyLen = len(data) |
| 872 | } |
| 873 | hdr = append(hdr, data[:emptyLen]...) |
| 874 | data = data[emptyLen:] |
| 875 | df := &dataFrame{ |
| 876 | streamID: s.id, |
| 877 | h: hdr, |
| 878 | d: data, |
| 879 | onEachWrite: func() { |
| 880 | atomic.StoreUint32(&t.resetPingStrikes, 1) |
| 881 | }, |
| 882 | } |
| 883 | if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { |
| 884 | select { |
| 885 | case <-t.ctx.Done(): |
| 886 | return ErrConnClosing |
| 887 | default: |
| 888 | } |
| 889 | return ContextErr(s.ctx.Err()) |
| 890 | } |
| 891 | return t.controlBuf.put(df) |
| 892 | } |
| 893 | |
| 894 | // keepalive running in a separate goroutine does the following: |
| 895 | // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. |
| 896 | // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. |
| 897 | // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. |
| 898 | // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection |
| 899 | // after an additional duration of keepalive.Timeout. |
| 900 | func (t *http2Server) keepalive() { |
| 901 | p := &ping{} |
| 902 | var pingSent bool |
| 903 | maxIdle := time.NewTimer(t.kp.MaxConnectionIdle) |
| 904 | maxAge := time.NewTimer(t.kp.MaxConnectionAge) |
| 905 | keepalive := time.NewTimer(t.kp.Time) |
| 906 | // NOTE: All exit paths of this function should reset their |
| 907 | // respective timers. A failure to do so will cause the |
| 908 | // following clean-up to deadlock and eventually leak. |
| 909 | defer func() { |
| 910 | if !maxIdle.Stop() { |
| 911 | <-maxIdle.C |
| 912 | } |
| 913 | if !maxAge.Stop() { |
| 914 | <-maxAge.C |
| 915 | } |
| 916 | if !keepalive.Stop() { |
| 917 | <-keepalive.C |
| 918 | } |
| 919 | }() |
| 920 | for { |
| 921 | select { |
| 922 | case <-maxIdle.C: |
| 923 | t.mu.Lock() |
| 924 | idle := t.idle |
| 925 | if idle.IsZero() { // The connection is non-idle. |
| 926 | t.mu.Unlock() |
| 927 | maxIdle.Reset(t.kp.MaxConnectionIdle) |
| 928 | continue |
| 929 | } |
| 930 | val := t.kp.MaxConnectionIdle - time.Since(idle) |
| 931 | t.mu.Unlock() |
| 932 | if val <= 0 { |
| 933 | // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. |
| 934 | // Gracefully close the connection. |
| 935 | t.drain(http2.ErrCodeNo, []byte{}) |
| 936 | // Resetting the timer so that the clean-up doesn't deadlock. |
| 937 | maxIdle.Reset(infinity) |
| 938 | return |
| 939 | } |
| 940 | maxIdle.Reset(val) |
| 941 | case <-maxAge.C: |
| 942 | t.drain(http2.ErrCodeNo, []byte{}) |
| 943 | maxAge.Reset(t.kp.MaxConnectionAgeGrace) |
| 944 | select { |
| 945 | case <-maxAge.C: |
| 946 | // Close the connection after grace period. |
| 947 | t.Close() |
| 948 | // Resetting the timer so that the clean-up doesn't deadlock. |
| 949 | maxAge.Reset(infinity) |
| 950 | case <-t.ctx.Done(): |
| 951 | } |
| 952 | return |
| 953 | case <-keepalive.C: |
| 954 | if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { |
| 955 | pingSent = false |
| 956 | keepalive.Reset(t.kp.Time) |
| 957 | continue |
| 958 | } |
| 959 | if pingSent { |
| 960 | t.Close() |
| 961 | // Resetting the timer so that the clean-up doesn't deadlock. |
| 962 | keepalive.Reset(infinity) |
| 963 | return |
| 964 | } |
| 965 | pingSent = true |
| 966 | if channelz.IsOn() { |
| 967 | atomic.AddInt64(&t.czData.kpCount, 1) |
| 968 | } |
| 969 | t.controlBuf.put(p) |
| 970 | keepalive.Reset(t.kp.Timeout) |
| 971 | case <-t.ctx.Done(): |
| 972 | return |
| 973 | } |
| 974 | } |
| 975 | } |
| 976 | |
| 977 | // Close starts shutting down the http2Server transport. |
| 978 | // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This |
| 979 | // could cause some resource issue. Revisit this later. |
| 980 | func (t *http2Server) Close() error { |
| 981 | t.mu.Lock() |
| 982 | if t.state == closing { |
| 983 | t.mu.Unlock() |
| 984 | return errors.New("transport: Close() was already called") |
| 985 | } |
| 986 | t.state = closing |
| 987 | streams := t.activeStreams |
| 988 | t.activeStreams = nil |
| 989 | t.mu.Unlock() |
| 990 | t.controlBuf.finish() |
| 991 | t.cancel() |
| 992 | err := t.conn.Close() |
| 993 | if channelz.IsOn() { |
| 994 | channelz.RemoveEntry(t.channelzID) |
| 995 | } |
| 996 | // Cancel all active streams. |
| 997 | for _, s := range streams { |
| 998 | s.cancel() |
| 999 | } |
| 1000 | if t.stats != nil { |
| 1001 | connEnd := &stats.ConnEnd{} |
| 1002 | t.stats.HandleConn(t.ctx, connEnd) |
| 1003 | } |
| 1004 | return err |
| 1005 | } |
| 1006 | |
William Kurkian | daa6bb2 | 2019-03-07 12:26:28 -0500 | [diff] [blame] | 1007 | // deleteStream deletes the stream s from transport's active streams. |
| 1008 | func (t *http2Server) deleteStream(s *Stream, eosReceived bool) { |
| 1009 | t.mu.Lock() |
| 1010 | if _, ok := t.activeStreams[s.id]; !ok { |
| 1011 | t.mu.Unlock() |
| 1012 | return |
| 1013 | } |
| 1014 | |
| 1015 | delete(t.activeStreams, s.id) |
| 1016 | if len(t.activeStreams) == 0 { |
| 1017 | t.idle = time.Now() |
| 1018 | } |
| 1019 | t.mu.Unlock() |
| 1020 | |
| 1021 | if channelz.IsOn() { |
| 1022 | if eosReceived { |
| 1023 | atomic.AddInt64(&t.czData.streamsSucceeded, 1) |
| 1024 | } else { |
| 1025 | atomic.AddInt64(&t.czData.streamsFailed, 1) |
| 1026 | } |
| 1027 | } |
| 1028 | } |
| 1029 | |
khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame] | 1030 | // closeStream clears the footprint of a stream when the stream is not needed |
| 1031 | // any more. |
| 1032 | func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) { |
William Kurkian | daa6bb2 | 2019-03-07 12:26:28 -0500 | [diff] [blame] | 1033 | // Mark the stream as done |
| 1034 | oldState := s.swapState(streamDone) |
| 1035 | |
khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame] | 1036 | // In case stream sending and receiving are invoked in separate |
| 1037 | // goroutines (e.g., bi-directional streaming), cancel needs to be |
| 1038 | // called to interrupt the potential blocking on other goroutines. |
| 1039 | s.cancel() |
William Kurkian | daa6bb2 | 2019-03-07 12:26:28 -0500 | [diff] [blame] | 1040 | |
| 1041 | // Deletes the stream from active streams |
| 1042 | t.deleteStream(s, eosReceived) |
| 1043 | |
khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame] | 1044 | cleanup := &cleanupStream{ |
| 1045 | streamID: s.id, |
| 1046 | rst: rst, |
| 1047 | rstCode: rstCode, |
William Kurkian | daa6bb2 | 2019-03-07 12:26:28 -0500 | [diff] [blame] | 1048 | onWrite: func() {}, |
khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame] | 1049 | } |
William Kurkian | daa6bb2 | 2019-03-07 12:26:28 -0500 | [diff] [blame] | 1050 | |
| 1051 | // No trailer. Puts cleanupFrame into transport's control buffer. |
| 1052 | if hdr == nil { |
khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame] | 1053 | t.controlBuf.put(cleanup) |
William Kurkian | daa6bb2 | 2019-03-07 12:26:28 -0500 | [diff] [blame] | 1054 | return |
khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame] | 1055 | } |
William Kurkian | daa6bb2 | 2019-03-07 12:26:28 -0500 | [diff] [blame] | 1056 | |
| 1057 | // We do the check here, because of the following scenario: |
| 1058 | // 1. closeStream is called first with a trailer. A trailer item with a piggybacked cleanup item |
| 1059 | // is put to control buffer. |
| 1060 | // 2. Loopy writer is waiting on a stream quota. It will never get it because client errored at |
| 1061 | // some point. So loopy can't act on trailer |
| 1062 | // 3. Client sends a RST_STREAM due to the error. Then closeStream is called without a trailer as |
| 1063 | // the result of the received RST_STREAM. |
| 1064 | // If we do this check at the beginning of the closeStream, then we won't put a cleanup item in |
| 1065 | // response to received RST_STREAM into the control buffer and outStream in loopy writer will |
| 1066 | // never get cleaned up. |
| 1067 | |
| 1068 | // If the stream is already done, don't send the trailer. |
| 1069 | if oldState == streamDone { |
| 1070 | return |
| 1071 | } |
| 1072 | |
| 1073 | hdr.cleanup = cleanup |
| 1074 | t.controlBuf.put(hdr) |
khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame] | 1075 | } |
| 1076 | |
| 1077 | func (t *http2Server) RemoteAddr() net.Addr { |
| 1078 | return t.remoteAddr |
| 1079 | } |
| 1080 | |
| 1081 | func (t *http2Server) Drain() { |
| 1082 | t.drain(http2.ErrCodeNo, []byte{}) |
| 1083 | } |
| 1084 | |
| 1085 | func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { |
| 1086 | t.mu.Lock() |
| 1087 | defer t.mu.Unlock() |
| 1088 | if t.drainChan != nil { |
| 1089 | return |
| 1090 | } |
| 1091 | t.drainChan = make(chan struct{}) |
| 1092 | t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true}) |
| 1093 | } |
| 1094 | |
| 1095 | var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}} |
| 1096 | |
| 1097 | // Handles outgoing GoAway and returns true if loopy needs to put itself |
| 1098 | // in draining mode. |
| 1099 | func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { |
| 1100 | t.mu.Lock() |
| 1101 | if t.state == closing { // TODO(mmukhi): This seems unnecessary. |
| 1102 | t.mu.Unlock() |
| 1103 | // The transport is closing. |
| 1104 | return false, ErrConnClosing |
| 1105 | } |
| 1106 | sid := t.maxStreamID |
| 1107 | if !g.headsUp { |
| 1108 | // Stop accepting more streams now. |
| 1109 | t.state = draining |
| 1110 | if len(t.activeStreams) == 0 { |
| 1111 | g.closeConn = true |
| 1112 | } |
| 1113 | t.mu.Unlock() |
| 1114 | if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil { |
| 1115 | return false, err |
| 1116 | } |
| 1117 | if g.closeConn { |
| 1118 | // Abruptly close the connection following the GoAway (via |
| 1119 | // loopywriter). But flush out what's inside the buffer first. |
| 1120 | t.framer.writer.Flush() |
| 1121 | return false, fmt.Errorf("transport: Connection closing") |
| 1122 | } |
| 1123 | return true, nil |
| 1124 | } |
| 1125 | t.mu.Unlock() |
| 1126 | // For a graceful close, send out a GoAway with stream ID of MaxUInt32, |
| 1127 | // Follow that with a ping and wait for the ack to come back or a timer |
| 1128 | // to expire. During this time accept new streams since they might have |
| 1129 | // originated before the GoAway reaches the client. |
| 1130 | // After getting the ack or timer expiration send out another GoAway this |
| 1131 | // time with an ID of the max stream server intends to process. |
| 1132 | if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil { |
| 1133 | return false, err |
| 1134 | } |
| 1135 | if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil { |
| 1136 | return false, err |
| 1137 | } |
| 1138 | go func() { |
| 1139 | timer := time.NewTimer(time.Minute) |
| 1140 | defer timer.Stop() |
| 1141 | select { |
| 1142 | case <-t.drainChan: |
| 1143 | case <-timer.C: |
| 1144 | case <-t.ctx.Done(): |
| 1145 | return |
| 1146 | } |
| 1147 | t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData}) |
| 1148 | }() |
| 1149 | return false, nil |
| 1150 | } |
| 1151 | |
| 1152 | func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric { |
| 1153 | s := channelz.SocketInternalMetric{ |
| 1154 | StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted), |
| 1155 | StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded), |
| 1156 | StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed), |
| 1157 | MessagesSent: atomic.LoadInt64(&t.czData.msgSent), |
| 1158 | MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv), |
| 1159 | KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount), |
| 1160 | LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), |
| 1161 | LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), |
| 1162 | LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), |
| 1163 | LocalFlowControlWindow: int64(t.fc.getSize()), |
| 1164 | SocketOptions: channelz.GetSocketOption(t.conn), |
| 1165 | LocalAddr: t.localAddr, |
| 1166 | RemoteAddr: t.remoteAddr, |
| 1167 | // RemoteName : |
| 1168 | } |
| 1169 | if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { |
| 1170 | s.Security = au.GetSecurityValue() |
| 1171 | } |
| 1172 | s.RemoteFlowControlWindow = t.getOutFlowWindow() |
| 1173 | return &s |
| 1174 | } |
| 1175 | |
| 1176 | func (t *http2Server) IncrMsgSent() { |
| 1177 | atomic.AddInt64(&t.czData.msgSent, 1) |
| 1178 | atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) |
| 1179 | } |
| 1180 | |
| 1181 | func (t *http2Server) IncrMsgRecv() { |
| 1182 | atomic.AddInt64(&t.czData.msgRecv, 1) |
| 1183 | atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) |
| 1184 | } |
| 1185 | |
| 1186 | func (t *http2Server) getOutFlowWindow() int64 { |
William Kurkian | daa6bb2 | 2019-03-07 12:26:28 -0500 | [diff] [blame] | 1187 | resp := make(chan uint32, 1) |
khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame] | 1188 | timer := time.NewTimer(time.Second) |
| 1189 | defer timer.Stop() |
| 1190 | t.controlBuf.put(&outFlowControlSizeRequest{resp}) |
| 1191 | select { |
| 1192 | case sz := <-resp: |
| 1193 | return int64(sz) |
| 1194 | case <-t.ctxDone: |
| 1195 | return -1 |
| 1196 | case <-timer.C: |
| 1197 | return -2 |
| 1198 | } |
| 1199 | } |
| 1200 | |
| 1201 | func getJitter(v time.Duration) time.Duration { |
| 1202 | if v == infinity { |
| 1203 | return 0 |
| 1204 | } |
| 1205 | // Generate a jitter between +/- 10% of the value. |
| 1206 | r := int64(v / 10) |
| 1207 | j := grpcrand.Int63n(2*r) - r |
| 1208 | return time.Duration(j) |
| 1209 | } |