[VOL-4442] grpc streaming connection monitoring
Change-Id: Id787e94cf28745d36e72f8ed2f5c316312714db4
diff --git a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
index f262edd..97198c5 100644
--- a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
+++ b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
@@ -136,12 +136,10 @@
// newLimit updates the inflow window to a new value n.
// It assumes that n is always greater than the old limit.
-func (f *inFlow) newLimit(n uint32) uint32 {
+func (f *inFlow) newLimit(n uint32) {
f.mu.Lock()
- d := n - f.limit
f.limit = n
f.mu.Unlock()
- return d
}
func (f *inFlow) maybeAdjust(n uint32) uint32 {
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 2521a7d..f0c72d3 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -201,6 +201,12 @@
}
}()
+ // gRPC, resolver, balancer etc. can specify arbitrary data in the
+ // Attributes field of resolver.Address, which is shoved into connectCtx
+ // and passed to the dialer and credential handshaker. This makes it possible for
+ // address specific arbitrary data to reach custom dialers and credential handshakers.
+ connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
+
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
if err != nil {
if opts.FailOnNonTempDialError {
@@ -245,11 +251,6 @@
}
}
if transportCreds != nil {
- // gRPC, resolver, balancer etc. can specify arbitrary data in the
- // Attributes field of resolver.Address, which is shoved into connectCtx
- // and passed to the credential handshaker. This makes it possible for
- // address specific arbitrary data to reach the credential handshaker.
- connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
rawConn := conn
// Pull the deadline from the connectCtx, which will be used for
// timeouts in the authentication protocol handshake. Can ignore the
@@ -587,7 +588,7 @@
return nil, err
}
- return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
+ return nil, status.Errorf(codes.Unauthenticated, "transport: per-RPC creds failed due to error: %v", err)
}
for k, v := range data {
// Capital header names are illegal in HTTP/2.
@@ -1556,7 +1557,7 @@
return b
}
-// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
+// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
// True iff a ping has been sent, and no data has been received since then.
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index f2cad9e..2c6eaf0 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -73,7 +73,6 @@
writerDone chan struct{} // sync point to enable testing.
remoteAddr net.Addr
localAddr net.Addr
- maxStreamID uint32 // max stream ID ever seen
authInfo credentials.AuthInfo // auth info about the connection
inTapHandle tap.ServerInHandle
framer *framer
@@ -123,6 +122,11 @@
bufferPool *bufferPool
connectionID uint64
+
+ // maxStreamMu guards the maximum stream ID
+ // This lock may not be taken if mu is already held.
+ maxStreamMu sync.Mutex
+ maxStreamID uint32 // max stream ID ever seen
}
// NewServerTransport creates a http2 transport with conn and configuration
@@ -334,6 +338,10 @@
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
+ // Acquire max stream ID lock for entire duration
+ t.maxStreamMu.Lock()
+ defer t.maxStreamMu.Unlock()
+
streamID := frame.Header().StreamID
// frame.Truncated is set to true when framer detects that the current header
@@ -348,6 +356,15 @@
return false
}
+ if streamID%2 != 1 || streamID <= t.maxStreamID {
+ // illegal gRPC stream id.
+ if logger.V(logLevel) {
+ logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
+ }
+ return true
+ }
+ t.maxStreamID = streamID
+
buf := newRecvBuffer()
s := &Stream{
id: streamID,
@@ -355,7 +372,6 @@
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
}
-
var (
// If a gRPC Response-Headers has already been received, then it means
// that the peer is speaking gRPC and we are in gRPC mode.
@@ -498,16 +514,6 @@
s.cancel()
return false
}
- if streamID%2 != 1 || streamID <= t.maxStreamID {
- t.mu.Unlock()
- // illegal gRPC stream id.
- if logger.V(logLevel) {
- logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
- }
- s.cancel()
- return true
- }
- t.maxStreamID = streamID
if httpMethod != http.MethodPost {
t.mu.Unlock()
if logger.V(logLevel) {
@@ -1293,20 +1299,23 @@
// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
+ t.maxStreamMu.Lock()
t.mu.Lock()
if t.state == closing { // TODO(mmukhi): This seems unnecessary.
t.mu.Unlock()
+ t.maxStreamMu.Unlock()
// The transport is closing.
return false, ErrConnClosing
}
- sid := t.maxStreamID
if !g.headsUp {
// Stop accepting more streams now.
t.state = draining
+ sid := t.maxStreamID
if len(t.activeStreams) == 0 {
g.closeConn = true
}
t.mu.Unlock()
+ t.maxStreamMu.Unlock()
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
return false, err
}
@@ -1319,6 +1328,7 @@
return true, nil
}
t.mu.Unlock()
+ t.maxStreamMu.Unlock()
// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
// Follow that with a ping and wait for the ack to come back or a timer
// to expire. During this time accept new streams since they might have
diff --git a/vendor/google.golang.org/grpc/internal/transport/proxy.go b/vendor/google.golang.org/grpc/internal/transport/proxy.go
index a662bf3..4159619 100644
--- a/vendor/google.golang.org/grpc/internal/transport/proxy.go
+++ b/vendor/google.golang.org/grpc/internal/transport/proxy.go
@@ -37,7 +37,7 @@
httpProxyFromEnvironment = http.ProxyFromEnvironment
)
-func mapAddress(ctx context.Context, address string) (*url.URL, error) {
+func mapAddress(address string) (*url.URL, error) {
req := &http.Request{
URL: &url.URL{
Scheme: "https",
@@ -114,7 +114,7 @@
// connection.
func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) {
newAddr := addr
- proxyURL, err := mapAddress(ctx, addr)
+ proxyURL, err := mapAddress(addr)
if err != nil {
return nil, err
}