/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
| 18 | |
| 19 | package transport |
| 20 | |
| 21 | import ( |
| 22 | "bytes" |
| 23 | "context" |
| 24 | "errors" |
| 25 | "fmt" |
| 26 | "io" |
| 27 | "math" |
| 28 | "net" |
| 29 | "strconv" |
| 30 | "sync" |
| 31 | "sync/atomic" |
| 32 | "time" |
| 33 | |
| 34 | "github.com/golang/protobuf/proto" |
| 35 | "golang.org/x/net/http2" |
| 36 | "golang.org/x/net/http2/hpack" |
| 37 | |
| 38 | spb "google.golang.org/genproto/googleapis/rpc/status" |
| 39 | "google.golang.org/grpc/codes" |
| 40 | "google.golang.org/grpc/credentials" |
| 41 | "google.golang.org/grpc/grpclog" |
| 42 | "google.golang.org/grpc/internal" |
| 43 | "google.golang.org/grpc/internal/channelz" |
| 44 | "google.golang.org/grpc/internal/grpcrand" |
| 45 | "google.golang.org/grpc/keepalive" |
| 46 | "google.golang.org/grpc/metadata" |
| 47 | "google.golang.org/grpc/peer" |
| 48 | "google.golang.org/grpc/stats" |
| 49 | "google.golang.org/grpc/status" |
| 50 | "google.golang.org/grpc/tap" |
| 51 | ) |
| 52 | |
var (
	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
	// the stream's state. WriteHeader returns it when the header was already
	// sent or the stream is in the streamDone state.
	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
	// than the limit set by peer. It is returned after the stream has been
	// closed because checkForHeaderListSize rejected the outgoing header frame.
	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
	// statusRawProto is a function to get to the raw status proto wrapped in a
	// status.Status without a proto.Clone().
	// The single-value type assertion panics during package initialization if
	// internal.StatusRawProto does not hold a function of this exact type.
	statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
)
| 64 | |
// serverConnectionCounter counts the number of connections a server has seen
// (equal to the number of http2Servers created). Must be accessed atomically.
// newHTTP2Server increments it with atomic.AddUint64 and stores the
// post-increment value in http2Server.connectionID.
var serverConnectionCounter uint64
| 68 | |
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
	// lastRead is the UnixNano timestamp stored after every frame read
	// attempt (successful or not) in HandleStreams and after the initial
	// settings frame in newHTTP2Server.
	lastRead   int64 // Keep this field 64-bit aligned. Accessed atomically.
	ctx        context.Context
	done       chan struct{}
	conn       net.Conn
	loopy      *loopyWriter  // created inside the writer goroutine started by newHTTP2Server
	readerDone chan struct{} // sync point to enable testing.
	writerDone chan struct{} // sync point to enable testing.
	remoteAddr net.Addr
	localAddr  net.Addr
	// NOTE(review): maxStreamID is read and written under mu in
	// operateHeaders even though it is declared above mu.
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams. math.MaxUint32 when the config
	// value was 0 (treated as "no limit").
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	stats      stats.Handler
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes  uint32 // Accessed atomically.
	initialWindowSize int32
	// bdpEst is non-nil only when neither window size was configured to at
	// least defaultWindowSize (dynamic window mode).
	bdpEst *bdpEstimator
	// maxSendHeaderListSize is the header list size limit announced by the
	// client in a SETTINGS frame; nil means no limit has been received and
	// checkForHeaderListSize accepts everything.
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway.
	drainChan     chan struct{}
	state         transportState
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to the zero time.Time.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData
	bufferPool *bufferPool

	// connectionID is the value of serverConnectionCounter right after this
	// transport incremented it.
	connectionID uint64
}
| 131 | |
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
//
// The function writes the server SETTINGS (and an optional connection-level
// WINDOW_UPDATE), flushes them, then validates the client preface and the
// client's initial SETTINGS frame before starting the writer (loopy) and
// keepalive goroutines. If reading the first frame fails with io.EOF or
// io.ErrUnexpectedEOF, that error is returned as-is rather than wrapped in a
// ConnectionError.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		// 0 means "no limit": nothing is advertised to the client and the
		// local check in operateHeaders uses the largest uint32.
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	// dynamicWindow stays true only when neither the stream nor the
	// connection window is configured to at least defaultWindowSize; in that
	// case a bdpEstimator is installed below.
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	// Only advertise the stream window when it differs from the default.
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if config.HeaderTableSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingHeaderTableSize,
			Val: *config.HeaderTableSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	// Fill in defaults for every keepalive parameter left at zero. kp and kep
	// are copies of the config values, so the caller's config is not mutated.
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	done := make(chan struct{})
	t := &http2Server{
		ctx:               context.Background(),
		done:              done,
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		czData:            new(channelzData),
		bufferPool:        newBufferPool(),
	}
	t.controlBuf = newControlBuffer(t.done)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
	}

	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)

	// Push the buffered SETTINGS / WINDOW_UPDATE frames to the wire before
	// waiting for the client preface.
	// NOTE(review): the error returned by Flush is discarded here.
	t.framer.writer.Flush()

	// From here on any error return closes the transport. The deferred
	// function inspects the named result err, which every explicit return
	// below assigns.
	defer func() {
		if err != nil {
			t.Close()
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	// The first frame after the preface must be the client's SETTINGS frame.
	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		// Returned unwrapped, unlike the other failure paths.
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	// Writer goroutine: runs the loopy writer until it returns, then closes
	// the connection and signals writerDone.
	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		if err := t.loopy.run(); err != nil {
			errorf("transport: loopyWriter.run returning. Err: %v", err)
		}
		t.conn.Close()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}
| 305 | |
| 306 | // operateHeader takes action on the decoded headers. |
| 307 | func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { |
| 308 | streamID := frame.Header().StreamID |
| 309 | state := &decodeState{ |
| 310 | serverSide: true, |
| 311 | } |
| 312 | if err := state.decodeHeader(frame); err != nil { |
| 313 | if se, ok := status.FromError(err); ok { |
| 314 | t.controlBuf.put(&cleanupStream{ |
| 315 | streamID: streamID, |
| 316 | rst: true, |
| 317 | rstCode: statusCodeConvTab[se.Code()], |
| 318 | onWrite: func() {}, |
| 319 | }) |
| 320 | } |
| 321 | return false |
| 322 | } |
| 323 | |
| 324 | buf := newRecvBuffer() |
| 325 | s := &Stream{ |
| 326 | id: streamID, |
| 327 | st: t, |
| 328 | buf: buf, |
| 329 | fc: &inFlow{limit: uint32(t.initialWindowSize)}, |
| 330 | recvCompress: state.data.encoding, |
| 331 | method: state.data.method, |
| 332 | contentSubtype: state.data.contentSubtype, |
| 333 | } |
| 334 | if frame.StreamEnded() { |
| 335 | // s is just created by the caller. No lock needed. |
| 336 | s.state = streamReadDone |
| 337 | } |
| 338 | if state.data.timeoutSet { |
| 339 | s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout) |
| 340 | } else { |
| 341 | s.ctx, s.cancel = context.WithCancel(t.ctx) |
| 342 | } |
| 343 | pr := &peer.Peer{ |
| 344 | Addr: t.remoteAddr, |
| 345 | } |
| 346 | // Attach Auth info if there is any. |
| 347 | if t.authInfo != nil { |
| 348 | pr.AuthInfo = t.authInfo |
| 349 | } |
| 350 | s.ctx = peer.NewContext(s.ctx, pr) |
| 351 | // Attach the received metadata to the context. |
| 352 | if len(state.data.mdata) > 0 { |
| 353 | s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata) |
| 354 | } |
| 355 | if state.data.statsTags != nil { |
| 356 | s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags) |
| 357 | } |
| 358 | if state.data.statsTrace != nil { |
| 359 | s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace) |
| 360 | } |
| 361 | if t.inTapHandle != nil { |
| 362 | var err error |
| 363 | info := &tap.Info{ |
| 364 | FullMethodName: state.data.method, |
| 365 | } |
| 366 | s.ctx, err = t.inTapHandle(s.ctx, info) |
| 367 | if err != nil { |
| 368 | warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err) |
| 369 | t.controlBuf.put(&cleanupStream{ |
| 370 | streamID: s.id, |
| 371 | rst: true, |
| 372 | rstCode: http2.ErrCodeRefusedStream, |
| 373 | onWrite: func() {}, |
| 374 | }) |
| 375 | s.cancel() |
| 376 | return false |
| 377 | } |
| 378 | } |
| 379 | t.mu.Lock() |
| 380 | if t.state != reachable { |
| 381 | t.mu.Unlock() |
| 382 | s.cancel() |
| 383 | return false |
| 384 | } |
| 385 | if uint32(len(t.activeStreams)) >= t.maxStreams { |
| 386 | t.mu.Unlock() |
| 387 | t.controlBuf.put(&cleanupStream{ |
| 388 | streamID: streamID, |
| 389 | rst: true, |
| 390 | rstCode: http2.ErrCodeRefusedStream, |
| 391 | onWrite: func() {}, |
| 392 | }) |
| 393 | s.cancel() |
| 394 | return false |
| 395 | } |
| 396 | if streamID%2 != 1 || streamID <= t.maxStreamID { |
| 397 | t.mu.Unlock() |
| 398 | // illegal gRPC stream id. |
| 399 | errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID) |
| 400 | s.cancel() |
| 401 | return true |
| 402 | } |
| 403 | t.maxStreamID = streamID |
| 404 | t.activeStreams[streamID] = s |
| 405 | if len(t.activeStreams) == 1 { |
| 406 | t.idle = time.Time{} |
| 407 | } |
| 408 | t.mu.Unlock() |
| 409 | if channelz.IsOn() { |
| 410 | atomic.AddInt64(&t.czData.streamsStarted, 1) |
| 411 | atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) |
| 412 | } |
| 413 | s.requestRead = func(n int) { |
| 414 | t.adjustWindow(s, uint32(n)) |
| 415 | } |
| 416 | s.ctx = traceCtx(s.ctx, s.method) |
| 417 | if t.stats != nil { |
| 418 | s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) |
| 419 | inHeader := &stats.InHeader{ |
| 420 | FullMethod: s.method, |
| 421 | RemoteAddr: t.remoteAddr, |
| 422 | LocalAddr: t.localAddr, |
| 423 | Compression: s.recvCompress, |
| 424 | WireLength: int(frame.Header().Length), |
| 425 | Header: metadata.MD(state.data.mdata).Copy(), |
| 426 | } |
| 427 | t.stats.HandleRPC(s.ctx, inHeader) |
| 428 | } |
| 429 | s.ctxDone = s.ctx.Done() |
| 430 | s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone) |
| 431 | s.trReader = &transportReader{ |
| 432 | reader: &recvBufferReader{ |
| 433 | ctx: s.ctx, |
| 434 | ctxDone: s.ctxDone, |
| 435 | recv: s.buf, |
| 436 | freeBuffer: t.bufferPool.put, |
| 437 | }, |
| 438 | windowHandler: func(n int) { |
| 439 | t.updateWindow(s, uint32(n)) |
| 440 | }, |
| 441 | } |
| 442 | // Register the stream with loopy. |
| 443 | t.controlBuf.put(®isterStream{ |
| 444 | streamID: s.id, |
| 445 | wq: s.wq, |
| 446 | }) |
| 447 | handle(s) |
| 448 | return false |
| 449 | } |
| 450 | |
| 451 | // HandleStreams receives incoming streams using the given handler. This is |
| 452 | // typically run in a separate goroutine. |
| 453 | // traceCtx attaches trace to ctx and returns the new context. |
| 454 | func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { |
| 455 | defer close(t.readerDone) |
| 456 | for { |
| 457 | t.controlBuf.throttle() |
| 458 | frame, err := t.framer.fr.ReadFrame() |
| 459 | atomic.StoreInt64(&t.lastRead, time.Now().UnixNano()) |
| 460 | if err != nil { |
| 461 | if se, ok := err.(http2.StreamError); ok { |
| 462 | warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se) |
| 463 | t.mu.Lock() |
| 464 | s := t.activeStreams[se.StreamID] |
| 465 | t.mu.Unlock() |
| 466 | if s != nil { |
| 467 | t.closeStream(s, true, se.Code, false) |
| 468 | } else { |
| 469 | t.controlBuf.put(&cleanupStream{ |
| 470 | streamID: se.StreamID, |
| 471 | rst: true, |
| 472 | rstCode: se.Code, |
| 473 | onWrite: func() {}, |
| 474 | }) |
| 475 | } |
| 476 | continue |
| 477 | } |
| 478 | if err == io.EOF || err == io.ErrUnexpectedEOF { |
| 479 | t.Close() |
| 480 | return |
| 481 | } |
| 482 | warningf("transport: http2Server.HandleStreams failed to read frame: %v", err) |
| 483 | t.Close() |
| 484 | return |
| 485 | } |
| 486 | switch frame := frame.(type) { |
| 487 | case *http2.MetaHeadersFrame: |
| 488 | if t.operateHeaders(frame, handle, traceCtx) { |
| 489 | t.Close() |
| 490 | break |
| 491 | } |
| 492 | case *http2.DataFrame: |
| 493 | t.handleData(frame) |
| 494 | case *http2.RSTStreamFrame: |
| 495 | t.handleRSTStream(frame) |
| 496 | case *http2.SettingsFrame: |
| 497 | t.handleSettings(frame) |
| 498 | case *http2.PingFrame: |
| 499 | t.handlePing(frame) |
| 500 | case *http2.WindowUpdateFrame: |
| 501 | t.handleWindowUpdate(frame) |
| 502 | case *http2.GoAwayFrame: |
| 503 | // TODO: Handle GoAway from the client appropriately. |
| 504 | default: |
| 505 | errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) |
| 506 | } |
| 507 | } |
| 508 | } |
| 509 | |
| 510 | func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { |
| 511 | t.mu.Lock() |
| 512 | defer t.mu.Unlock() |
| 513 | if t.activeStreams == nil { |
| 514 | // The transport is closing. |
| 515 | return nil, false |
| 516 | } |
| 517 | s, ok := t.activeStreams[f.Header().StreamID] |
| 518 | if !ok { |
| 519 | // The stream is already done. |
| 520 | return nil, false |
| 521 | } |
| 522 | return s, true |
| 523 | } |
| 524 | |
| 525 | // adjustWindow sends out extra window update over the initial window size |
| 526 | // of stream if the application is requesting data larger in size than |
| 527 | // the window. |
| 528 | func (t *http2Server) adjustWindow(s *Stream, n uint32) { |
| 529 | if w := s.fc.maybeAdjust(n); w > 0 { |
| 530 | t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) |
| 531 | } |
| 532 | |
| 533 | } |
| 534 | |
| 535 | // updateWindow adjusts the inbound quota for the stream and the transport. |
| 536 | // Window updates will deliver to the controller for sending when |
| 537 | // the cumulative quota exceeds the corresponding threshold. |
| 538 | func (t *http2Server) updateWindow(s *Stream, n uint32) { |
| 539 | if w := s.fc.onRead(n); w > 0 { |
| 540 | t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, |
| 541 | increment: w, |
| 542 | }) |
| 543 | } |
| 544 | } |
| 545 | |
| 546 | // updateFlowControl updates the incoming flow control windows |
| 547 | // for the transport and the stream based on the current bdp |
| 548 | // estimation. |
| 549 | func (t *http2Server) updateFlowControl(n uint32) { |
| 550 | t.mu.Lock() |
| 551 | for _, s := range t.activeStreams { |
| 552 | s.fc.newLimit(n) |
| 553 | } |
| 554 | t.initialWindowSize = int32(n) |
| 555 | t.mu.Unlock() |
| 556 | t.controlBuf.put(&outgoingWindowUpdate{ |
| 557 | streamID: 0, |
| 558 | increment: t.fc.newLimit(n), |
| 559 | }) |
| 560 | t.controlBuf.put(&outgoingSettings{ |
| 561 | ss: []http2.Setting{ |
| 562 | { |
| 563 | ID: http2.SettingInitialWindowSize, |
| 564 | Val: n, |
| 565 | }, |
| 566 | }, |
| 567 | }) |
| 568 | |
| 569 | } |
| 570 | |
| 571 | func (t *http2Server) handleData(f *http2.DataFrame) { |
| 572 | size := f.Header().Length |
| 573 | var sendBDPPing bool |
| 574 | if t.bdpEst != nil { |
| 575 | sendBDPPing = t.bdpEst.add(size) |
| 576 | } |
| 577 | // Decouple connection's flow control from application's read. |
| 578 | // An update on connection's flow control should not depend on |
| 579 | // whether user application has read the data or not. Such a |
| 580 | // restriction is already imposed on the stream's flow control, |
| 581 | // and therefore the sender will be blocked anyways. |
| 582 | // Decoupling the connection flow control will prevent other |
| 583 | // active(fast) streams from starving in presence of slow or |
| 584 | // inactive streams. |
| 585 | if w := t.fc.onData(size); w > 0 { |
| 586 | t.controlBuf.put(&outgoingWindowUpdate{ |
| 587 | streamID: 0, |
| 588 | increment: w, |
| 589 | }) |
| 590 | } |
| 591 | if sendBDPPing { |
| 592 | // Avoid excessive ping detection (e.g. in an L7 proxy) |
| 593 | // by sending a window update prior to the BDP ping. |
| 594 | if w := t.fc.reset(); w > 0 { |
| 595 | t.controlBuf.put(&outgoingWindowUpdate{ |
| 596 | streamID: 0, |
| 597 | increment: w, |
| 598 | }) |
| 599 | } |
| 600 | t.controlBuf.put(bdpPing) |
| 601 | } |
| 602 | // Select the right stream to dispatch. |
| 603 | s, ok := t.getStream(f) |
| 604 | if !ok { |
| 605 | return |
| 606 | } |
| 607 | if size > 0 { |
| 608 | if err := s.fc.onData(size); err != nil { |
| 609 | t.closeStream(s, true, http2.ErrCodeFlowControl, false) |
| 610 | return |
| 611 | } |
| 612 | if f.Header().Flags.Has(http2.FlagDataPadded) { |
| 613 | if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 { |
| 614 | t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) |
| 615 | } |
| 616 | } |
| 617 | // TODO(bradfitz, zhaoq): A copy is required here because there is no |
| 618 | // guarantee f.Data() is consumed before the arrival of next frame. |
| 619 | // Can this copy be eliminated? |
| 620 | if len(f.Data()) > 0 { |
| 621 | buffer := t.bufferPool.get() |
| 622 | buffer.Reset() |
| 623 | buffer.Write(f.Data()) |
| 624 | s.write(recvMsg{buffer: buffer}) |
| 625 | } |
| 626 | } |
| 627 | if f.Header().Flags.Has(http2.FlagDataEndStream) { |
| 628 | // Received the end of stream from the client. |
| 629 | s.compareAndSwapState(streamActive, streamReadDone) |
| 630 | s.write(recvMsg{err: io.EOF}) |
| 631 | } |
| 632 | } |
| 633 | |
| 634 | func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { |
| 635 | // If the stream is not deleted from the transport's active streams map, then do a regular close stream. |
| 636 | if s, ok := t.getStream(f); ok { |
| 637 | t.closeStream(s, false, 0, false) |
| 638 | return |
| 639 | } |
| 640 | // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map. |
| 641 | t.controlBuf.put(&cleanupStream{ |
| 642 | streamID: f.Header().StreamID, |
| 643 | rst: false, |
| 644 | rstCode: 0, |
| 645 | onWrite: func() {}, |
| 646 | }) |
| 647 | } |
| 648 | |
| 649 | func (t *http2Server) handleSettings(f *http2.SettingsFrame) { |
| 650 | if f.IsAck() { |
| 651 | return |
| 652 | } |
| 653 | var ss []http2.Setting |
| 654 | var updateFuncs []func() |
| 655 | f.ForeachSetting(func(s http2.Setting) error { |
| 656 | switch s.ID { |
| 657 | case http2.SettingMaxHeaderListSize: |
| 658 | updateFuncs = append(updateFuncs, func() { |
| 659 | t.maxSendHeaderListSize = new(uint32) |
| 660 | *t.maxSendHeaderListSize = s.Val |
| 661 | }) |
| 662 | default: |
| 663 | ss = append(ss, s) |
| 664 | } |
| 665 | return nil |
| 666 | }) |
| 667 | t.controlBuf.executeAndPut(func(interface{}) bool { |
| 668 | for _, f := range updateFuncs { |
| 669 | f() |
| 670 | } |
| 671 | return true |
| 672 | }, &incomingSettings{ |
| 673 | ss: ss, |
| 674 | }) |
| 675 | } |
| 676 | |
const (
	// maxPingStrikes is the number of keepalive policy violations tolerated
	// from a client; handlePing sends a GOAWAY that closes the connection
	// once pingStrikes exceeds this value.
	maxPingStrikes = 2
	// defaultPingTimeout is the minimum spacing handlePing expects between
	// pings when there are no active streams and the enforcement policy does
	// not permit pings without streams.
	defaultPingTimeout = 2 * time.Hour
)
| 681 | |
| 682 | func (t *http2Server) handlePing(f *http2.PingFrame) { |
| 683 | if f.IsAck() { |
| 684 | if f.Data == goAwayPing.data && t.drainChan != nil { |
| 685 | close(t.drainChan) |
| 686 | return |
| 687 | } |
| 688 | // Maybe it's a BDP ping. |
| 689 | if t.bdpEst != nil { |
| 690 | t.bdpEst.calculate(f.Data) |
| 691 | } |
| 692 | return |
| 693 | } |
| 694 | pingAck := &ping{ack: true} |
| 695 | copy(pingAck.data[:], f.Data[:]) |
| 696 | t.controlBuf.put(pingAck) |
| 697 | |
| 698 | now := time.Now() |
| 699 | defer func() { |
| 700 | t.lastPingAt = now |
| 701 | }() |
| 702 | // A reset ping strikes means that we don't need to check for policy |
| 703 | // violation for this ping and the pingStrikes counter should be set |
| 704 | // to 0. |
| 705 | if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) { |
| 706 | t.pingStrikes = 0 |
| 707 | return |
| 708 | } |
| 709 | t.mu.Lock() |
| 710 | ns := len(t.activeStreams) |
| 711 | t.mu.Unlock() |
| 712 | if ns < 1 && !t.kep.PermitWithoutStream { |
| 713 | // Keepalive shouldn't be active thus, this new ping should |
| 714 | // have come after at least defaultPingTimeout. |
| 715 | if t.lastPingAt.Add(defaultPingTimeout).After(now) { |
| 716 | t.pingStrikes++ |
| 717 | } |
| 718 | } else { |
| 719 | // Check if keepalive policy is respected. |
| 720 | if t.lastPingAt.Add(t.kep.MinTime).After(now) { |
| 721 | t.pingStrikes++ |
| 722 | } |
| 723 | } |
| 724 | |
| 725 | if t.pingStrikes > maxPingStrikes { |
| 726 | // Send goaway and close the connection. |
| 727 | errorf("transport: Got too many pings from the client, closing the connection.") |
| 728 | t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) |
| 729 | } |
| 730 | } |
| 731 | |
| 732 | func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { |
| 733 | t.controlBuf.put(&incomingWindowUpdate{ |
| 734 | streamID: f.Header().StreamID, |
| 735 | increment: f.Increment, |
| 736 | }) |
| 737 | } |
| 738 | |
| 739 | func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField { |
| 740 | for k, vv := range md { |
| 741 | if isReservedHeader(k) { |
| 742 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. |
| 743 | continue |
| 744 | } |
| 745 | for _, v := range vv { |
| 746 | headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) |
| 747 | } |
| 748 | } |
| 749 | return headerFields |
| 750 | } |
| 751 | |
| 752 | func (t *http2Server) checkForHeaderListSize(it interface{}) bool { |
| 753 | if t.maxSendHeaderListSize == nil { |
| 754 | return true |
| 755 | } |
| 756 | hdrFrame := it.(*headerFrame) |
| 757 | var sz int64 |
| 758 | for _, f := range hdrFrame.hf { |
| 759 | if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { |
| 760 | errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize) |
| 761 | return false |
| 762 | } |
| 763 | } |
| 764 | return true |
| 765 | } |
| 766 | |
| 767 | // WriteHeader sends the header metadata md back to the client. |
| 768 | func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { |
| 769 | if s.updateHeaderSent() || s.getState() == streamDone { |
| 770 | return ErrIllegalHeaderWrite |
| 771 | } |
| 772 | s.hdrMu.Lock() |
| 773 | if md.Len() > 0 { |
| 774 | if s.header.Len() > 0 { |
| 775 | s.header = metadata.Join(s.header, md) |
| 776 | } else { |
| 777 | s.header = md |
| 778 | } |
| 779 | } |
| 780 | if err := t.writeHeaderLocked(s); err != nil { |
| 781 | s.hdrMu.Unlock() |
| 782 | return err |
| 783 | } |
| 784 | s.hdrMu.Unlock() |
| 785 | return nil |
| 786 | } |
| 787 | |
// setResetPingStrikes atomically sets the resetPingStrikes flag to 1.
// handlePing consumes the flag with a compare-and-swap and clears the
// pingStrikes counter when it was set. The function is installed as the
// onWrite callback of outgoing header frames.
func (t *http2Server) setResetPingStrikes() {
	atomic.StoreUint32(&t.resetPingStrikes, 1)
}
| 791 | |
| 792 | func (t *http2Server) writeHeaderLocked(s *Stream) error { |
| 793 | // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields |
| 794 | // first and create a slice of that exact size. |
| 795 | headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. |
| 796 | headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) |
| 797 | headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) |
| 798 | if s.sendCompress != "" { |
| 799 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) |
| 800 | } |
| 801 | headerFields = appendHeaderFieldsFromMD(headerFields, s.header) |
| 802 | success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{ |
| 803 | streamID: s.id, |
| 804 | hf: headerFields, |
| 805 | endStream: false, |
| 806 | onWrite: t.setResetPingStrikes, |
| 807 | }) |
| 808 | if !success { |
| 809 | if err != nil { |
| 810 | return err |
| 811 | } |
| 812 | t.closeStream(s, true, http2.ErrCodeInternal, false) |
| 813 | return ErrHeaderListSizeLimitViolation |
| 814 | } |
| 815 | if t.stats != nil { |
| 816 | // Note: WireLength is not set in outHeader. |
| 817 | // TODO(mmukhi): Revisit this later, if needed. |
| 818 | outHeader := &stats.OutHeader{ |
| 819 | Header: s.header.Copy(), |
| 820 | } |
| 821 | t.stats.HandleRPC(s.Context(), outHeader) |
| 822 | } |
| 823 | return nil |
| 824 | } |
| 825 | |
| 826 | // WriteStatus sends stream status to the client and terminates the stream. |
| 827 | // There is no further I/O operations being able to perform on this stream. |
| 828 | // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early |
| 829 | // OK is adopted. |
| 830 | func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { |
| 831 | if s.getState() == streamDone { |
| 832 | return nil |
| 833 | } |
| 834 | s.hdrMu.Lock() |
| 835 | // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields |
| 836 | // first and create a slice of that exact size. |
| 837 | headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else. |
| 838 | if !s.updateHeaderSent() { // No headers have been sent. |
| 839 | if len(s.header) > 0 { // Send a separate header frame. |
| 840 | if err := t.writeHeaderLocked(s); err != nil { |
| 841 | s.hdrMu.Unlock() |
| 842 | return err |
| 843 | } |
| 844 | } else { // Send a trailer only response. |
| 845 | headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) |
| 846 | headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) |
| 847 | } |
| 848 | } |
| 849 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) |
| 850 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) |
| 851 | |
| 852 | if p := statusRawProto(st); p != nil && len(p.Details) > 0 { |
| 853 | stBytes, err := proto.Marshal(p) |
| 854 | if err != nil { |
| 855 | // TODO: return error instead, when callers are able to handle it. |
| 856 | grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err) |
| 857 | } else { |
| 858 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)}) |
| 859 | } |
| 860 | } |
| 861 | |
| 862 | // Attach the trailer metadata. |
| 863 | headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer) |
| 864 | trailingHeader := &headerFrame{ |
| 865 | streamID: s.id, |
| 866 | hf: headerFields, |
| 867 | endStream: true, |
| 868 | onWrite: t.setResetPingStrikes, |
| 869 | } |
| 870 | s.hdrMu.Unlock() |
| 871 | success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader) |
| 872 | if !success { |
| 873 | if err != nil { |
| 874 | return err |
| 875 | } |
| 876 | t.closeStream(s, true, http2.ErrCodeInternal, false) |
| 877 | return ErrHeaderListSizeLimitViolation |
| 878 | } |
| 879 | // Send a RST_STREAM after the trailers if the client has not already half-closed. |
| 880 | rst := s.getState() == streamActive |
| 881 | t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true) |
| 882 | if t.stats != nil { |
| 883 | t.stats.HandleRPC(s.Context(), &stats.OutTrailer{ |
| 884 | Trailer: s.trailer.Copy(), |
| 885 | }) |
| 886 | } |
| 887 | return nil |
| 888 | } |
| 889 | |
| 890 | // Write converts the data into HTTP2 data frame and sends it out. Non-nil error |
| 891 | // is returns if it fails (e.g., framing error, transport error). |
| 892 | func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { |
| 893 | if !s.isHeaderSent() { // Headers haven't been written yet. |
| 894 | if err := t.WriteHeader(s, nil); err != nil { |
| 895 | if _, ok := err.(ConnectionError); ok { |
| 896 | return err |
| 897 | } |
| 898 | // TODO(mmukhi, dfawley): Make sure this is the right code to return. |
| 899 | return status.Errorf(codes.Internal, "transport: %v", err) |
| 900 | } |
| 901 | } else { |
| 902 | // Writing headers checks for this condition. |
| 903 | if s.getState() == streamDone { |
| 904 | // TODO(mmukhi, dfawley): Should the server write also return io.EOF? |
| 905 | s.cancel() |
| 906 | select { |
| 907 | case <-t.done: |
| 908 | return ErrConnClosing |
| 909 | default: |
| 910 | } |
| 911 | return ContextErr(s.ctx.Err()) |
| 912 | } |
| 913 | } |
| 914 | // Add some data to header frame so that we can equally distribute bytes across frames. |
| 915 | emptyLen := http2MaxFrameLen - len(hdr) |
| 916 | if emptyLen > len(data) { |
| 917 | emptyLen = len(data) |
| 918 | } |
| 919 | hdr = append(hdr, data[:emptyLen]...) |
| 920 | data = data[emptyLen:] |
| 921 | df := &dataFrame{ |
| 922 | streamID: s.id, |
| 923 | h: hdr, |
| 924 | d: data, |
| 925 | onEachWrite: t.setResetPingStrikes, |
| 926 | } |
| 927 | if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { |
| 928 | select { |
| 929 | case <-t.done: |
| 930 | return ErrConnClosing |
| 931 | default: |
| 932 | } |
| 933 | return ContextErr(s.ctx.Err()) |
| 934 | } |
| 935 | return t.controlBuf.put(df) |
| 936 | } |
| 937 | |
| 938 | // keepalive running in a separate goroutine does the following: |
| 939 | // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. |
| 940 | // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. |
| 941 | // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. |
| 942 | // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection |
| 943 | // after an additional duration of keepalive.Timeout. |
| 944 | func (t *http2Server) keepalive() { |
| 945 | p := &ping{} |
| 946 | // True iff a ping has been sent, and no data has been received since then. |
| 947 | outstandingPing := false |
| 948 | // Amount of time remaining before which we should receive an ACK for the |
| 949 | // last sent ping. |
| 950 | kpTimeoutLeft := time.Duration(0) |
| 951 | // Records the last value of t.lastRead before we go block on the timer. |
| 952 | // This is required to check for read activity since then. |
| 953 | prevNano := time.Now().UnixNano() |
| 954 | // Initialize the different timers to their default values. |
| 955 | idleTimer := time.NewTimer(t.kp.MaxConnectionIdle) |
| 956 | ageTimer := time.NewTimer(t.kp.MaxConnectionAge) |
| 957 | kpTimer := time.NewTimer(t.kp.Time) |
| 958 | defer func() { |
| 959 | // We need to drain the underlying channel in these timers after a call |
| 960 | // to Stop(), only if we are interested in resetting them. Clearly we |
| 961 | // are not interested in resetting them here. |
| 962 | idleTimer.Stop() |
| 963 | ageTimer.Stop() |
| 964 | kpTimer.Stop() |
| 965 | }() |
| 966 | |
| 967 | for { |
| 968 | select { |
| 969 | case <-idleTimer.C: |
| 970 | t.mu.Lock() |
| 971 | idle := t.idle |
| 972 | if idle.IsZero() { // The connection is non-idle. |
| 973 | t.mu.Unlock() |
| 974 | idleTimer.Reset(t.kp.MaxConnectionIdle) |
| 975 | continue |
| 976 | } |
| 977 | val := t.kp.MaxConnectionIdle - time.Since(idle) |
| 978 | t.mu.Unlock() |
| 979 | if val <= 0 { |
| 980 | // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. |
| 981 | // Gracefully close the connection. |
| 982 | t.drain(http2.ErrCodeNo, []byte{}) |
| 983 | return |
| 984 | } |
| 985 | idleTimer.Reset(val) |
| 986 | case <-ageTimer.C: |
| 987 | t.drain(http2.ErrCodeNo, []byte{}) |
| 988 | ageTimer.Reset(t.kp.MaxConnectionAgeGrace) |
| 989 | select { |
| 990 | case <-ageTimer.C: |
| 991 | // Close the connection after grace period. |
| 992 | infof("transport: closing server transport due to maximum connection age.") |
| 993 | t.Close() |
| 994 | case <-t.done: |
| 995 | } |
| 996 | return |
| 997 | case <-kpTimer.C: |
| 998 | lastRead := atomic.LoadInt64(&t.lastRead) |
| 999 | if lastRead > prevNano { |
| 1000 | // There has been read activity since the last time we were |
| 1001 | // here. Setup the timer to fire at kp.Time seconds from |
| 1002 | // lastRead time and continue. |
| 1003 | outstandingPing = false |
| 1004 | kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano())) |
| 1005 | prevNano = lastRead |
| 1006 | continue |
| 1007 | } |
| 1008 | if outstandingPing && kpTimeoutLeft <= 0 { |
| 1009 | infof("transport: closing server transport due to idleness.") |
| 1010 | t.Close() |
| 1011 | return |
| 1012 | } |
| 1013 | if !outstandingPing { |
| 1014 | if channelz.IsOn() { |
| 1015 | atomic.AddInt64(&t.czData.kpCount, 1) |
| 1016 | } |
| 1017 | t.controlBuf.put(p) |
| 1018 | kpTimeoutLeft = t.kp.Timeout |
| 1019 | outstandingPing = true |
| 1020 | } |
| 1021 | // The amount of time to sleep here is the minimum of kp.Time and |
| 1022 | // timeoutLeft. This will ensure that we wait only for kp.Time |
| 1023 | // before sending out the next ping (for cases where the ping is |
| 1024 | // acked). |
| 1025 | sleepDuration := minTime(t.kp.Time, kpTimeoutLeft) |
| 1026 | kpTimeoutLeft -= sleepDuration |
| 1027 | kpTimer.Reset(sleepDuration) |
| 1028 | case <-t.done: |
| 1029 | return |
| 1030 | } |
| 1031 | } |
| 1032 | } |
| 1033 | |
| 1034 | // Close starts shutting down the http2Server transport. |
| 1035 | // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This |
| 1036 | // could cause some resource issue. Revisit this later. |
| 1037 | func (t *http2Server) Close() error { |
| 1038 | t.mu.Lock() |
| 1039 | if t.state == closing { |
| 1040 | t.mu.Unlock() |
| 1041 | return errors.New("transport: Close() was already called") |
| 1042 | } |
| 1043 | t.state = closing |
| 1044 | streams := t.activeStreams |
| 1045 | t.activeStreams = nil |
| 1046 | t.mu.Unlock() |
| 1047 | t.controlBuf.finish() |
| 1048 | close(t.done) |
| 1049 | err := t.conn.Close() |
| 1050 | if channelz.IsOn() { |
| 1051 | channelz.RemoveEntry(t.channelzID) |
| 1052 | } |
| 1053 | // Cancel all active streams. |
| 1054 | for _, s := range streams { |
| 1055 | s.cancel() |
| 1056 | } |
| 1057 | if t.stats != nil { |
| 1058 | connEnd := &stats.ConnEnd{} |
| 1059 | t.stats.HandleConn(t.ctx, connEnd) |
| 1060 | } |
| 1061 | return err |
| 1062 | } |
| 1063 | |
| 1064 | // deleteStream deletes the stream s from transport's active streams. |
| 1065 | func (t *http2Server) deleteStream(s *Stream, eosReceived bool) { |
| 1066 | // In case stream sending and receiving are invoked in separate |
| 1067 | // goroutines (e.g., bi-directional streaming), cancel needs to be |
| 1068 | // called to interrupt the potential blocking on other goroutines. |
| 1069 | s.cancel() |
| 1070 | |
| 1071 | t.mu.Lock() |
| 1072 | if _, ok := t.activeStreams[s.id]; ok { |
| 1073 | delete(t.activeStreams, s.id) |
| 1074 | if len(t.activeStreams) == 0 { |
| 1075 | t.idle = time.Now() |
| 1076 | } |
| 1077 | } |
| 1078 | t.mu.Unlock() |
| 1079 | |
| 1080 | if channelz.IsOn() { |
| 1081 | if eosReceived { |
| 1082 | atomic.AddInt64(&t.czData.streamsSucceeded, 1) |
| 1083 | } else { |
| 1084 | atomic.AddInt64(&t.czData.streamsFailed, 1) |
| 1085 | } |
| 1086 | } |
| 1087 | } |
| 1088 | |
| 1089 | // finishStream closes the stream and puts the trailing headerFrame into controlbuf. |
| 1090 | func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) { |
| 1091 | oldState := s.swapState(streamDone) |
| 1092 | if oldState == streamDone { |
| 1093 | // If the stream was already done, return. |
| 1094 | return |
| 1095 | } |
| 1096 | |
| 1097 | hdr.cleanup = &cleanupStream{ |
| 1098 | streamID: s.id, |
| 1099 | rst: rst, |
| 1100 | rstCode: rstCode, |
| 1101 | onWrite: func() { |
| 1102 | t.deleteStream(s, eosReceived) |
| 1103 | }, |
| 1104 | } |
| 1105 | t.controlBuf.put(hdr) |
| 1106 | } |
| 1107 | |
| 1108 | // closeStream clears the footprint of a stream when the stream is not needed any more. |
| 1109 | func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) { |
| 1110 | s.swapState(streamDone) |
| 1111 | t.deleteStream(s, eosReceived) |
| 1112 | |
| 1113 | t.controlBuf.put(&cleanupStream{ |
| 1114 | streamID: s.id, |
| 1115 | rst: rst, |
| 1116 | rstCode: rstCode, |
| 1117 | onWrite: func() {}, |
| 1118 | }) |
| 1119 | } |
| 1120 | |
| 1121 | func (t *http2Server) RemoteAddr() net.Addr { |
| 1122 | return t.remoteAddr |
| 1123 | } |
| 1124 | |
| 1125 | func (t *http2Server) Drain() { |
| 1126 | t.drain(http2.ErrCodeNo, []byte{}) |
| 1127 | } |
| 1128 | |
| 1129 | func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { |
| 1130 | t.mu.Lock() |
| 1131 | defer t.mu.Unlock() |
| 1132 | if t.drainChan != nil { |
| 1133 | return |
| 1134 | } |
| 1135 | t.drainChan = make(chan struct{}) |
| 1136 | t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true}) |
| 1137 | } |
| 1138 | |
| 1139 | var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}} |
| 1140 | |
| 1141 | // Handles outgoing GoAway and returns true if loopy needs to put itself |
| 1142 | // in draining mode. |
| 1143 | func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { |
| 1144 | t.mu.Lock() |
| 1145 | if t.state == closing { // TODO(mmukhi): This seems unnecessary. |
| 1146 | t.mu.Unlock() |
| 1147 | // The transport is closing. |
| 1148 | return false, ErrConnClosing |
| 1149 | } |
| 1150 | sid := t.maxStreamID |
| 1151 | if !g.headsUp { |
| 1152 | // Stop accepting more streams now. |
| 1153 | t.state = draining |
| 1154 | if len(t.activeStreams) == 0 { |
| 1155 | g.closeConn = true |
| 1156 | } |
| 1157 | t.mu.Unlock() |
| 1158 | if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil { |
| 1159 | return false, err |
| 1160 | } |
| 1161 | if g.closeConn { |
| 1162 | // Abruptly close the connection following the GoAway (via |
| 1163 | // loopywriter). But flush out what's inside the buffer first. |
| 1164 | t.framer.writer.Flush() |
| 1165 | return false, fmt.Errorf("transport: Connection closing") |
| 1166 | } |
| 1167 | return true, nil |
| 1168 | } |
| 1169 | t.mu.Unlock() |
| 1170 | // For a graceful close, send out a GoAway with stream ID of MaxUInt32, |
| 1171 | // Follow that with a ping and wait for the ack to come back or a timer |
| 1172 | // to expire. During this time accept new streams since they might have |
| 1173 | // originated before the GoAway reaches the client. |
| 1174 | // After getting the ack or timer expiration send out another GoAway this |
| 1175 | // time with an ID of the max stream server intends to process. |
| 1176 | if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil { |
| 1177 | return false, err |
| 1178 | } |
| 1179 | if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil { |
| 1180 | return false, err |
| 1181 | } |
| 1182 | go func() { |
| 1183 | timer := time.NewTimer(time.Minute) |
| 1184 | defer timer.Stop() |
| 1185 | select { |
| 1186 | case <-t.drainChan: |
| 1187 | case <-timer.C: |
| 1188 | case <-t.done: |
| 1189 | return |
| 1190 | } |
| 1191 | t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData}) |
| 1192 | }() |
| 1193 | return false, nil |
| 1194 | } |
| 1195 | |
| 1196 | func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric { |
| 1197 | s := channelz.SocketInternalMetric{ |
| 1198 | StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted), |
| 1199 | StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded), |
| 1200 | StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed), |
| 1201 | MessagesSent: atomic.LoadInt64(&t.czData.msgSent), |
| 1202 | MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv), |
| 1203 | KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount), |
| 1204 | LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), |
| 1205 | LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), |
| 1206 | LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), |
| 1207 | LocalFlowControlWindow: int64(t.fc.getSize()), |
| 1208 | SocketOptions: channelz.GetSocketOption(t.conn), |
| 1209 | LocalAddr: t.localAddr, |
| 1210 | RemoteAddr: t.remoteAddr, |
| 1211 | // RemoteName : |
| 1212 | } |
| 1213 | if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { |
| 1214 | s.Security = au.GetSecurityValue() |
| 1215 | } |
| 1216 | s.RemoteFlowControlWindow = t.getOutFlowWindow() |
| 1217 | return &s |
| 1218 | } |
| 1219 | |
| 1220 | func (t *http2Server) IncrMsgSent() { |
| 1221 | atomic.AddInt64(&t.czData.msgSent, 1) |
| 1222 | atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) |
| 1223 | } |
| 1224 | |
| 1225 | func (t *http2Server) IncrMsgRecv() { |
| 1226 | atomic.AddInt64(&t.czData.msgRecv, 1) |
| 1227 | atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) |
| 1228 | } |
| 1229 | |
| 1230 | func (t *http2Server) getOutFlowWindow() int64 { |
| 1231 | resp := make(chan uint32, 1) |
| 1232 | timer := time.NewTimer(time.Second) |
| 1233 | defer timer.Stop() |
| 1234 | t.controlBuf.put(&outFlowControlSizeRequest{resp}) |
| 1235 | select { |
| 1236 | case sz := <-resp: |
| 1237 | return int64(sz) |
| 1238 | case <-t.done: |
| 1239 | return -1 |
| 1240 | case <-timer.C: |
| 1241 | return -2 |
| 1242 | } |
| 1243 | } |
| 1244 | |
| 1245 | func getJitter(v time.Duration) time.Duration { |
| 1246 | if v == infinity { |
| 1247 | return 0 |
| 1248 | } |
| 1249 | // Generate a jitter between +/- 10% of the value. |
| 1250 | r := int64(v / 10) |
| 1251 | j := grpcrand.Int63n(2*r) - r |
| 1252 | return time.Duration(j) |
| 1253 | } |