khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 1 | /* |
| 2 | * |
| 3 | * Copyright 2014 gRPC authors. |
| 4 | * |
| 5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | * you may not use this file except in compliance with the License. |
| 7 | * You may obtain a copy of the License at |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | * See the License for the specific language governing permissions and |
| 15 | * limitations under the License. |
| 16 | * |
| 17 | */ |
| 18 | |
| 19 | package transport |
| 20 | |
| 21 | import ( |
| 22 | "bytes" |
| 23 | "context" |
| 24 | "errors" |
| 25 | "fmt" |
| 26 | "io" |
| 27 | "math" |
| 28 | "net" |
| 29 | "net/http" |
| 30 | "strconv" |
| 31 | "sync" |
| 32 | "sync/atomic" |
| 33 | "time" |
| 34 | |
| 35 | "github.com/golang/protobuf/proto" |
| 36 | "golang.org/x/net/http2" |
| 37 | "golang.org/x/net/http2/hpack" |
| 38 | "google.golang.org/grpc/internal/grpcutil" |
| 39 | |
| 40 | "google.golang.org/grpc/codes" |
| 41 | "google.golang.org/grpc/credentials" |
| 42 | "google.golang.org/grpc/internal/channelz" |
| 43 | "google.golang.org/grpc/internal/grpcrand" |
| 44 | "google.golang.org/grpc/keepalive" |
| 45 | "google.golang.org/grpc/metadata" |
| 46 | "google.golang.org/grpc/peer" |
| 47 | "google.golang.org/grpc/stats" |
| 48 | "google.golang.org/grpc/status" |
| 49 | "google.golang.org/grpc/tap" |
| 50 | ) |
| 51 | |
var (
	// ErrIllegalHeaderWrite is reported when a header cannot be set because
	// the stream has already finished or WriteHeader has already been called.
	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
	// ErrHeaderListSizeLimitViolation is reported when the header list about
	// to be sent is larger than the limit set by the peer.
	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
)
| 60 | |
// serverConnectionCounter is the running total of connections this server has
// seen, which equals the number of http2Servers created so far. Every read
// and write of it must be done atomically.
var serverConnectionCounter uint64
| 64 | |
| 65 | // http2Server implements the ServerTransport interface with HTTP2. |
| 66 | type http2Server struct { |
| 67 | lastRead int64 // Keep this field 64-bit aligned. Accessed atomically. |
| 68 | ctx context.Context |
| 69 | done chan struct{} |
| 70 | conn net.Conn |
| 71 | loopy *loopyWriter |
| 72 | readerDone chan struct{} // sync point to enable testing. |
| 73 | writerDone chan struct{} // sync point to enable testing. |
| 74 | remoteAddr net.Addr |
| 75 | localAddr net.Addr |
| 76 | maxStreamID uint32 // max stream ID ever seen |
| 77 | authInfo credentials.AuthInfo // auth info about the connection |
| 78 | inTapHandle tap.ServerInHandle |
| 79 | framer *framer |
| 80 | // The max number of concurrent streams. |
| 81 | maxStreams uint32 |
| 82 | // controlBuf delivers all the control related tasks (e.g., window |
| 83 | // updates, reset streams, and various settings) to the controller. |
| 84 | controlBuf *controlBuffer |
| 85 | fc *trInFlow |
| 86 | stats stats.Handler |
| 87 | // Keepalive and max-age parameters for the server. |
| 88 | kp keepalive.ServerParameters |
| 89 | // Keepalive enforcement policy. |
| 90 | kep keepalive.EnforcementPolicy |
| 91 | // The time instance last ping was received. |
| 92 | lastPingAt time.Time |
| 93 | // Number of times the client has violated keepalive ping policy so far. |
| 94 | pingStrikes uint8 |
| 95 | // Flag to signify that number of ping strikes should be reset to 0. |
| 96 | // This is set whenever data or header frames are sent. |
| 97 | // 1 means yes. |
| 98 | resetPingStrikes uint32 // Accessed atomically. |
| 99 | initialWindowSize int32 |
| 100 | bdpEst *bdpEstimator |
| 101 | maxSendHeaderListSize *uint32 |
| 102 | |
| 103 | mu sync.Mutex // guard the following |
| 104 | |
| 105 | // drainChan is initialized when Drain() is called the first time. |
| 106 | // After which the server writes out the first GoAway(with ID 2^31-1) frame. |
| 107 | // Then an independent goroutine will be launched to later send the second GoAway. |
| 108 | // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame. |
| 109 | // Thus call to Drain() will be a no-op if drainChan is already initialized since draining is |
| 110 | // already underway. |
| 111 | drainChan chan struct{} |
| 112 | state transportState |
| 113 | activeStreams map[uint32]*Stream |
| 114 | // idle is the time instant when the connection went idle. |
| 115 | // This is either the beginning of the connection or when the number of |
| 116 | // RPCs go down to 0. |
| 117 | // When the connection is busy, this value is set to 0. |
| 118 | idle time.Time |
| 119 | |
| 120 | // Fields below are for channelz metric collection. |
| 121 | channelzID int64 // channelz unique identification number |
| 122 | czData *channelzData |
| 123 | bufferPool *bufferPool |
| 124 | |
| 125 | connectionID uint64 |
| 126 | } |
| 127 | |
| 128 | // NewServerTransport creates a http2 transport with conn and configuration |
| 129 | // options from config. |
| 130 | // |
| 131 | // It returns a non-nil transport and a nil error on success. On failure, it |
khenaidoo | 5cb0d40 | 2021-12-08 14:09:16 -0500 | [diff] [blame] | 132 | // returns a nil transport and a non-nil error. For a special case where the |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 133 | // underlying conn gets closed before the client preface could be read, it |
| 134 | // returns a nil transport and a nil error. |
| 135 | func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { |
| 136 | var authInfo credentials.AuthInfo |
| 137 | rawConn := conn |
| 138 | if config.Credentials != nil { |
| 139 | var err error |
| 140 | conn, authInfo, err = config.Credentials.ServerHandshake(rawConn) |
| 141 | if err != nil { |
| 142 | // ErrConnDispatched means that the connection was dispatched away |
| 143 | // from gRPC; those connections should be left open. io.EOF means |
| 144 | // the connection was closed before handshaking completed, which can |
| 145 | // happen naturally from probers. Return these errors directly. |
| 146 | if err == credentials.ErrConnDispatched || err == io.EOF { |
| 147 | return nil, err |
| 148 | } |
| 149 | return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) |
| 150 | } |
| 151 | } |
| 152 | writeBufSize := config.WriteBufferSize |
| 153 | readBufSize := config.ReadBufferSize |
| 154 | maxHeaderListSize := defaultServerMaxHeaderListSize |
| 155 | if config.MaxHeaderListSize != nil { |
| 156 | maxHeaderListSize = *config.MaxHeaderListSize |
| 157 | } |
| 158 | framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize) |
| 159 | // Send initial settings as connection preface to client. |
| 160 | isettings := []http2.Setting{{ |
| 161 | ID: http2.SettingMaxFrameSize, |
| 162 | Val: http2MaxFrameLen, |
| 163 | }} |
| 164 | // TODO(zhaoq): Have a better way to signal "no limit" because 0 is |
| 165 | // permitted in the HTTP2 spec. |
| 166 | maxStreams := config.MaxStreams |
| 167 | if maxStreams == 0 { |
| 168 | maxStreams = math.MaxUint32 |
| 169 | } else { |
| 170 | isettings = append(isettings, http2.Setting{ |
| 171 | ID: http2.SettingMaxConcurrentStreams, |
| 172 | Val: maxStreams, |
| 173 | }) |
| 174 | } |
| 175 | dynamicWindow := true |
| 176 | iwz := int32(initialWindowSize) |
| 177 | if config.InitialWindowSize >= defaultWindowSize { |
| 178 | iwz = config.InitialWindowSize |
| 179 | dynamicWindow = false |
| 180 | } |
| 181 | icwz := int32(initialWindowSize) |
| 182 | if config.InitialConnWindowSize >= defaultWindowSize { |
| 183 | icwz = config.InitialConnWindowSize |
| 184 | dynamicWindow = false |
| 185 | } |
| 186 | if iwz != defaultWindowSize { |
| 187 | isettings = append(isettings, http2.Setting{ |
| 188 | ID: http2.SettingInitialWindowSize, |
| 189 | Val: uint32(iwz)}) |
| 190 | } |
| 191 | if config.MaxHeaderListSize != nil { |
| 192 | isettings = append(isettings, http2.Setting{ |
| 193 | ID: http2.SettingMaxHeaderListSize, |
| 194 | Val: *config.MaxHeaderListSize, |
| 195 | }) |
| 196 | } |
| 197 | if config.HeaderTableSize != nil { |
| 198 | isettings = append(isettings, http2.Setting{ |
| 199 | ID: http2.SettingHeaderTableSize, |
| 200 | Val: *config.HeaderTableSize, |
| 201 | }) |
| 202 | } |
| 203 | if err := framer.fr.WriteSettings(isettings...); err != nil { |
| 204 | return nil, connectionErrorf(false, err, "transport: %v", err) |
| 205 | } |
| 206 | // Adjust the connection flow control window if needed. |
| 207 | if delta := uint32(icwz - defaultWindowSize); delta > 0 { |
| 208 | if err := framer.fr.WriteWindowUpdate(0, delta); err != nil { |
| 209 | return nil, connectionErrorf(false, err, "transport: %v", err) |
| 210 | } |
| 211 | } |
| 212 | kp := config.KeepaliveParams |
| 213 | if kp.MaxConnectionIdle == 0 { |
| 214 | kp.MaxConnectionIdle = defaultMaxConnectionIdle |
| 215 | } |
| 216 | if kp.MaxConnectionAge == 0 { |
| 217 | kp.MaxConnectionAge = defaultMaxConnectionAge |
| 218 | } |
| 219 | // Add a jitter to MaxConnectionAge. |
| 220 | kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge) |
| 221 | if kp.MaxConnectionAgeGrace == 0 { |
| 222 | kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace |
| 223 | } |
| 224 | if kp.Time == 0 { |
| 225 | kp.Time = defaultServerKeepaliveTime |
| 226 | } |
| 227 | if kp.Timeout == 0 { |
| 228 | kp.Timeout = defaultServerKeepaliveTimeout |
| 229 | } |
| 230 | kep := config.KeepalivePolicy |
| 231 | if kep.MinTime == 0 { |
| 232 | kep.MinTime = defaultKeepalivePolicyMinTime |
| 233 | } |
| 234 | |
| 235 | done := make(chan struct{}) |
| 236 | t := &http2Server{ |
| 237 | ctx: setConnection(context.Background(), rawConn), |
| 238 | done: done, |
| 239 | conn: conn, |
| 240 | remoteAddr: conn.RemoteAddr(), |
| 241 | localAddr: conn.LocalAddr(), |
| 242 | authInfo: authInfo, |
| 243 | framer: framer, |
| 244 | readerDone: make(chan struct{}), |
| 245 | writerDone: make(chan struct{}), |
| 246 | maxStreams: maxStreams, |
| 247 | inTapHandle: config.InTapHandle, |
| 248 | fc: &trInFlow{limit: uint32(icwz)}, |
| 249 | state: reachable, |
| 250 | activeStreams: make(map[uint32]*Stream), |
| 251 | stats: config.StatsHandler, |
| 252 | kp: kp, |
| 253 | idle: time.Now(), |
| 254 | kep: kep, |
| 255 | initialWindowSize: iwz, |
| 256 | czData: new(channelzData), |
| 257 | bufferPool: newBufferPool(), |
| 258 | } |
| 259 | t.controlBuf = newControlBuffer(t.done) |
| 260 | if dynamicWindow { |
| 261 | t.bdpEst = &bdpEstimator{ |
| 262 | bdp: initialWindowSize, |
| 263 | updateFlowControl: t.updateFlowControl, |
| 264 | } |
| 265 | } |
| 266 | if t.stats != nil { |
| 267 | t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ |
| 268 | RemoteAddr: t.remoteAddr, |
| 269 | LocalAddr: t.localAddr, |
| 270 | }) |
| 271 | connBegin := &stats.ConnBegin{} |
| 272 | t.stats.HandleConn(t.ctx, connBegin) |
| 273 | } |
| 274 | if channelz.IsOn() { |
| 275 | t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) |
| 276 | } |
| 277 | |
| 278 | t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1) |
| 279 | |
| 280 | t.framer.writer.Flush() |
| 281 | |
| 282 | defer func() { |
| 283 | if err != nil { |
| 284 | t.Close() |
| 285 | } |
| 286 | }() |
| 287 | |
| 288 | // Check the validity of client preface. |
| 289 | preface := make([]byte, len(clientPreface)) |
| 290 | if _, err := io.ReadFull(t.conn, preface); err != nil { |
| 291 | // In deployments where a gRPC server runs behind a cloud load balancer |
| 292 | // which performs regular TCP level health checks, the connection is |
khenaidoo | 5cb0d40 | 2021-12-08 14:09:16 -0500 | [diff] [blame] | 293 | // closed immediately by the latter. Returning io.EOF here allows the |
| 294 | // grpc server implementation to recognize this scenario and suppress |
| 295 | // logging to reduce spam. |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 296 | if err == io.EOF { |
khenaidoo | 5cb0d40 | 2021-12-08 14:09:16 -0500 | [diff] [blame] | 297 | return nil, io.EOF |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 298 | } |
| 299 | return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err) |
| 300 | } |
| 301 | if !bytes.Equal(preface, clientPreface) { |
| 302 | return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface) |
| 303 | } |
| 304 | |
| 305 | frame, err := t.framer.fr.ReadFrame() |
| 306 | if err == io.EOF || err == io.ErrUnexpectedEOF { |
| 307 | return nil, err |
| 308 | } |
| 309 | if err != nil { |
| 310 | return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err) |
| 311 | } |
| 312 | atomic.StoreInt64(&t.lastRead, time.Now().UnixNano()) |
| 313 | sf, ok := frame.(*http2.SettingsFrame) |
| 314 | if !ok { |
| 315 | return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame) |
| 316 | } |
| 317 | t.handleSettings(sf) |
| 318 | |
| 319 | go func() { |
| 320 | t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst) |
| 321 | t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler |
| 322 | if err := t.loopy.run(); err != nil { |
| 323 | if logger.V(logLevel) { |
| 324 | logger.Errorf("transport: loopyWriter.run returning. Err: %v", err) |
| 325 | } |
| 326 | } |
| 327 | t.conn.Close() |
| 328 | t.controlBuf.finish() |
| 329 | close(t.writerDone) |
| 330 | }() |
| 331 | go t.keepalive() |
| 332 | return t, nil |
| 333 | } |
| 334 | |
| 335 | // operateHeader takes action on the decoded headers. |
| 336 | func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { |
| 337 | streamID := frame.Header().StreamID |
| 338 | |
| 339 | // frame.Truncated is set to true when framer detects that the current header |
| 340 | // list size hits MaxHeaderListSize limit. |
| 341 | if frame.Truncated { |
| 342 | t.controlBuf.put(&cleanupStream{ |
| 343 | streamID: streamID, |
| 344 | rst: true, |
| 345 | rstCode: http2.ErrCodeFrameSize, |
| 346 | onWrite: func() {}, |
| 347 | }) |
| 348 | return false |
| 349 | } |
| 350 | |
| 351 | buf := newRecvBuffer() |
| 352 | s := &Stream{ |
| 353 | id: streamID, |
| 354 | st: t, |
| 355 | buf: buf, |
| 356 | fc: &inFlow{limit: uint32(t.initialWindowSize)}, |
| 357 | } |
| 358 | |
| 359 | var ( |
| 360 | // If a gRPC Response-Headers has already been received, then it means |
| 361 | // that the peer is speaking gRPC and we are in gRPC mode. |
| 362 | isGRPC = false |
| 363 | mdata = make(map[string][]string) |
| 364 | httpMethod string |
| 365 | // headerError is set if an error is encountered while parsing the headers |
| 366 | headerError bool |
| 367 | |
| 368 | timeoutSet bool |
| 369 | timeout time.Duration |
| 370 | ) |
| 371 | |
| 372 | for _, hf := range frame.Fields { |
| 373 | switch hf.Name { |
| 374 | case "content-type": |
| 375 | contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value) |
| 376 | if !validContentType { |
| 377 | break |
| 378 | } |
| 379 | mdata[hf.Name] = append(mdata[hf.Name], hf.Value) |
| 380 | s.contentSubtype = contentSubtype |
| 381 | isGRPC = true |
| 382 | case "grpc-encoding": |
| 383 | s.recvCompress = hf.Value |
| 384 | case ":method": |
| 385 | httpMethod = hf.Value |
| 386 | case ":path": |
| 387 | s.method = hf.Value |
| 388 | case "grpc-timeout": |
| 389 | timeoutSet = true |
| 390 | var err error |
| 391 | if timeout, err = decodeTimeout(hf.Value); err != nil { |
| 392 | headerError = true |
| 393 | } |
khenaidoo | 5cb0d40 | 2021-12-08 14:09:16 -0500 | [diff] [blame] | 394 | // "Transports must consider requests containing the Connection header |
| 395 | // as malformed." - A41 |
| 396 | case "connection": |
| 397 | if logger.V(logLevel) { |
| 398 | logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec") |
| 399 | } |
| 400 | headerError = true |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 401 | default: |
| 402 | if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) { |
| 403 | break |
| 404 | } |
| 405 | v, err := decodeMetadataHeader(hf.Name, hf.Value) |
| 406 | if err != nil { |
| 407 | headerError = true |
| 408 | logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) |
| 409 | break |
| 410 | } |
| 411 | mdata[hf.Name] = append(mdata[hf.Name], v) |
| 412 | } |
| 413 | } |
| 414 | |
khenaidoo | 5cb0d40 | 2021-12-08 14:09:16 -0500 | [diff] [blame] | 415 | // "If multiple Host headers or multiple :authority headers are present, the |
| 416 | // request must be rejected with an HTTP status code 400 as required by Host |
| 417 | // validation in RFC 7230 §5.4, gRPC status code INTERNAL, or RST_STREAM |
| 418 | // with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2 |
| 419 | // error, this takes precedence over a client not speaking gRPC. |
| 420 | if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 { |
| 421 | errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"])) |
| 422 | if logger.V(logLevel) { |
| 423 | logger.Errorf("transport: %v", errMsg) |
| 424 | } |
| 425 | t.controlBuf.put(&earlyAbortStream{ |
| 426 | httpStatus: 400, |
| 427 | streamID: streamID, |
| 428 | contentSubtype: s.contentSubtype, |
| 429 | status: status.New(codes.Internal, errMsg), |
| 430 | }) |
| 431 | return false |
| 432 | } |
| 433 | |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 434 | if !isGRPC || headerError { |
| 435 | t.controlBuf.put(&cleanupStream{ |
| 436 | streamID: streamID, |
| 437 | rst: true, |
| 438 | rstCode: http2.ErrCodeProtocol, |
| 439 | onWrite: func() {}, |
| 440 | }) |
| 441 | return false |
| 442 | } |
| 443 | |
khenaidoo | 5cb0d40 | 2021-12-08 14:09:16 -0500 | [diff] [blame] | 444 | // "If :authority is missing, Host must be renamed to :authority." - A41 |
| 445 | if len(mdata[":authority"]) == 0 { |
| 446 | // No-op if host isn't present, no eventual :authority header is a valid |
| 447 | // RPC. |
| 448 | if host, ok := mdata["host"]; ok { |
| 449 | mdata[":authority"] = host |
| 450 | delete(mdata, "host") |
| 451 | } |
| 452 | } else { |
| 453 | // "If :authority is present, Host must be discarded" - A41 |
| 454 | delete(mdata, "host") |
| 455 | } |
| 456 | |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 457 | if frame.StreamEnded() { |
| 458 | // s is just created by the caller. No lock needed. |
| 459 | s.state = streamReadDone |
| 460 | } |
| 461 | if timeoutSet { |
| 462 | s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout) |
| 463 | } else { |
| 464 | s.ctx, s.cancel = context.WithCancel(t.ctx) |
| 465 | } |
| 466 | pr := &peer.Peer{ |
| 467 | Addr: t.remoteAddr, |
| 468 | } |
| 469 | // Attach Auth info if there is any. |
| 470 | if t.authInfo != nil { |
| 471 | pr.AuthInfo = t.authInfo |
| 472 | } |
| 473 | s.ctx = peer.NewContext(s.ctx, pr) |
| 474 | // Attach the received metadata to the context. |
| 475 | if len(mdata) > 0 { |
| 476 | s.ctx = metadata.NewIncomingContext(s.ctx, mdata) |
| 477 | if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 { |
| 478 | s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1])) |
| 479 | } |
| 480 | if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 { |
| 481 | s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1])) |
| 482 | } |
| 483 | } |
| 484 | t.mu.Lock() |
| 485 | if t.state != reachable { |
| 486 | t.mu.Unlock() |
| 487 | s.cancel() |
| 488 | return false |
| 489 | } |
| 490 | if uint32(len(t.activeStreams)) >= t.maxStreams { |
| 491 | t.mu.Unlock() |
| 492 | t.controlBuf.put(&cleanupStream{ |
| 493 | streamID: streamID, |
| 494 | rst: true, |
| 495 | rstCode: http2.ErrCodeRefusedStream, |
| 496 | onWrite: func() {}, |
| 497 | }) |
| 498 | s.cancel() |
| 499 | return false |
| 500 | } |
| 501 | if streamID%2 != 1 || streamID <= t.maxStreamID { |
| 502 | t.mu.Unlock() |
| 503 | // illegal gRPC stream id. |
| 504 | if logger.V(logLevel) { |
| 505 | logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID) |
| 506 | } |
| 507 | s.cancel() |
| 508 | return true |
| 509 | } |
| 510 | t.maxStreamID = streamID |
| 511 | if httpMethod != http.MethodPost { |
| 512 | t.mu.Unlock() |
| 513 | if logger.V(logLevel) { |
| 514 | logger.Infof("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod) |
| 515 | } |
| 516 | t.controlBuf.put(&cleanupStream{ |
| 517 | streamID: streamID, |
| 518 | rst: true, |
| 519 | rstCode: http2.ErrCodeProtocol, |
| 520 | onWrite: func() {}, |
| 521 | }) |
| 522 | s.cancel() |
| 523 | return false |
| 524 | } |
| 525 | if t.inTapHandle != nil { |
| 526 | var err error |
| 527 | if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil { |
| 528 | t.mu.Unlock() |
| 529 | if logger.V(logLevel) { |
| 530 | logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err) |
| 531 | } |
| 532 | stat, ok := status.FromError(err) |
| 533 | if !ok { |
| 534 | stat = status.New(codes.PermissionDenied, err.Error()) |
| 535 | } |
| 536 | t.controlBuf.put(&earlyAbortStream{ |
khenaidoo | 5cb0d40 | 2021-12-08 14:09:16 -0500 | [diff] [blame] | 537 | httpStatus: 200, |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 538 | streamID: s.id, |
| 539 | contentSubtype: s.contentSubtype, |
| 540 | status: stat, |
| 541 | }) |
| 542 | return false |
| 543 | } |
| 544 | } |
| 545 | t.activeStreams[streamID] = s |
| 546 | if len(t.activeStreams) == 1 { |
| 547 | t.idle = time.Time{} |
| 548 | } |
| 549 | t.mu.Unlock() |
| 550 | if channelz.IsOn() { |
| 551 | atomic.AddInt64(&t.czData.streamsStarted, 1) |
| 552 | atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) |
| 553 | } |
| 554 | s.requestRead = func(n int) { |
| 555 | t.adjustWindow(s, uint32(n)) |
| 556 | } |
| 557 | s.ctx = traceCtx(s.ctx, s.method) |
| 558 | if t.stats != nil { |
| 559 | s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) |
| 560 | inHeader := &stats.InHeader{ |
| 561 | FullMethod: s.method, |
| 562 | RemoteAddr: t.remoteAddr, |
| 563 | LocalAddr: t.localAddr, |
| 564 | Compression: s.recvCompress, |
| 565 | WireLength: int(frame.Header().Length), |
| 566 | Header: metadata.MD(mdata).Copy(), |
| 567 | } |
| 568 | t.stats.HandleRPC(s.ctx, inHeader) |
| 569 | } |
| 570 | s.ctxDone = s.ctx.Done() |
| 571 | s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone) |
| 572 | s.trReader = &transportReader{ |
| 573 | reader: &recvBufferReader{ |
| 574 | ctx: s.ctx, |
| 575 | ctxDone: s.ctxDone, |
| 576 | recv: s.buf, |
| 577 | freeBuffer: t.bufferPool.put, |
| 578 | }, |
| 579 | windowHandler: func(n int) { |
| 580 | t.updateWindow(s, uint32(n)) |
| 581 | }, |
| 582 | } |
| 583 | // Register the stream with loopy. |
| 584 | t.controlBuf.put(®isterStream{ |
| 585 | streamID: s.id, |
| 586 | wq: s.wq, |
| 587 | }) |
| 588 | handle(s) |
| 589 | return false |
| 590 | } |
| 591 | |
| 592 | // HandleStreams receives incoming streams using the given handler. This is |
| 593 | // typically run in a separate goroutine. |
| 594 | // traceCtx attaches trace to ctx and returns the new context. |
| 595 | func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { |
| 596 | defer close(t.readerDone) |
| 597 | for { |
| 598 | t.controlBuf.throttle() |
| 599 | frame, err := t.framer.fr.ReadFrame() |
| 600 | atomic.StoreInt64(&t.lastRead, time.Now().UnixNano()) |
| 601 | if err != nil { |
| 602 | if se, ok := err.(http2.StreamError); ok { |
| 603 | if logger.V(logLevel) { |
| 604 | logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se) |
| 605 | } |
| 606 | t.mu.Lock() |
| 607 | s := t.activeStreams[se.StreamID] |
| 608 | t.mu.Unlock() |
| 609 | if s != nil { |
| 610 | t.closeStream(s, true, se.Code, false) |
| 611 | } else { |
| 612 | t.controlBuf.put(&cleanupStream{ |
| 613 | streamID: se.StreamID, |
| 614 | rst: true, |
| 615 | rstCode: se.Code, |
| 616 | onWrite: func() {}, |
| 617 | }) |
| 618 | } |
| 619 | continue |
| 620 | } |
| 621 | if err == io.EOF || err == io.ErrUnexpectedEOF { |
| 622 | t.Close() |
| 623 | return |
| 624 | } |
| 625 | if logger.V(logLevel) { |
| 626 | logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err) |
| 627 | } |
| 628 | t.Close() |
| 629 | return |
| 630 | } |
| 631 | switch frame := frame.(type) { |
| 632 | case *http2.MetaHeadersFrame: |
| 633 | if t.operateHeaders(frame, handle, traceCtx) { |
| 634 | t.Close() |
| 635 | break |
| 636 | } |
| 637 | case *http2.DataFrame: |
| 638 | t.handleData(frame) |
| 639 | case *http2.RSTStreamFrame: |
| 640 | t.handleRSTStream(frame) |
| 641 | case *http2.SettingsFrame: |
| 642 | t.handleSettings(frame) |
| 643 | case *http2.PingFrame: |
| 644 | t.handlePing(frame) |
| 645 | case *http2.WindowUpdateFrame: |
| 646 | t.handleWindowUpdate(frame) |
| 647 | case *http2.GoAwayFrame: |
| 648 | // TODO: Handle GoAway from the client appropriately. |
| 649 | default: |
| 650 | if logger.V(logLevel) { |
| 651 | logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) |
| 652 | } |
| 653 | } |
| 654 | } |
| 655 | } |
| 656 | |
| 657 | func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { |
| 658 | t.mu.Lock() |
| 659 | defer t.mu.Unlock() |
| 660 | if t.activeStreams == nil { |
| 661 | // The transport is closing. |
| 662 | return nil, false |
| 663 | } |
| 664 | s, ok := t.activeStreams[f.Header().StreamID] |
| 665 | if !ok { |
| 666 | // The stream is already done. |
| 667 | return nil, false |
| 668 | } |
| 669 | return s, true |
| 670 | } |
| 671 | |
| 672 | // adjustWindow sends out extra window update over the initial window size |
| 673 | // of stream if the application is requesting data larger in size than |
| 674 | // the window. |
| 675 | func (t *http2Server) adjustWindow(s *Stream, n uint32) { |
| 676 | if w := s.fc.maybeAdjust(n); w > 0 { |
| 677 | t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) |
| 678 | } |
| 679 | |
| 680 | } |
| 681 | |
| 682 | // updateWindow adjusts the inbound quota for the stream and the transport. |
| 683 | // Window updates will deliver to the controller for sending when |
| 684 | // the cumulative quota exceeds the corresponding threshold. |
| 685 | func (t *http2Server) updateWindow(s *Stream, n uint32) { |
| 686 | if w := s.fc.onRead(n); w > 0 { |
| 687 | t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, |
| 688 | increment: w, |
| 689 | }) |
| 690 | } |
| 691 | } |
| 692 | |
| 693 | // updateFlowControl updates the incoming flow control windows |
| 694 | // for the transport and the stream based on the current bdp |
| 695 | // estimation. |
| 696 | func (t *http2Server) updateFlowControl(n uint32) { |
| 697 | t.mu.Lock() |
| 698 | for _, s := range t.activeStreams { |
| 699 | s.fc.newLimit(n) |
| 700 | } |
| 701 | t.initialWindowSize = int32(n) |
| 702 | t.mu.Unlock() |
| 703 | t.controlBuf.put(&outgoingWindowUpdate{ |
| 704 | streamID: 0, |
| 705 | increment: t.fc.newLimit(n), |
| 706 | }) |
| 707 | t.controlBuf.put(&outgoingSettings{ |
| 708 | ss: []http2.Setting{ |
| 709 | { |
| 710 | ID: http2.SettingInitialWindowSize, |
| 711 | Val: n, |
| 712 | }, |
| 713 | }, |
| 714 | }) |
| 715 | |
| 716 | } |
| 717 | |
| 718 | func (t *http2Server) handleData(f *http2.DataFrame) { |
| 719 | size := f.Header().Length |
| 720 | var sendBDPPing bool |
| 721 | if t.bdpEst != nil { |
| 722 | sendBDPPing = t.bdpEst.add(size) |
| 723 | } |
| 724 | // Decouple connection's flow control from application's read. |
| 725 | // An update on connection's flow control should not depend on |
| 726 | // whether user application has read the data or not. Such a |
| 727 | // restriction is already imposed on the stream's flow control, |
| 728 | // and therefore the sender will be blocked anyways. |
| 729 | // Decoupling the connection flow control will prevent other |
| 730 | // active(fast) streams from starving in presence of slow or |
| 731 | // inactive streams. |
| 732 | if w := t.fc.onData(size); w > 0 { |
| 733 | t.controlBuf.put(&outgoingWindowUpdate{ |
| 734 | streamID: 0, |
| 735 | increment: w, |
| 736 | }) |
| 737 | } |
| 738 | if sendBDPPing { |
| 739 | // Avoid excessive ping detection (e.g. in an L7 proxy) |
| 740 | // by sending a window update prior to the BDP ping. |
| 741 | if w := t.fc.reset(); w > 0 { |
| 742 | t.controlBuf.put(&outgoingWindowUpdate{ |
| 743 | streamID: 0, |
| 744 | increment: w, |
| 745 | }) |
| 746 | } |
| 747 | t.controlBuf.put(bdpPing) |
| 748 | } |
| 749 | // Select the right stream to dispatch. |
| 750 | s, ok := t.getStream(f) |
| 751 | if !ok { |
| 752 | return |
| 753 | } |
| 754 | if s.getState() == streamReadDone { |
| 755 | t.closeStream(s, true, http2.ErrCodeStreamClosed, false) |
| 756 | return |
| 757 | } |
| 758 | if size > 0 { |
| 759 | if err := s.fc.onData(size); err != nil { |
| 760 | t.closeStream(s, true, http2.ErrCodeFlowControl, false) |
| 761 | return |
| 762 | } |
| 763 | if f.Header().Flags.Has(http2.FlagDataPadded) { |
| 764 | if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 { |
| 765 | t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) |
| 766 | } |
| 767 | } |
| 768 | // TODO(bradfitz, zhaoq): A copy is required here because there is no |
| 769 | // guarantee f.Data() is consumed before the arrival of next frame. |
| 770 | // Can this copy be eliminated? |
| 771 | if len(f.Data()) > 0 { |
| 772 | buffer := t.bufferPool.get() |
| 773 | buffer.Reset() |
| 774 | buffer.Write(f.Data()) |
| 775 | s.write(recvMsg{buffer: buffer}) |
| 776 | } |
| 777 | } |
khenaidoo | 5cb0d40 | 2021-12-08 14:09:16 -0500 | [diff] [blame] | 778 | if f.StreamEnded() { |
khenaidoo | 5fc5cea | 2021-08-11 17:39:16 -0400 | [diff] [blame] | 779 | // Received the end of stream from the client. |
| 780 | s.compareAndSwapState(streamActive, streamReadDone) |
| 781 | s.write(recvMsg{err: io.EOF}) |
| 782 | } |
| 783 | } |
| 784 | |
| 785 | func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { |
| 786 | // If the stream is not deleted from the transport's active streams map, then do a regular close stream. |
| 787 | if s, ok := t.getStream(f); ok { |
| 788 | t.closeStream(s, false, 0, false) |
| 789 | return |
| 790 | } |
| 791 | // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map. |
| 792 | t.controlBuf.put(&cleanupStream{ |
| 793 | streamID: f.Header().StreamID, |
| 794 | rst: false, |
| 795 | rstCode: 0, |
| 796 | onWrite: func() {}, |
| 797 | }) |
| 798 | } |
| 799 | |
| 800 | func (t *http2Server) handleSettings(f *http2.SettingsFrame) { |
| 801 | if f.IsAck() { |
| 802 | return |
| 803 | } |
| 804 | var ss []http2.Setting |
| 805 | var updateFuncs []func() |
| 806 | f.ForeachSetting(func(s http2.Setting) error { |
| 807 | switch s.ID { |
| 808 | case http2.SettingMaxHeaderListSize: |
| 809 | updateFuncs = append(updateFuncs, func() { |
| 810 | t.maxSendHeaderListSize = new(uint32) |
| 811 | *t.maxSendHeaderListSize = s.Val |
| 812 | }) |
| 813 | default: |
| 814 | ss = append(ss, s) |
| 815 | } |
| 816 | return nil |
| 817 | }) |
| 818 | t.controlBuf.executeAndPut(func(interface{}) bool { |
| 819 | for _, f := range updateFuncs { |
| 820 | f() |
| 821 | } |
| 822 | return true |
| 823 | }, &incomingSettings{ |
| 824 | ss: ss, |
| 825 | }) |
| 826 | } |
| 827 | |
const (
	// maxPingStrikes is the number of ping strikes handlePing tolerates;
	// once t.pingStrikes exceeds it, a GOAWAY with ENHANCE_YOUR_CALM is
	// queued and the connection is closed.
	maxPingStrikes = 2
	// defaultPingTimeout is the minimum spacing handlePing requires between
	// pings when there are no active streams and the enforcement policy
	// does not permit pings without streams.
	defaultPingTimeout = 2 * time.Hour
)
| 832 | |
| 833 | func (t *http2Server) handlePing(f *http2.PingFrame) { |
| 834 | if f.IsAck() { |
| 835 | if f.Data == goAwayPing.data && t.drainChan != nil { |
| 836 | close(t.drainChan) |
| 837 | return |
| 838 | } |
| 839 | // Maybe it's a BDP ping. |
| 840 | if t.bdpEst != nil { |
| 841 | t.bdpEst.calculate(f.Data) |
| 842 | } |
| 843 | return |
| 844 | } |
| 845 | pingAck := &ping{ack: true} |
| 846 | copy(pingAck.data[:], f.Data[:]) |
| 847 | t.controlBuf.put(pingAck) |
| 848 | |
| 849 | now := time.Now() |
| 850 | defer func() { |
| 851 | t.lastPingAt = now |
| 852 | }() |
| 853 | // A reset ping strikes means that we don't need to check for policy |
| 854 | // violation for this ping and the pingStrikes counter should be set |
| 855 | // to 0. |
| 856 | if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) { |
| 857 | t.pingStrikes = 0 |
| 858 | return |
| 859 | } |
| 860 | t.mu.Lock() |
| 861 | ns := len(t.activeStreams) |
| 862 | t.mu.Unlock() |
| 863 | if ns < 1 && !t.kep.PermitWithoutStream { |
| 864 | // Keepalive shouldn't be active thus, this new ping should |
| 865 | // have come after at least defaultPingTimeout. |
| 866 | if t.lastPingAt.Add(defaultPingTimeout).After(now) { |
| 867 | t.pingStrikes++ |
| 868 | } |
| 869 | } else { |
| 870 | // Check if keepalive policy is respected. |
| 871 | if t.lastPingAt.Add(t.kep.MinTime).After(now) { |
| 872 | t.pingStrikes++ |
| 873 | } |
| 874 | } |
| 875 | |
| 876 | if t.pingStrikes > maxPingStrikes { |
| 877 | // Send goaway and close the connection. |
| 878 | if logger.V(logLevel) { |
| 879 | logger.Errorf("transport: Got too many pings from the client, closing the connection.") |
| 880 | } |
| 881 | t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) |
| 882 | } |
| 883 | } |
| 884 | |
| 885 | func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { |
| 886 | t.controlBuf.put(&incomingWindowUpdate{ |
| 887 | streamID: f.Header().StreamID, |
| 888 | increment: f.Increment, |
| 889 | }) |
| 890 | } |
| 891 | |
| 892 | func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField { |
| 893 | for k, vv := range md { |
| 894 | if isReservedHeader(k) { |
| 895 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. |
| 896 | continue |
| 897 | } |
| 898 | for _, v := range vv { |
| 899 | headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) |
| 900 | } |
| 901 | } |
| 902 | return headerFields |
| 903 | } |
| 904 | |
| 905 | func (t *http2Server) checkForHeaderListSize(it interface{}) bool { |
| 906 | if t.maxSendHeaderListSize == nil { |
| 907 | return true |
| 908 | } |
| 909 | hdrFrame := it.(*headerFrame) |
| 910 | var sz int64 |
| 911 | for _, f := range hdrFrame.hf { |
| 912 | if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { |
| 913 | if logger.V(logLevel) { |
| 914 | logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize) |
| 915 | } |
| 916 | return false |
| 917 | } |
| 918 | } |
| 919 | return true |
| 920 | } |
| 921 | |
| 922 | // WriteHeader sends the header metadata md back to the client. |
| 923 | func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { |
| 924 | if s.updateHeaderSent() || s.getState() == streamDone { |
| 925 | return ErrIllegalHeaderWrite |
| 926 | } |
| 927 | s.hdrMu.Lock() |
| 928 | if md.Len() > 0 { |
| 929 | if s.header.Len() > 0 { |
| 930 | s.header = metadata.Join(s.header, md) |
| 931 | } else { |
| 932 | s.header = md |
| 933 | } |
| 934 | } |
| 935 | if err := t.writeHeaderLocked(s); err != nil { |
| 936 | s.hdrMu.Unlock() |
| 937 | return err |
| 938 | } |
| 939 | s.hdrMu.Unlock() |
| 940 | return nil |
| 941 | } |
| 942 | |
| 943 | func (t *http2Server) setResetPingStrikes() { |
| 944 | atomic.StoreUint32(&t.resetPingStrikes, 1) |
| 945 | } |
| 946 | |
| 947 | func (t *http2Server) writeHeaderLocked(s *Stream) error { |
| 948 | // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields |
| 949 | // first and create a slice of that exact size. |
| 950 | headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. |
| 951 | headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) |
| 952 | headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)}) |
| 953 | if s.sendCompress != "" { |
| 954 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) |
| 955 | } |
| 956 | headerFields = appendHeaderFieldsFromMD(headerFields, s.header) |
| 957 | success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{ |
| 958 | streamID: s.id, |
| 959 | hf: headerFields, |
| 960 | endStream: false, |
| 961 | onWrite: t.setResetPingStrikes, |
| 962 | }) |
| 963 | if !success { |
| 964 | if err != nil { |
| 965 | return err |
| 966 | } |
| 967 | t.closeStream(s, true, http2.ErrCodeInternal, false) |
| 968 | return ErrHeaderListSizeLimitViolation |
| 969 | } |
| 970 | if t.stats != nil { |
| 971 | // Note: Headers are compressed with hpack after this call returns. |
| 972 | // No WireLength field is set here. |
| 973 | outHeader := &stats.OutHeader{ |
| 974 | Header: s.header.Copy(), |
| 975 | Compression: s.sendCompress, |
| 976 | } |
| 977 | t.stats.HandleRPC(s.Context(), outHeader) |
| 978 | } |
| 979 | return nil |
| 980 | } |
| 981 | |
| 982 | // WriteStatus sends stream status to the client and terminates the stream. |
| 983 | // There is no further I/O operations being able to perform on this stream. |
| 984 | // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early |
| 985 | // OK is adopted. |
| 986 | func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { |
| 987 | if s.getState() == streamDone { |
| 988 | return nil |
| 989 | } |
| 990 | s.hdrMu.Lock() |
| 991 | // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields |
| 992 | // first and create a slice of that exact size. |
| 993 | headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else. |
| 994 | if !s.updateHeaderSent() { // No headers have been sent. |
| 995 | if len(s.header) > 0 { // Send a separate header frame. |
| 996 | if err := t.writeHeaderLocked(s); err != nil { |
| 997 | s.hdrMu.Unlock() |
| 998 | return err |
| 999 | } |
| 1000 | } else { // Send a trailer only response. |
| 1001 | headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) |
| 1002 | headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)}) |
| 1003 | } |
| 1004 | } |
| 1005 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) |
| 1006 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) |
| 1007 | |
| 1008 | if p := st.Proto(); p != nil && len(p.Details) > 0 { |
| 1009 | stBytes, err := proto.Marshal(p) |
| 1010 | if err != nil { |
| 1011 | // TODO: return error instead, when callers are able to handle it. |
| 1012 | logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err) |
| 1013 | } else { |
| 1014 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)}) |
| 1015 | } |
| 1016 | } |
| 1017 | |
| 1018 | // Attach the trailer metadata. |
| 1019 | headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer) |
| 1020 | trailingHeader := &headerFrame{ |
| 1021 | streamID: s.id, |
| 1022 | hf: headerFields, |
| 1023 | endStream: true, |
| 1024 | onWrite: t.setResetPingStrikes, |
| 1025 | } |
| 1026 | s.hdrMu.Unlock() |
| 1027 | success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader) |
| 1028 | if !success { |
| 1029 | if err != nil { |
| 1030 | return err |
| 1031 | } |
| 1032 | t.closeStream(s, true, http2.ErrCodeInternal, false) |
| 1033 | return ErrHeaderListSizeLimitViolation |
| 1034 | } |
| 1035 | // Send a RST_STREAM after the trailers if the client has not already half-closed. |
| 1036 | rst := s.getState() == streamActive |
| 1037 | t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true) |
| 1038 | if t.stats != nil { |
| 1039 | // Note: The trailer fields are compressed with hpack after this call returns. |
| 1040 | // No WireLength field is set here. |
| 1041 | t.stats.HandleRPC(s.Context(), &stats.OutTrailer{ |
| 1042 | Trailer: s.trailer.Copy(), |
| 1043 | }) |
| 1044 | } |
| 1045 | return nil |
| 1046 | } |
| 1047 | |
| 1048 | // Write converts the data into HTTP2 data frame and sends it out. Non-nil error |
| 1049 | // is returns if it fails (e.g., framing error, transport error). |
| 1050 | func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { |
| 1051 | if !s.isHeaderSent() { // Headers haven't been written yet. |
| 1052 | if err := t.WriteHeader(s, nil); err != nil { |
| 1053 | if _, ok := err.(ConnectionError); ok { |
| 1054 | return err |
| 1055 | } |
| 1056 | // TODO(mmukhi, dfawley): Make sure this is the right code to return. |
| 1057 | return status.Errorf(codes.Internal, "transport: %v", err) |
| 1058 | } |
| 1059 | } else { |
| 1060 | // Writing headers checks for this condition. |
| 1061 | if s.getState() == streamDone { |
| 1062 | // TODO(mmukhi, dfawley): Should the server write also return io.EOF? |
| 1063 | s.cancel() |
| 1064 | select { |
| 1065 | case <-t.done: |
| 1066 | return ErrConnClosing |
| 1067 | default: |
| 1068 | } |
| 1069 | return ContextErr(s.ctx.Err()) |
| 1070 | } |
| 1071 | } |
| 1072 | df := &dataFrame{ |
| 1073 | streamID: s.id, |
| 1074 | h: hdr, |
| 1075 | d: data, |
| 1076 | onEachWrite: t.setResetPingStrikes, |
| 1077 | } |
| 1078 | if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { |
| 1079 | select { |
| 1080 | case <-t.done: |
| 1081 | return ErrConnClosing |
| 1082 | default: |
| 1083 | } |
| 1084 | return ContextErr(s.ctx.Err()) |
| 1085 | } |
| 1086 | return t.controlBuf.put(df) |
| 1087 | } |
| 1088 | |
| 1089 | // keepalive running in a separate goroutine does the following: |
| 1090 | // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. |
| 1091 | // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. |
| 1092 | // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. |
| 1093 | // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection |
| 1094 | // after an additional duration of keepalive.Timeout. |
| 1095 | func (t *http2Server) keepalive() { |
| 1096 | p := &ping{} |
| 1097 | // True iff a ping has been sent, and no data has been received since then. |
| 1098 | outstandingPing := false |
| 1099 | // Amount of time remaining before which we should receive an ACK for the |
| 1100 | // last sent ping. |
| 1101 | kpTimeoutLeft := time.Duration(0) |
| 1102 | // Records the last value of t.lastRead before we go block on the timer. |
| 1103 | // This is required to check for read activity since then. |
| 1104 | prevNano := time.Now().UnixNano() |
| 1105 | // Initialize the different timers to their default values. |
| 1106 | idleTimer := time.NewTimer(t.kp.MaxConnectionIdle) |
| 1107 | ageTimer := time.NewTimer(t.kp.MaxConnectionAge) |
| 1108 | kpTimer := time.NewTimer(t.kp.Time) |
| 1109 | defer func() { |
| 1110 | // We need to drain the underlying channel in these timers after a call |
| 1111 | // to Stop(), only if we are interested in resetting them. Clearly we |
| 1112 | // are not interested in resetting them here. |
| 1113 | idleTimer.Stop() |
| 1114 | ageTimer.Stop() |
| 1115 | kpTimer.Stop() |
| 1116 | }() |
| 1117 | |
| 1118 | for { |
| 1119 | select { |
| 1120 | case <-idleTimer.C: |
| 1121 | t.mu.Lock() |
| 1122 | idle := t.idle |
| 1123 | if idle.IsZero() { // The connection is non-idle. |
| 1124 | t.mu.Unlock() |
| 1125 | idleTimer.Reset(t.kp.MaxConnectionIdle) |
| 1126 | continue |
| 1127 | } |
| 1128 | val := t.kp.MaxConnectionIdle - time.Since(idle) |
| 1129 | t.mu.Unlock() |
| 1130 | if val <= 0 { |
| 1131 | // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. |
| 1132 | // Gracefully close the connection. |
| 1133 | t.Drain() |
| 1134 | return |
| 1135 | } |
| 1136 | idleTimer.Reset(val) |
| 1137 | case <-ageTimer.C: |
| 1138 | t.Drain() |
| 1139 | ageTimer.Reset(t.kp.MaxConnectionAgeGrace) |
| 1140 | select { |
| 1141 | case <-ageTimer.C: |
| 1142 | // Close the connection after grace period. |
| 1143 | if logger.V(logLevel) { |
| 1144 | logger.Infof("transport: closing server transport due to maximum connection age.") |
| 1145 | } |
| 1146 | t.Close() |
| 1147 | case <-t.done: |
| 1148 | } |
| 1149 | return |
| 1150 | case <-kpTimer.C: |
| 1151 | lastRead := atomic.LoadInt64(&t.lastRead) |
| 1152 | if lastRead > prevNano { |
| 1153 | // There has been read activity since the last time we were |
| 1154 | // here. Setup the timer to fire at kp.Time seconds from |
| 1155 | // lastRead time and continue. |
| 1156 | outstandingPing = false |
| 1157 | kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano())) |
| 1158 | prevNano = lastRead |
| 1159 | continue |
| 1160 | } |
| 1161 | if outstandingPing && kpTimeoutLeft <= 0 { |
| 1162 | if logger.V(logLevel) { |
| 1163 | logger.Infof("transport: closing server transport due to idleness.") |
| 1164 | } |
| 1165 | t.Close() |
| 1166 | return |
| 1167 | } |
| 1168 | if !outstandingPing { |
| 1169 | if channelz.IsOn() { |
| 1170 | atomic.AddInt64(&t.czData.kpCount, 1) |
| 1171 | } |
| 1172 | t.controlBuf.put(p) |
| 1173 | kpTimeoutLeft = t.kp.Timeout |
| 1174 | outstandingPing = true |
| 1175 | } |
| 1176 | // The amount of time to sleep here is the minimum of kp.Time and |
| 1177 | // timeoutLeft. This will ensure that we wait only for kp.Time |
| 1178 | // before sending out the next ping (for cases where the ping is |
| 1179 | // acked). |
| 1180 | sleepDuration := minTime(t.kp.Time, kpTimeoutLeft) |
| 1181 | kpTimeoutLeft -= sleepDuration |
| 1182 | kpTimer.Reset(sleepDuration) |
| 1183 | case <-t.done: |
| 1184 | return |
| 1185 | } |
| 1186 | } |
| 1187 | } |
| 1188 | |
| 1189 | // Close starts shutting down the http2Server transport. |
| 1190 | // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This |
| 1191 | // could cause some resource issue. Revisit this later. |
| 1192 | func (t *http2Server) Close() { |
| 1193 | t.mu.Lock() |
| 1194 | if t.state == closing { |
| 1195 | t.mu.Unlock() |
| 1196 | return |
| 1197 | } |
| 1198 | t.state = closing |
| 1199 | streams := t.activeStreams |
| 1200 | t.activeStreams = nil |
| 1201 | t.mu.Unlock() |
| 1202 | t.controlBuf.finish() |
| 1203 | close(t.done) |
| 1204 | if err := t.conn.Close(); err != nil && logger.V(logLevel) { |
| 1205 | logger.Infof("transport: error closing conn during Close: %v", err) |
| 1206 | } |
| 1207 | if channelz.IsOn() { |
| 1208 | channelz.RemoveEntry(t.channelzID) |
| 1209 | } |
| 1210 | // Cancel all active streams. |
| 1211 | for _, s := range streams { |
| 1212 | s.cancel() |
| 1213 | } |
| 1214 | if t.stats != nil { |
| 1215 | connEnd := &stats.ConnEnd{} |
| 1216 | t.stats.HandleConn(t.ctx, connEnd) |
| 1217 | } |
| 1218 | } |
| 1219 | |
| 1220 | // deleteStream deletes the stream s from transport's active streams. |
| 1221 | func (t *http2Server) deleteStream(s *Stream, eosReceived bool) { |
| 1222 | // In case stream sending and receiving are invoked in separate |
| 1223 | // goroutines (e.g., bi-directional streaming), cancel needs to be |
| 1224 | // called to interrupt the potential blocking on other goroutines. |
| 1225 | s.cancel() |
| 1226 | |
| 1227 | t.mu.Lock() |
| 1228 | if _, ok := t.activeStreams[s.id]; ok { |
| 1229 | delete(t.activeStreams, s.id) |
| 1230 | if len(t.activeStreams) == 0 { |
| 1231 | t.idle = time.Now() |
| 1232 | } |
| 1233 | } |
| 1234 | t.mu.Unlock() |
| 1235 | |
| 1236 | if channelz.IsOn() { |
| 1237 | if eosReceived { |
| 1238 | atomic.AddInt64(&t.czData.streamsSucceeded, 1) |
| 1239 | } else { |
| 1240 | atomic.AddInt64(&t.czData.streamsFailed, 1) |
| 1241 | } |
| 1242 | } |
| 1243 | } |
| 1244 | |
| 1245 | // finishStream closes the stream and puts the trailing headerFrame into controlbuf. |
| 1246 | func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) { |
| 1247 | oldState := s.swapState(streamDone) |
| 1248 | if oldState == streamDone { |
| 1249 | // If the stream was already done, return. |
| 1250 | return |
| 1251 | } |
| 1252 | |
| 1253 | hdr.cleanup = &cleanupStream{ |
| 1254 | streamID: s.id, |
| 1255 | rst: rst, |
| 1256 | rstCode: rstCode, |
| 1257 | onWrite: func() { |
| 1258 | t.deleteStream(s, eosReceived) |
| 1259 | }, |
| 1260 | } |
| 1261 | t.controlBuf.put(hdr) |
| 1262 | } |
| 1263 | |
| 1264 | // closeStream clears the footprint of a stream when the stream is not needed any more. |
| 1265 | func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) { |
| 1266 | s.swapState(streamDone) |
| 1267 | t.deleteStream(s, eosReceived) |
| 1268 | |
| 1269 | t.controlBuf.put(&cleanupStream{ |
| 1270 | streamID: s.id, |
| 1271 | rst: rst, |
| 1272 | rstCode: rstCode, |
| 1273 | onWrite: func() {}, |
| 1274 | }) |
| 1275 | } |
| 1276 | |
| 1277 | func (t *http2Server) RemoteAddr() net.Addr { |
| 1278 | return t.remoteAddr |
| 1279 | } |
| 1280 | |
| 1281 | func (t *http2Server) Drain() { |
| 1282 | t.mu.Lock() |
| 1283 | defer t.mu.Unlock() |
| 1284 | if t.drainChan != nil { |
| 1285 | return |
| 1286 | } |
| 1287 | t.drainChan = make(chan struct{}) |
| 1288 | t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true}) |
| 1289 | } |
| 1290 | |
// goAwayPing is the ping whose payload is written right after the first
// GOAWAY of a graceful shutdown; handlePing recognises its ACK by comparing
// the payload.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
| 1292 | |
| 1293 | // Handles outgoing GoAway and returns true if loopy needs to put itself |
| 1294 | // in draining mode. |
| 1295 | func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { |
| 1296 | t.mu.Lock() |
| 1297 | if t.state == closing { // TODO(mmukhi): This seems unnecessary. |
| 1298 | t.mu.Unlock() |
| 1299 | // The transport is closing. |
| 1300 | return false, ErrConnClosing |
| 1301 | } |
| 1302 | sid := t.maxStreamID |
| 1303 | if !g.headsUp { |
| 1304 | // Stop accepting more streams now. |
| 1305 | t.state = draining |
| 1306 | if len(t.activeStreams) == 0 { |
| 1307 | g.closeConn = true |
| 1308 | } |
| 1309 | t.mu.Unlock() |
| 1310 | if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil { |
| 1311 | return false, err |
| 1312 | } |
| 1313 | if g.closeConn { |
| 1314 | // Abruptly close the connection following the GoAway (via |
| 1315 | // loopywriter). But flush out what's inside the buffer first. |
| 1316 | t.framer.writer.Flush() |
| 1317 | return false, fmt.Errorf("transport: Connection closing") |
| 1318 | } |
| 1319 | return true, nil |
| 1320 | } |
| 1321 | t.mu.Unlock() |
| 1322 | // For a graceful close, send out a GoAway with stream ID of MaxUInt32, |
| 1323 | // Follow that with a ping and wait for the ack to come back or a timer |
| 1324 | // to expire. During this time accept new streams since they might have |
| 1325 | // originated before the GoAway reaches the client. |
| 1326 | // After getting the ack or timer expiration send out another GoAway this |
| 1327 | // time with an ID of the max stream server intends to process. |
| 1328 | if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil { |
| 1329 | return false, err |
| 1330 | } |
| 1331 | if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil { |
| 1332 | return false, err |
| 1333 | } |
| 1334 | go func() { |
| 1335 | timer := time.NewTimer(time.Minute) |
| 1336 | defer timer.Stop() |
| 1337 | select { |
| 1338 | case <-t.drainChan: |
| 1339 | case <-timer.C: |
| 1340 | case <-t.done: |
| 1341 | return |
| 1342 | } |
| 1343 | t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData}) |
| 1344 | }() |
| 1345 | return false, nil |
| 1346 | } |
| 1347 | |
| 1348 | func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric { |
| 1349 | s := channelz.SocketInternalMetric{ |
| 1350 | StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted), |
| 1351 | StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded), |
| 1352 | StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed), |
| 1353 | MessagesSent: atomic.LoadInt64(&t.czData.msgSent), |
| 1354 | MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv), |
| 1355 | KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount), |
| 1356 | LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), |
| 1357 | LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), |
| 1358 | LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), |
| 1359 | LocalFlowControlWindow: int64(t.fc.getSize()), |
| 1360 | SocketOptions: channelz.GetSocketOption(t.conn), |
| 1361 | LocalAddr: t.localAddr, |
| 1362 | RemoteAddr: t.remoteAddr, |
| 1363 | // RemoteName : |
| 1364 | } |
| 1365 | if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { |
| 1366 | s.Security = au.GetSecurityValue() |
| 1367 | } |
| 1368 | s.RemoteFlowControlWindow = t.getOutFlowWindow() |
| 1369 | return &s |
| 1370 | } |
| 1371 | |
| 1372 | func (t *http2Server) IncrMsgSent() { |
| 1373 | atomic.AddInt64(&t.czData.msgSent, 1) |
| 1374 | atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) |
| 1375 | } |
| 1376 | |
| 1377 | func (t *http2Server) IncrMsgRecv() { |
| 1378 | atomic.AddInt64(&t.czData.msgRecv, 1) |
| 1379 | atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) |
| 1380 | } |
| 1381 | |
| 1382 | func (t *http2Server) getOutFlowWindow() int64 { |
| 1383 | resp := make(chan uint32, 1) |
| 1384 | timer := time.NewTimer(time.Second) |
| 1385 | defer timer.Stop() |
| 1386 | t.controlBuf.put(&outFlowControlSizeRequest{resp}) |
| 1387 | select { |
| 1388 | case sz := <-resp: |
| 1389 | return int64(sz) |
| 1390 | case <-t.done: |
| 1391 | return -1 |
| 1392 | case <-timer.C: |
| 1393 | return -2 |
| 1394 | } |
| 1395 | } |
| 1396 | |
| 1397 | func getJitter(v time.Duration) time.Duration { |
| 1398 | if v == infinity { |
| 1399 | return 0 |
| 1400 | } |
| 1401 | // Generate a jitter between +/- 10% of the value. |
| 1402 | r := int64(v / 10) |
| 1403 | j := grpcrand.Int63n(2*r) - r |
| 1404 | return time.Duration(j) |
| 1405 | } |
| 1406 | |
// connectionKey is the context key under which the connection is stored.
type connectionKey struct{}

// GetConnection gets the connection from the context. It returns nil when the
// context holds no net.Conn under the connection key.
func GetConnection(ctx context.Context) net.Conn {
	if conn, ok := ctx.Value(connectionKey{}).(net.Conn); ok {
		return conn
	}
	return nil
}

// setConnection adds the connection to the context to be able to get
// information about the destination ip and port for an incoming RPC. This also
// allows any unary or streaming interceptors to see the connection.
func setConnection(ctx context.Context, conn net.Conn) context.Context {
	key := connectionKey{}
	return context.WithValue(ctx, key, conn)
}