VOL-1921 - updated to use go mod
Change-Id: I8d5187fa91fa619494f972bc29d3bd61e5be3a82
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/regenerate.sh b/vendor/google.golang.org/grpc/internal/binarylog/regenerate.sh
old mode 100755
new mode 100644
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
index 11be7cd..3ee8740 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
@@ -25,40 +25,11 @@
)
const (
- prefix = "GRPC_GO_"
- retryStr = prefix + "RETRY"
- requireHandshakeStr = prefix + "REQUIRE_HANDSHAKE"
-)
-
-// RequireHandshakeSetting describes the settings for handshaking.
-type RequireHandshakeSetting int
-
-const (
- // RequireHandshakeOn indicates to wait for handshake before considering a
- // connection ready/successful.
- RequireHandshakeOn RequireHandshakeSetting = iota
- // RequireHandshakeOff indicates to not wait for handshake before
- // considering a connection ready/successful.
- RequireHandshakeOff
+ prefix = "GRPC_GO_"
+ retryStr = prefix + "RETRY"
)
var (
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
Retry = strings.EqualFold(os.Getenv(retryStr), "on")
- // RequireHandshake is set based upon the GRPC_GO_REQUIRE_HANDSHAKE
- // environment variable.
- //
- // Will be removed after the 1.18 release.
- RequireHandshake = RequireHandshakeOn
)
-
-func init() {
- switch strings.ToLower(os.Getenv(requireHandshakeStr)) {
- case "on":
- fallthrough
- default:
- RequireHandshake = RequireHandshakeOn
- case "off":
- RequireHandshake = RequireHandshakeOff
- }
-}
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index 204ba15..ddee20b 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -23,6 +23,7 @@
"fmt"
"runtime"
"sync"
+ "sync/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
@@ -84,24 +85,40 @@
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.
+// maxQueuedTransportResponseFrames is the most queued "transport response"
+// frames we will buffer before preventing new reads from occurring on the
+// transport. These are control frames sent in response to client requests,
+// such as RST_STREAM due to bad headers or settings acks.
+const maxQueuedTransportResponseFrames = 50
+
+type cbItem interface {
+ isTransportResponseFrame() bool
+}
+
// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
streamID uint32
wq *writeQuota
}
+func (*registerStream) isTransportResponseFrame() bool { return false }
+
// headerFrame is also used to register stream on the client-side.
type headerFrame struct {
streamID uint32
hf []hpack.HeaderField
- endStream bool // Valid on server side.
- initStream func(uint32) (bool, error) // Used only on the client side.
+ endStream bool // Valid on server side.
+ initStream func(uint32) error // Used only on the client side.
onWrite func()
wq *writeQuota // write quota for the stream created.
cleanup *cleanupStream // Valid on the server side.
onOrphaned func(error) // Valid on client-side
}
+func (h *headerFrame) isTransportResponseFrame() bool {
+ return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
+}
+
type cleanupStream struct {
streamID uint32
rst bool
@@ -109,6 +126,8 @@
onWrite func()
}
+func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
+
type dataFrame struct {
streamID uint32
endStream bool
@@ -119,27 +138,41 @@
onEachWrite func()
}
+func (*dataFrame) isTransportResponseFrame() bool { return false }
+
type incomingWindowUpdate struct {
streamID uint32
increment uint32
}
+func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
+
type outgoingWindowUpdate struct {
streamID uint32
increment uint32
}
+func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
+ return false // window updates are throttled by thresholds
+}
+
type incomingSettings struct {
ss []http2.Setting
}
+func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
+
type outgoingSettings struct {
ss []http2.Setting
}
+func (*outgoingSettings) isTransportResponseFrame() bool { return false }
+
type incomingGoAway struct {
}
+func (*incomingGoAway) isTransportResponseFrame() bool { return false }
+
type goAway struct {
code http2.ErrCode
debugData []byte
@@ -147,15 +180,21 @@
closeConn bool
}
+func (*goAway) isTransportResponseFrame() bool { return false }
+
type ping struct {
ack bool
data [8]byte
}
+func (*ping) isTransportResponseFrame() bool { return true }
+
type outFlowControlSizeRequest struct {
resp chan uint32
}
+func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
+
type outStreamState int
const (
@@ -238,6 +277,14 @@
consumerWaiting bool
list *itemList
err error
+
+ // transportResponseFrames counts the number of queued items that represent
+ // the response of an action initiated by the peer. trfChan is created
+ // when transportResponseFrames >= maxQueuedTransportResponseFrames and is
+ // closed and nilled when transportResponseFrames drops below the
+ // threshold. Both fields are protected by mu.
+ transportResponseFrames int
+ trfChan atomic.Value // *chan struct{}
}
func newControlBuffer(done <-chan struct{}) *controlBuffer {
@@ -248,12 +295,24 @@
}
}
-func (c *controlBuffer) put(it interface{}) error {
+// throttle blocks if there are too many incomingSettings/cleanupStreams in the
+// controlbuf.
+func (c *controlBuffer) throttle() {
+ ch, _ := c.trfChan.Load().(*chan struct{})
+ if ch != nil {
+ select {
+ case <-*ch:
+ case <-c.done:
+ }
+ }
+}
+
+func (c *controlBuffer) put(it cbItem) error {
_, err := c.executeAndPut(nil, it)
return err
}
-func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
+func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
var wakeUp bool
c.mu.Lock()
if c.err != nil {
@@ -271,6 +330,15 @@
c.consumerWaiting = false
}
c.list.enqueue(it)
+ if it.isTransportResponseFrame() {
+ c.transportResponseFrames++
+ if c.transportResponseFrames == maxQueuedTransportResponseFrames {
+ // We are adding the frame that puts us over the threshold; create
+ // a throttling channel.
+ ch := make(chan struct{})
+ c.trfChan.Store(&ch)
+ }
+ }
c.mu.Unlock()
if wakeUp {
select {
@@ -304,7 +372,17 @@
return nil, c.err
}
if !c.list.isEmpty() {
- h := c.list.dequeue()
+ h := c.list.dequeue().(cbItem)
+ if h.isTransportResponseFrame() {
+ if c.transportResponseFrames == maxQueuedTransportResponseFrames {
+ // We are removing the frame that put us over the
+ // threshold; close and clear the throttling channel.
+ ch := c.trfChan.Load().(*chan struct{})
+ close(*ch)
+ c.trfChan.Store((*chan struct{})(nil))
+ }
+ c.transportResponseFrames--
+ }
c.mu.Unlock()
return h, nil
}
@@ -559,21 +637,17 @@
func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
- sendPing, err := hdr.initStream(str.id)
- if err != nil {
+ if err := hdr.initStream(str.id); err != nil {
if err == ErrConnClosing {
return err
}
// Other errors(errStreamDrain) need not close transport.
return nil
}
- if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
+ if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
return err
}
l.estdStreams[str.id] = str
- if sendPing {
- return l.pingHandler(&ping{data: [8]byte{}})
- }
return nil
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
index 5ea997a..f262edd 100644
--- a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
+++ b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
@@ -149,6 +149,7 @@
n = uint32(math.MaxInt32)
}
f.mu.Lock()
+ defer f.mu.Unlock()
// estSenderQuota is the receiver's view of the maximum number of bytes the sender
// can send without a window update.
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
@@ -169,10 +170,8 @@
// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
f.delta = n
}
- f.mu.Unlock()
return f.delta
}
- f.mu.Unlock()
return 0
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index c96178d..9bd8c27 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -62,8 +62,6 @@
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
- // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
- awakenKeepalive chan struct{}
framer *framer
// controlBuf delivers all the control related tasks (e.g., window
@@ -110,6 +108,16 @@
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
+ // A condition variable used to signal when the keepalive goroutine should
+ // go dormant. The condition for dormancy is based on the number of active
+ // streams and the `PermitWithoutStream` keepalive client parameter. And
+ // since the number of active streams is guarded by the above mutex, we use
+ // the same for this condition variable as well.
+ kpDormancyCond *sync.Cond
+ // A boolean to track whether the keepalive goroutine is dormant or not.
+ // This is checked before attempting to signal the above condition
+ // variable.
+ kpDormant bool
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
@@ -232,7 +240,6 @@
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
- awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
@@ -264,9 +271,6 @@
updateFlowControl: t.updateFlowControl,
}
}
- // Make sure awakenKeepalive can't be written upon.
- // keepalive routine will make it writable, if need be.
- t.awakenKeepalive <- struct{}{}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
@@ -281,6 +285,7 @@
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
}
if t.keepaliveEnabled {
+ t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}
// Start the reader goroutine for incoming message. Each transport has
@@ -493,6 +498,9 @@
}
func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
+ if len(t.perRPCCreds) == 0 {
+ return nil, nil
+ }
authData := map[string]string{}
for _, c := range t.perRPCCreds {
data, err := c.GetRequestMetadata(ctx, audience)
@@ -513,7 +521,7 @@
}
func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
- callAuthData := map[string]string{}
+ var callAuthData map[string]string
// Check if credentials.PerRPCCredentials were provided via call options.
// Note: if these credentials are provided both via dial options and call
// options, then both sets of credentials will be applied.
@@ -525,6 +533,7 @@
if err != nil {
return nil, status.Errorf(codes.Internal, "transport: %v", err)
}
+ callAuthData = make(map[string]string, len(data))
for k, v := range data {
// Capital header names are illegal in HTTP/2
k = strings.ToLower(k)
@@ -556,12 +565,11 @@
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
close(s.headerChan)
}
-
}
hdr := &headerFrame{
hf: headerFields,
endStream: false,
- initStream: func(id uint32) (bool, error) {
+ initStream: func(id uint32) error {
t.mu.Lock()
if state := t.state; state != reachable {
t.mu.Unlock()
@@ -571,29 +579,19 @@
err = ErrConnClosing
}
cleanup(err)
- return false, err
+ return err
}
t.activeStreams[id] = s
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
- var sendPing bool
- // If the number of active streams change from 0 to 1, then check if keepalive
- // has gone dormant. If so, wake it up.
- if len(t.activeStreams) == 1 && t.keepaliveEnabled {
- select {
- case t.awakenKeepalive <- struct{}{}:
- sendPing = true
- // Fill the awakenKeepalive channel again as this channel must be
- // kept non-writable except at the point that the keepalive()
- // goroutine is waiting either to be awaken or shutdown.
- t.awakenKeepalive <- struct{}{}
- default:
- }
+ // If the keepalive goroutine has gone dormant, wake it up.
+ if t.kpDormant {
+ t.kpDormancyCond.Signal()
}
t.mu.Unlock()
- return sendPing, nil
+ return nil
},
onOrphaned: cleanup,
wq: s.wq,
@@ -769,9 +767,17 @@
t.mu.Unlock()
return nil
}
+ // Call t.onClose before setting the state to closing so that the client
+ // stops attempting to create new streams as soon as possible.
+ t.onClose()
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
+ if t.kpDormant {
+ // If the keepalive goroutine is blocked on this condition variable, we
+ // should unblock it so that the goroutine eventually exits.
+ t.kpDormancyCond.Signal()
+ }
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
@@ -789,7 +795,6 @@
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
- t.onClose()
return err
}
@@ -848,11 +853,11 @@
return t.controlBuf.put(df)
}
-func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
+func (t *http2Client) getStream(f http2.Frame) *Stream {
t.mu.Lock()
- defer t.mu.Unlock()
- s, ok := t.activeStreams[f.Header().StreamID]
- return s, ok
+ s := t.activeStreams[f.Header().StreamID]
+ t.mu.Unlock()
+ return s
}
// adjustWindow sends out extra window update over the initial window size
@@ -932,8 +937,8 @@
t.controlBuf.put(bdpPing)
}
// Select the right stream to dispatch.
- s, ok := t.getStream(f)
- if !ok {
+ s := t.getStream(f)
+ if s == nil {
return
}
if size > 0 {
@@ -964,8 +969,8 @@
}
func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
- s, ok := t.getStream(f)
- if !ok {
+ s := t.getStream(f)
+ if s == nil {
return
}
if f.ErrCode == http2.ErrCodeRefusedStream {
@@ -978,9 +983,9 @@
statusCode = codes.Unknown
}
if statusCode == codes.Canceled {
- // Our deadline was already exceeded, and that was likely the cause of
- // this cancelation. Alter the status code accordingly.
- if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) {
+ if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
+ // Our deadline was already exceeded, and that was likely the cause
+ // of this cancelation. Alter the status code accordingly.
statusCode = codes.DeadlineExceeded
}
}
@@ -1085,11 +1090,12 @@
default:
t.setGoAwayReason(f)
close(t.goAway)
- t.state = draining
t.controlBuf.put(&incomingGoAway{})
-
- // This has to be a new goroutine because we're still using the current goroutine to read in the transport.
+ // Notify the clientconn about the GOAWAY before we set the state to
+ // draining, to allow the client to stop attempting to create streams
+ // before disallowing new streams on this connection.
t.onGoAway(t.goAwayReason)
+ t.state = draining
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
@@ -1141,8 +1147,8 @@
// operateHeaders takes action on the decoded headers.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
- s, ok := t.getStream(frame)
- if !ok {
+ s := t.getStream(frame)
+ if s == nil {
return
}
endStream := frame.StreamEnded()
@@ -1239,6 +1245,7 @@
// loop to keep reading incoming messages on this transport.
for {
+ t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
@@ -1296,29 +1303,32 @@
timer.Reset(t.kp.Time)
continue
}
- // Check if keepalive should go dormant.
t.mu.Lock()
- if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
- // Make awakenKeepalive writable.
- <-t.awakenKeepalive
+ if t.state == closing {
+ // If the transport is closing, we should exit from the
+ // keepalive goroutine here. If not, we could have a race
+ // between the call to Signal() from Close() and the call to
+ // Wait() here, whereby the keepalive goroutine ends up
+ // blocking on the condition variable which will never be
+ // signalled again.
t.mu.Unlock()
- select {
- case <-t.awakenKeepalive:
- // If the control gets here a ping has been sent
- // need to reset the timer with keepalive.Timeout.
- case <-t.ctx.Done():
- return
- }
- } else {
- t.mu.Unlock()
- if channelz.IsOn() {
- atomic.AddInt64(&t.czData.kpCount, 1)
- }
- // Send ping.
- t.controlBuf.put(p)
+ return
}
+ if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
+ t.kpDormant = true
+ t.kpDormancyCond.Wait()
+ }
+ t.kpDormant = false
+ t.mu.Unlock()
- // By the time control gets here a ping has been sent one way or the other.
+ if channelz.IsOn() {
+ atomic.AddInt64(&t.czData.kpCount, 1)
+ }
+ // We get here either because we were dormant and a new stream was
+ // created which unblocked the Wait() call, or because the
+ // keepalive timer expired. In both cases, we need to send a ping.
+ t.controlBuf.put(p)
+
timer.Reset(t.kp.Timeout)
select {
case <-timer.C:
@@ -1326,6 +1336,7 @@
timer.Reset(t.kp.Time)
continue
}
+ infof("transport: closing client transport due to idleness.")
t.Close()
return
case <-t.ctx.Done():
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 150b73e..33686a1 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -65,8 +65,7 @@
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
ctx context.Context
- ctxDone <-chan struct{} // Cache the context.Done() chan
- cancel context.CancelFunc
+ done chan struct{}
conn net.Conn
loopy *loopyWriter
readerDone chan struct{} // sync point to enable testing.
@@ -138,7 +137,10 @@
}
framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
// Send initial settings as connection preface to client.
- var isettings []http2.Setting
+ isettings := []http2.Setting{{
+ ID: http2.SettingMaxFrameSize,
+ Val: http2MaxFrameLen,
+ }}
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
maxStreams := config.MaxStreams
@@ -203,11 +205,10 @@
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
- ctx, cancel := context.WithCancel(context.Background())
+ done := make(chan struct{})
t := &http2Server{
- ctx: ctx,
- cancel: cancel,
- ctxDone: ctx.Done(),
+ ctx: context.Background(),
+ done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@@ -228,7 +229,7 @@
czData: new(channelzData),
bufferPool: newBufferPool(),
}
- t.controlBuf = newControlBuffer(t.ctxDone)
+ t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
@@ -359,12 +360,14 @@
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
+ s.cancel()
return false
}
}
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
+ s.cancel()
return false
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
@@ -375,12 +378,14 @@
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
+ s.cancel()
return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
+ s.cancel()
return true
}
t.maxStreamID = streamID
@@ -436,6 +441,7 @@
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
defer close(t.readerDone)
for {
+ t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
atomic.StoreUint32(&t.activity, 1)
if err != nil {
@@ -766,6 +772,10 @@
return nil
}
+func (t *http2Server) setResetPingStrikes() {
+ atomic.StoreUint32(&t.resetPingStrikes, 1)
+}
+
func (t *http2Server) writeHeaderLocked(s *Stream) error {
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
@@ -780,9 +790,7 @@
streamID: s.id,
hf: headerFields,
endStream: false,
- onWrite: func() {
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- },
+ onWrite: t.setResetPingStrikes,
})
if !success {
if err != nil {
@@ -842,9 +850,7 @@
streamID: s.id,
hf: headerFields,
endStream: true,
- onWrite: func() {
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- },
+ onWrite: t.setResetPingStrikes,
}
s.hdrMu.Unlock()
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
@@ -881,7 +887,7 @@
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
s.cancel()
select {
- case <-t.ctx.Done():
+ case <-t.done:
return ErrConnClosing
default:
}
@@ -896,16 +902,14 @@
hdr = append(hdr, data[:emptyLen]...)
data = data[emptyLen:]
df := &dataFrame{
- streamID: s.id,
- h: hdr,
- d: data,
- onEachWrite: func() {
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- },
+ streamID: s.id,
+ h: hdr,
+ d: data,
+ onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
- case <-t.ctx.Done():
+ case <-t.done:
return ErrConnClosing
default:
}
@@ -967,10 +971,11 @@
select {
case <-maxAge.C:
// Close the connection after grace period.
+ infof("transport: closing server transport due to maximum connection age.")
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
- case <-t.ctx.Done():
+ case <-t.done:
}
return
case <-keepalive.C:
@@ -980,6 +985,7 @@
continue
}
if pingSent {
+ infof("transport: closing server transport due to idleness.")
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
keepalive.Reset(infinity)
@@ -991,7 +997,7 @@
}
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
- case <-t.ctx.Done():
+ case <-t.done:
return
}
}
@@ -1011,7 +1017,7 @@
t.activeStreams = nil
t.mu.Unlock()
t.controlBuf.finish()
- t.cancel()
+ close(t.done)
err := t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
@@ -1151,7 +1157,7 @@
select {
case <-t.drainChan:
case <-timer.C:
- case <-t.ctx.Done():
+ case <-t.done:
return
}
t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
@@ -1201,7 +1207,7 @@
select {
case sz := <-resp:
return int64(sz)
- case <-t.ctxDone:
+ case <-t.done:
return -1
case <-timer.C:
return -2
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index 9d21286..8f5f334 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -667,6 +667,7 @@
writer: w,
fr: http2.NewFramer(w, r),
}
+ f.fr.SetMaxReadFrameSize(http2MaxFrameLen)
// Opt-in to Frame reuse API on framer to reduce garbage.
// Frames aren't safe to read from after a subsequent call to ReadFrame.
f.fr.SetReuseFrames()
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 0f33c9c..1c1d106 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -184,6 +184,19 @@
// r.readAdditional acts on that message and returns the necessary error.
select {
case <-r.ctxDone:
+ // Note that this adds the ctx error to the end of recv buffer, and
+ // reads from the head. This will delay the error until recv buffer is
+ // empty, thus will delay ctx cancellation in Recv().
+ //
+ // It's done this way to fix a race between ctx cancel and trailer. The
+ // race was, stream.Recv() may return ctx error if ctxDone wins the
+ // race, but stream.Trailer() may return a non-nil md because the stream
+ // was not marked as done when trailer is received. This closeStream
+ // call will mark stream as done, thus fixing the race.
+ //
+ // TODO: delaying ctx error seems like an unnecessary side effect. What
+ // we really want is to mark the stream as done, and return ctx error
+ // faster.
r.closeStream(ContextErr(r.ctx.Err()))
m := <-r.recv.get()
return r.readAdditional(m, p)
@@ -298,6 +311,14 @@
}
select {
case <-s.ctx.Done():
+ // We prefer success over failure when reading messages because we delay
+ // context error in stream.Read(). To keep behavior consistent, we also
+ // prefer success here.
+ select {
+ case <-s.headerChan:
+ return nil
+ default:
+ }
return ContextErr(s.ctx.Err())
case <-s.headerChan:
return nil