Updating to latest protos and device-management interface, releasing 2.0
Change-Id: I2d2ebf5b305d6d06b8d01c49d4d67e7ff050f5d4
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 934ef68..625d47b 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -35,6 +35,9 @@
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
+ "google.golang.org/grpc/internal/grpcutil"
+ iresolver "google.golang.org/grpc/internal/resolver"
+ "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -49,14 +52,20 @@
// of the RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error
-// StreamDesc represents a streaming RPC service's method specification.
+// StreamDesc represents a streaming RPC service's method specification. Used
+// on the server when registering services and on the client when initiating
+// new streams.
type StreamDesc struct {
- StreamName string
- Handler StreamHandler
+ // StreamName and Handler are only used when registering handlers on a
+ // server.
+ StreamName string // the name of the method excluding the service
+ Handler StreamHandler // the handler called for the method
- // At least one of these is true.
- ServerStreams bool
- ClientStreams bool
+ // ServerStreams and ClientStreams are used for registering handlers on a
+ // server as well as defining RPC behavior when passed to NewClientStream
+ // and ClientConn.NewStream. At least one must be true.
+ ServerStreams bool // indicates the server can perform streaming sends
+ ClientStreams bool // indicates the client can perform streaming sends
}
// Stream defines the common interface a client or server stream has to satisfy.
@@ -163,13 +172,48 @@
}
}()
}
- c := defaultCallInfo()
// Provide an opportunity for the first RPC to see the first service config
// provided by the resolver.
if err := cc.waitForResolvedAddrs(ctx); err != nil {
return nil, err
}
- mc := cc.GetMethodConfig(method)
+
+ var mc serviceconfig.MethodConfig
+ var onCommit func()
+ var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
+ return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
+ }
+
+ rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
+ rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
+ if err != nil {
+ return nil, toRPCErr(err)
+ }
+
+ if rpcConfig != nil {
+ if rpcConfig.Context != nil {
+ ctx = rpcConfig.Context
+ }
+ mc = rpcConfig.MethodConfig
+ onCommit = rpcConfig.OnCommitted
+ if rpcConfig.Interceptor != nil {
+ rpcInfo.Context = nil
+ ns := newStream
+ newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
+ cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
+ if err != nil {
+ return nil, toRPCErr(err)
+ }
+ return cs, nil
+ }
+ }
+ }
+
+ return newStream(ctx, func() {})
+}
+
+func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
+ c := defaultCallInfo()
if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady
}
@@ -206,6 +250,7 @@
Host: cc.authority,
Method: method,
ContentSubtype: c.contentSubtype,
+ DoneFunc: doneFunc,
}
// Set our outgoing compression according to the UseCompressor CallOption, if
@@ -229,33 +274,6 @@
if c.creds != nil {
callHdr.Creds = c.creds
}
- var trInfo *traceInfo
- if EnableTracing {
- trInfo = &traceInfo{
- tr: trace.New("grpc.Sent."+methodFamily(method), method),
- firstLine: firstLine{
- client: true,
- },
- }
- if deadline, ok := ctx.Deadline(); ok {
- trInfo.firstLine.deadline = time.Until(deadline)
- }
- trInfo.tr.LazyLog(&trInfo.firstLine, false)
- ctx = trace.NewContext(ctx, trInfo.tr)
- }
- ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
- sh := cc.dopts.copts.StatsHandler
- var beginTime time.Time
- if sh != nil {
- ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
- beginTime = time.Now()
- begin := &stats.Begin{
- Client: true,
- BeginTime: beginTime,
- FailFast: c.failFast,
- }
- sh.HandleRPC(ctx, begin)
- }
cs := &clientStream{
callHdr: callHdr,
@@ -269,18 +287,15 @@
cp: cp,
comp: comp,
cancel: cancel,
- beginTime: beginTime,
firstAttempt: true,
+ onCommit: onCommit,
}
if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
}
cs.binlog = binarylog.GetMethodLogger(method)
- cs.callInfo.stream = cs
- // Only this initial attempt has stats/tracing.
- // TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
- if err := cs.newAttemptLocked(sh, trInfo); err != nil {
+ if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
cs.finish(err)
return nil, err
}
@@ -328,8 +343,43 @@
// newAttemptLocked creates a new attempt with a transport.
// If it succeeds, then it replaces clientStream's attempt with this new attempt.
-func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
+func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
+ ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
+ method := cs.callHdr.Method
+ sh := cs.cc.dopts.copts.StatsHandler
+ var beginTime time.Time
+ if sh != nil {
+ ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
+ beginTime = time.Now()
+ begin := &stats.Begin{
+ Client: true,
+ BeginTime: beginTime,
+ FailFast: cs.callInfo.failFast,
+ IsClientStream: cs.desc.ClientStreams,
+ IsServerStream: cs.desc.ServerStreams,
+ IsTransparentRetryAttempt: isTransparent,
+ }
+ sh.HandleRPC(ctx, begin)
+ }
+
+ var trInfo *traceInfo
+ if EnableTracing {
+ trInfo = &traceInfo{
+ tr: trace.New("grpc.Sent."+methodFamily(method), method),
+ firstLine: firstLine{
+ client: true,
+ },
+ }
+ if deadline, ok := ctx.Deadline(); ok {
+ trInfo.firstLine.deadline = time.Until(deadline)
+ }
+ trInfo.tr.LazyLog(&trInfo.firstLine, false)
+ ctx = trace.NewContext(ctx, trInfo.tr)
+ }
+
newAttempt := &csAttempt{
+ ctx: ctx,
+ beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
@@ -344,10 +394,18 @@
}
}()
- if err := cs.ctx.Err(); err != nil {
+ if err := ctx.Err(); err != nil {
return toRPCErr(err)
}
- t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
+
+ if cs.cc.parsedTarget.Scheme == "xds" {
+ // Add extra metadata (metadata that will be added by transport) to context
+ // so the balancer can see them.
+ ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
+ "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
+ ))
+ }
+ t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
if err != nil {
return err
}
@@ -363,9 +421,11 @@
func (a *csAttempt) newStream() error {
cs := a.cs
cs.callHdr.PreviousAttempts = cs.numRetries
- s, err := a.t.NewStream(cs.ctx, cs.callHdr)
+ s, err := a.t.NewStream(a.ctx, cs.callHdr)
if err != nil {
- return toRPCErr(err)
+ // Return without converting to an RPC error so retry code can
+ // inspect.
+ return err
}
cs.attempt.s = s
cs.attempt.p = &parser{r: s}
@@ -386,8 +446,7 @@
cancel context.CancelFunc // cancels all attempts
- sentLast bool // sent an end stream
- beginTime time.Time
+ sentLast bool // sent an end stream
methodConfig *MethodConfig
@@ -418,7 +477,8 @@
// place where we need to check if the attempt is nil.
attempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
- committed bool // active attempt committed for retry?
+ committed bool // active attempt committed for retry?
+ onCommit func()
buffer []func(a *csAttempt) error // operations to replay on retry
bufferSize int // current size of buffer
}
@@ -426,6 +486,7 @@
// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
+ ctx context.Context
cs *clientStream
t transport.ClientTransport
s *transport.Stream
@@ -444,9 +505,13 @@
trInfo *traceInfo
statsHandler stats.Handler
+ beginTime time.Time
}
func (cs *clientStream) commitAttemptLocked() {
+ if !cs.committed && cs.onCommit != nil {
+ cs.onCommit()
+ }
cs.committed = true
cs.buffer = nil
}
@@ -458,37 +523,57 @@
}
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
-// the error that should be returned by the operation.
-func (cs *clientStream) shouldRetry(err error) error {
- if cs.attempt.s == nil && !cs.callInfo.failFast {
- // In the event of any error from NewStream (attempt.s == nil), we
- // never attempted to write anything to the wire, so we can retry
- // indefinitely for non-fail-fast RPCs.
- return nil
+// the error that should be returned by the operation. If the RPC should be
+// retried, the bool indicates whether it is being retried transparently.
+func (cs *clientStream) shouldRetry(err error) (bool, error) {
+ if cs.attempt.s == nil {
+ // Error from NewClientStream.
+ nse, ok := err.(*transport.NewStreamError)
+ if !ok {
+ // Unexpected, but assume no I/O was performed and the RPC is not
+ // fatal, so retry indefinitely.
+ return true, nil
+ }
+
+ // Unwrap and convert error.
+ err = toRPCErr(nse.Err)
+
+ // Never retry DoNotRetry errors, which indicate the RPC should not be
+ // retried due to max header list size violation, etc.
+ if nse.DoNotRetry {
+ return false, err
+ }
+
+ // In the event of a non-IO operation error from NewStream, we never
+ // attempted to write anything to the wire, so we can retry
+ // indefinitely.
+ if !nse.DoNotTransparentRetry {
+ return true, nil
+ }
}
if cs.finished || cs.committed {
// RPC is finished or committed; cannot retry.
- return err
+ return false, err
}
// Wait for the trailers.
+ unprocessed := false
if cs.attempt.s != nil {
<-cs.attempt.s.Done()
+ unprocessed = cs.attempt.s.Unprocessed()
}
- if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
+ if cs.firstAttempt && unprocessed {
// First attempt, stream unprocessed: transparently retry.
- cs.firstAttempt = false
- return nil
+ return true, nil
}
- cs.firstAttempt = false
if cs.cc.dopts.disableRetry {
- return err
+ return false, err
}
pushback := 0
hasPushback := false
if cs.attempt.s != nil {
if !cs.attempt.s.TrailersOnly() {
- return err
+ return false, err
}
// TODO(retry): Move down if the spec changes to not check server pushback
@@ -497,15 +582,15 @@
if len(sps) == 1 {
var e error
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
- channelz.Infof(cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
+ channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
cs.retryThrottler.throttle() // This counts as a failure for throttling.
- return err
+ return false, err
}
hasPushback = true
} else if len(sps) > 1 {
- channelz.Warningf(cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
+ channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
cs.retryThrottler.throttle() // This counts as a failure for throttling.
- return err
+ return false, err
}
}
@@ -516,18 +601,18 @@
code = status.Convert(err).Code()
}
- rp := cs.methodConfig.retryPolicy
- if rp == nil || !rp.retryableStatusCodes[code] {
- return err
+ rp := cs.methodConfig.RetryPolicy
+ if rp == nil || !rp.RetryableStatusCodes[code] {
+ return false, err
}
// Note: the ordering here is important; we count this as a failure
// only if the code matched a retryable code.
if cs.retryThrottler.throttle() {
- return err
+ return false, err
}
- if cs.numRetries+1 >= rp.maxAttempts {
- return err
+ if cs.numRetries+1 >= rp.MaxAttempts {
+ return false, err
}
var dur time.Duration
@@ -535,9 +620,9 @@
dur = time.Millisecond * time.Duration(pushback)
cs.numRetriesSincePushback = 0
} else {
- fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
- cur := float64(rp.initialBackoff) * fact
- if max := float64(rp.maxBackoff); cur > max {
+ fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
+ cur := float64(rp.InitialBackoff) * fact
+ if max := float64(rp.MaxBackoff); cur > max {
cur = max
}
dur = time.Duration(grpcrand.Int63n(int64(cur)))
@@ -550,22 +635,24 @@
select {
case <-t.C:
cs.numRetries++
- return nil
+ return false, nil
case <-cs.ctx.Done():
t.Stop()
- return status.FromContextError(cs.ctx.Err()).Err()
+ return false, status.FromContextError(cs.ctx.Err()).Err()
}
}
// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(lastErr error) error {
for {
- cs.attempt.finish(lastErr)
- if err := cs.shouldRetry(lastErr); err != nil {
+ cs.attempt.finish(toRPCErr(lastErr))
+ isTransparent, err := cs.shouldRetry(lastErr)
+ if err != nil {
cs.commitAttemptLocked()
return err
}
- if err := cs.newAttemptLocked(nil, nil); err != nil {
+ cs.firstAttempt = false
+ if err := cs.newAttemptLocked(isTransparent); err != nil {
return err
}
if lastErr = cs.replayBufferLocked(); lastErr == nil {
@@ -586,7 +673,11 @@
for {
if cs.committed {
cs.mu.Unlock()
- return op(cs.attempt)
+ // toRPCErr is used in case the error from the attempt comes from
+ // NewClientStream, which intentionally doesn't return a status
+ // error to allow for further inspection; all other errors should
+ // already be status errors.
+ return toRPCErr(op(cs.attempt))
}
a := cs.attempt
cs.mu.Unlock()
@@ -799,6 +890,15 @@
}
cs.finished = true
cs.commitAttemptLocked()
+ if cs.attempt != nil {
+ cs.attempt.finish(err)
+ // after functions all rely upon having a stream.
+ if cs.attempt.s != nil {
+ for _, o := range cs.opts {
+ o.after(cs.callInfo, cs.attempt)
+ }
+ }
+ }
cs.mu.Unlock()
// For binary logging. only log cancel in finish (could be caused by RPC ctx
// canceled or ClientConn closed). Trailer will be logged in RecvMsg.
@@ -820,15 +920,6 @@
cs.cc.incrCallsSucceeded()
}
}
- if cs.attempt != nil {
- cs.attempt.finish(err)
- // after functions all rely upon having a stream.
- if cs.attempt.s != nil {
- for _, o := range cs.opts {
- o.after(cs.callInfo)
- }
- }
- }
cs.cancel()
}
@@ -851,7 +942,7 @@
return io.EOF
}
if a.statsHandler != nil {
- a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
+ a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
}
if channelz.IsOn() {
a.t.IncrMsgSent()
@@ -899,13 +990,13 @@
a.mu.Unlock()
}
if a.statsHandler != nil {
- a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
+ a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: m,
// TODO truncate large payload.
Data: payInfo.uncompressedBytes,
- WireLength: payInfo.wireLength,
+ WireLength: payInfo.wireLength + headerLen,
Length: len(payInfo.uncompressedBytes),
})
}
@@ -961,12 +1052,12 @@
if a.statsHandler != nil {
end := &stats.End{
Client: true,
- BeginTime: a.cs.beginTime,
+ BeginTime: a.beginTime,
EndTime: time.Now(),
Trailer: tr,
Error: err,
}
- a.statsHandler.HandleRPC(a.cs.ctx, end)
+ a.statsHandler.HandleRPC(a.ctx, end)
}
if a.trInfo != nil && a.trInfo.tr != nil {
if err == nil {
@@ -1066,7 +1157,6 @@
t: t,
}
- as.callInfo.stream = as
s, err := as.t.NewStream(as.ctx, as.callHdr)
if err != nil {
err = toRPCErr(err)
@@ -1488,7 +1578,7 @@
Payload: m,
// TODO truncate large payload.
Data: payInfo.uncompressedBytes,
- WireLength: payInfo.wireLength,
+ WireLength: payInfo.wireLength + headerLen,
Length: len(payInfo.uncompressedBytes),
})
}