Updating to latest protos and device-management interface, releasing 2.0

Change-Id: I2d2ebf5b305d6d06b8d01c49d4d67e7ff050f5d4
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 934ef68..625d47b 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -35,6 +35,9 @@
 	"google.golang.org/grpc/internal/binarylog"
 	"google.golang.org/grpc/internal/channelz"
 	"google.golang.org/grpc/internal/grpcrand"
+	"google.golang.org/grpc/internal/grpcutil"
+	iresolver "google.golang.org/grpc/internal/resolver"
+	"google.golang.org/grpc/internal/serviceconfig"
 	"google.golang.org/grpc/internal/transport"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/peer"
@@ -49,14 +52,20 @@
 // of the RPC.
 type StreamHandler func(srv interface{}, stream ServerStream) error
 
-// StreamDesc represents a streaming RPC service's method specification.
+// StreamDesc represents a streaming RPC service's method specification.  Used
+// on the server when registering services and on the client when initiating
+// new streams.
 type StreamDesc struct {
-	StreamName string
-	Handler    StreamHandler
+	// StreamName and Handler are only used when registering handlers on a
+	// server.
+	StreamName string        // the name of the method excluding the service
+	Handler    StreamHandler // the handler called for the method
 
-	// At least one of these is true.
-	ServerStreams bool
-	ClientStreams bool
+	// ServerStreams and ClientStreams are used for registering handlers on a
+	// server as well as defining RPC behavior when passed to NewClientStream
+	// and ClientConn.NewStream.  At least one must be true.
+	ServerStreams bool // indicates the server can perform streaming sends
+	ClientStreams bool // indicates the client can perform streaming sends
 }
 
 // Stream defines the common interface a client or server stream has to satisfy.
@@ -163,13 +172,48 @@
 			}
 		}()
 	}
-	c := defaultCallInfo()
 	// Provide an opportunity for the first RPC to see the first service config
 	// provided by the resolver.
 	if err := cc.waitForResolvedAddrs(ctx); err != nil {
 		return nil, err
 	}
-	mc := cc.GetMethodConfig(method)
+
+	var mc serviceconfig.MethodConfig
+	var onCommit func()
+	var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
+		return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
+	}
+
+	rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
+	rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
+	if err != nil {
+		return nil, toRPCErr(err)
+	}
+
+	if rpcConfig != nil {
+		if rpcConfig.Context != nil {
+			ctx = rpcConfig.Context
+		}
+		mc = rpcConfig.MethodConfig
+		onCommit = rpcConfig.OnCommitted
+		if rpcConfig.Interceptor != nil {
+			rpcInfo.Context = nil
+			ns := newStream
+			newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
+				cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
+				if err != nil {
+					return nil, toRPCErr(err)
+				}
+				return cs, nil
+			}
+		}
+	}
+
+	return newStream(ctx, func() {})
+}
+
+func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
+	c := defaultCallInfo()
 	if mc.WaitForReady != nil {
 		c.failFast = !*mc.WaitForReady
 	}
@@ -206,6 +250,7 @@
 		Host:           cc.authority,
 		Method:         method,
 		ContentSubtype: c.contentSubtype,
+		DoneFunc:       doneFunc,
 	}
 
 	// Set our outgoing compression according to the UseCompressor CallOption, if
@@ -229,33 +274,6 @@
 	if c.creds != nil {
 		callHdr.Creds = c.creds
 	}
-	var trInfo *traceInfo
-	if EnableTracing {
-		trInfo = &traceInfo{
-			tr: trace.New("grpc.Sent."+methodFamily(method), method),
-			firstLine: firstLine{
-				client: true,
-			},
-		}
-		if deadline, ok := ctx.Deadline(); ok {
-			trInfo.firstLine.deadline = time.Until(deadline)
-		}
-		trInfo.tr.LazyLog(&trInfo.firstLine, false)
-		ctx = trace.NewContext(ctx, trInfo.tr)
-	}
-	ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
-	sh := cc.dopts.copts.StatsHandler
-	var beginTime time.Time
-	if sh != nil {
-		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
-		beginTime = time.Now()
-		begin := &stats.Begin{
-			Client:    true,
-			BeginTime: beginTime,
-			FailFast:  c.failFast,
-		}
-		sh.HandleRPC(ctx, begin)
-	}
 
 	cs := &clientStream{
 		callHdr:      callHdr,
@@ -269,18 +287,15 @@
 		cp:           cp,
 		comp:         comp,
 		cancel:       cancel,
-		beginTime:    beginTime,
 		firstAttempt: true,
+		onCommit:     onCommit,
 	}
 	if !cc.dopts.disableRetry {
 		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
 	}
 	cs.binlog = binarylog.GetMethodLogger(method)
 
-	cs.callInfo.stream = cs
-	// Only this initial attempt has stats/tracing.
-	// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
-	if err := cs.newAttemptLocked(sh, trInfo); err != nil {
+	if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
 		cs.finish(err)
 		return nil, err
 	}
@@ -328,8 +343,43 @@
 
 // newAttemptLocked creates a new attempt with a transport.
 // If it succeeds, then it replaces clientStream's attempt with this new attempt.
-func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
+func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
+	ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
+	method := cs.callHdr.Method
+	sh := cs.cc.dopts.copts.StatsHandler
+	var beginTime time.Time
+	if sh != nil {
+		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
+		beginTime = time.Now()
+		begin := &stats.Begin{
+			Client:                    true,
+			BeginTime:                 beginTime,
+			FailFast:                  cs.callInfo.failFast,
+			IsClientStream:            cs.desc.ClientStreams,
+			IsServerStream:            cs.desc.ServerStreams,
+			IsTransparentRetryAttempt: isTransparent,
+		}
+		sh.HandleRPC(ctx, begin)
+	}
+
+	var trInfo *traceInfo
+	if EnableTracing {
+		trInfo = &traceInfo{
+			tr: trace.New("grpc.Sent."+methodFamily(method), method),
+			firstLine: firstLine{
+				client: true,
+			},
+		}
+		if deadline, ok := ctx.Deadline(); ok {
+			trInfo.firstLine.deadline = time.Until(deadline)
+		}
+		trInfo.tr.LazyLog(&trInfo.firstLine, false)
+		ctx = trace.NewContext(ctx, trInfo.tr)
+	}
+
 	newAttempt := &csAttempt{
+		ctx:          ctx,
+		beginTime:    beginTime,
 		cs:           cs,
 		dc:           cs.cc.dopts.dc,
 		statsHandler: sh,
@@ -344,10 +394,18 @@
 		}
 	}()
 
-	if err := cs.ctx.Err(); err != nil {
+	if err := ctx.Err(); err != nil {
 		return toRPCErr(err)
 	}
-	t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
+
+	if cs.cc.parsedTarget.Scheme == "xds" {
+		// Add extra metadata (metadata that will be added by transport) to context
+		// so the balancer can see them.
+		ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
+			"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
+		))
+	}
+	t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
 	if err != nil {
 		return err
 	}
@@ -363,9 +421,11 @@
 func (a *csAttempt) newStream() error {
 	cs := a.cs
 	cs.callHdr.PreviousAttempts = cs.numRetries
-	s, err := a.t.NewStream(cs.ctx, cs.callHdr)
+	s, err := a.t.NewStream(a.ctx, cs.callHdr)
 	if err != nil {
-		return toRPCErr(err)
+		// Return without converting to an RPC error so retry code can
+		// inspect.
+		return err
 	}
 	cs.attempt.s = s
 	cs.attempt.p = &parser{r: s}
@@ -386,8 +446,7 @@
 
 	cancel context.CancelFunc // cancels all attempts
 
-	sentLast  bool // sent an end stream
-	beginTime time.Time
+	sentLast bool // sent an end stream
 
 	methodConfig *MethodConfig
 
@@ -418,7 +477,8 @@
 	// place where we need to check if the attempt is nil.
 	attempt *csAttempt
 	// TODO(hedging): hedging will have multiple attempts simultaneously.
-	committed  bool                       // active attempt committed for retry?
+	committed  bool // active attempt committed for retry?
+	onCommit   func()
 	buffer     []func(a *csAttempt) error // operations to replay on retry
 	bufferSize int                        // current size of buffer
 }
@@ -426,6 +486,7 @@
 // csAttempt implements a single transport stream attempt within a
 // clientStream.
 type csAttempt struct {
+	ctx  context.Context
 	cs   *clientStream
 	t    transport.ClientTransport
 	s    *transport.Stream
@@ -444,9 +505,13 @@
 	trInfo *traceInfo
 
 	statsHandler stats.Handler
+	beginTime    time.Time
 }
 
 func (cs *clientStream) commitAttemptLocked() {
+	if !cs.committed && cs.onCommit != nil {
+		cs.onCommit()
+	}
 	cs.committed = true
 	cs.buffer = nil
 }
@@ -458,37 +523,57 @@
 }
 
 // shouldRetry returns nil if the RPC should be retried; otherwise it returns
-// the error that should be returned by the operation.
-func (cs *clientStream) shouldRetry(err error) error {
-	if cs.attempt.s == nil && !cs.callInfo.failFast {
-		// In the event of any error from NewStream (attempt.s == nil), we
-		// never attempted to write anything to the wire, so we can retry
-		// indefinitely for non-fail-fast RPCs.
-		return nil
+// the error that should be returned by the operation.  If the RPC should be
+// retried, the bool indicates whether it is being retried transparently.
+func (cs *clientStream) shouldRetry(err error) (bool, error) {
+	if cs.attempt.s == nil {
+		// Error from NewClientStream.
+		nse, ok := err.(*transport.NewStreamError)
+		if !ok {
+			// Unexpected, but assume no I/O was performed and the RPC is not
+			// fatal, so retry indefinitely.
+			return true, nil
+		}
+
+		// Unwrap and convert error.
+		err = toRPCErr(nse.Err)
+
+		// Never retry DoNotRetry errors, which indicate the RPC should not be
+		// retried due to max header list size violation, etc.
+		if nse.DoNotRetry {
+			return false, err
+		}
+
+		// In the event of a non-IO operation error from NewStream, we never
+		// attempted to write anything to the wire, so we can retry
+		// indefinitely.
+		if !nse.DoNotTransparentRetry {
+			return true, nil
+		}
 	}
 	if cs.finished || cs.committed {
 		// RPC is finished or committed; cannot retry.
-		return err
+		return false, err
 	}
 	// Wait for the trailers.
+	unprocessed := false
 	if cs.attempt.s != nil {
 		<-cs.attempt.s.Done()
+		unprocessed = cs.attempt.s.Unprocessed()
 	}
-	if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
+	if cs.firstAttempt && unprocessed {
 		// First attempt, stream unprocessed: transparently retry.
-		cs.firstAttempt = false
-		return nil
+		return true, nil
 	}
-	cs.firstAttempt = false
 	if cs.cc.dopts.disableRetry {
-		return err
+		return false, err
 	}
 
 	pushback := 0
 	hasPushback := false
 	if cs.attempt.s != nil {
 		if !cs.attempt.s.TrailersOnly() {
-			return err
+			return false, err
 		}
 
 		// TODO(retry): Move down if the spec changes to not check server pushback
@@ -497,15 +582,15 @@
 		if len(sps) == 1 {
 			var e error
 			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
-				channelz.Infof(cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
+				channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
 				cs.retryThrottler.throttle() // This counts as a failure for throttling.
-				return err
+				return false, err
 			}
 			hasPushback = true
 		} else if len(sps) > 1 {
-			channelz.Warningf(cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
+			channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
 			cs.retryThrottler.throttle() // This counts as a failure for throttling.
-			return err
+			return false, err
 		}
 	}
 
@@ -516,18 +601,18 @@
 		code = status.Convert(err).Code()
 	}
 
-	rp := cs.methodConfig.retryPolicy
-	if rp == nil || !rp.retryableStatusCodes[code] {
-		return err
+	rp := cs.methodConfig.RetryPolicy
+	if rp == nil || !rp.RetryableStatusCodes[code] {
+		return false, err
 	}
 
 	// Note: the ordering here is important; we count this as a failure
 	// only if the code matched a retryable code.
 	if cs.retryThrottler.throttle() {
-		return err
+		return false, err
 	}
-	if cs.numRetries+1 >= rp.maxAttempts {
-		return err
+	if cs.numRetries+1 >= rp.MaxAttempts {
+		return false, err
 	}
 
 	var dur time.Duration
@@ -535,9 +620,9 @@
 		dur = time.Millisecond * time.Duration(pushback)
 		cs.numRetriesSincePushback = 0
 	} else {
-		fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
-		cur := float64(rp.initialBackoff) * fact
-		if max := float64(rp.maxBackoff); cur > max {
+		fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
+		cur := float64(rp.InitialBackoff) * fact
+		if max := float64(rp.MaxBackoff); cur > max {
 			cur = max
 		}
 		dur = time.Duration(grpcrand.Int63n(int64(cur)))
@@ -550,22 +635,24 @@
 	select {
 	case <-t.C:
 		cs.numRetries++
-		return nil
+		return false, nil
 	case <-cs.ctx.Done():
 		t.Stop()
-		return status.FromContextError(cs.ctx.Err()).Err()
+		return false, status.FromContextError(cs.ctx.Err()).Err()
 	}
 }
 
 // Returns nil if a retry was performed and succeeded; error otherwise.
 func (cs *clientStream) retryLocked(lastErr error) error {
 	for {
-		cs.attempt.finish(lastErr)
-		if err := cs.shouldRetry(lastErr); err != nil {
+		cs.attempt.finish(toRPCErr(lastErr))
+		isTransparent, err := cs.shouldRetry(lastErr)
+		if err != nil {
 			cs.commitAttemptLocked()
 			return err
 		}
-		if err := cs.newAttemptLocked(nil, nil); err != nil {
+		cs.firstAttempt = false
+		if err := cs.newAttemptLocked(isTransparent); err != nil {
 			return err
 		}
 		if lastErr = cs.replayBufferLocked(); lastErr == nil {
@@ -586,7 +673,11 @@
 	for {
 		if cs.committed {
 			cs.mu.Unlock()
-			return op(cs.attempt)
+			// toRPCErr is used in case the error from the attempt comes from
+			// NewClientStream, which intentionally doesn't return a status
+			// error to allow for further inspection; all other errors should
+			// already be status errors.
+			return toRPCErr(op(cs.attempt))
 		}
 		a := cs.attempt
 		cs.mu.Unlock()
@@ -799,6 +890,15 @@
 	}
 	cs.finished = true
 	cs.commitAttemptLocked()
+	if cs.attempt != nil {
+		cs.attempt.finish(err)
+		// after functions all rely upon having a stream.
+		if cs.attempt.s != nil {
+			for _, o := range cs.opts {
+				o.after(cs.callInfo, cs.attempt)
+			}
+		}
+	}
 	cs.mu.Unlock()
 	// For binary logging. only log cancel in finish (could be caused by RPC ctx
 	// canceled or ClientConn closed). Trailer will be logged in RecvMsg.
@@ -820,15 +920,6 @@
 			cs.cc.incrCallsSucceeded()
 		}
 	}
-	if cs.attempt != nil {
-		cs.attempt.finish(err)
-		// after functions all rely upon having a stream.
-		if cs.attempt.s != nil {
-			for _, o := range cs.opts {
-				o.after(cs.callInfo)
-			}
-		}
-	}
 	cs.cancel()
 }
 
@@ -851,7 +942,7 @@
 		return io.EOF
 	}
 	if a.statsHandler != nil {
-		a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
+		a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
 	}
 	if channelz.IsOn() {
 		a.t.IncrMsgSent()
@@ -899,13 +990,13 @@
 		a.mu.Unlock()
 	}
 	if a.statsHandler != nil {
-		a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
+		a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
 			Client:   true,
 			RecvTime: time.Now(),
 			Payload:  m,
 			// TODO truncate large payload.
 			Data:       payInfo.uncompressedBytes,
-			WireLength: payInfo.wireLength,
+			WireLength: payInfo.wireLength + headerLen,
 			Length:     len(payInfo.uncompressedBytes),
 		})
 	}
@@ -961,12 +1052,12 @@
 	if a.statsHandler != nil {
 		end := &stats.End{
 			Client:    true,
-			BeginTime: a.cs.beginTime,
+			BeginTime: a.beginTime,
 			EndTime:   time.Now(),
 			Trailer:   tr,
 			Error:     err,
 		}
-		a.statsHandler.HandleRPC(a.cs.ctx, end)
+		a.statsHandler.HandleRPC(a.ctx, end)
 	}
 	if a.trInfo != nil && a.trInfo.tr != nil {
 		if err == nil {
@@ -1066,7 +1157,6 @@
 		t:        t,
 	}
 
-	as.callInfo.stream = as
 	s, err := as.t.NewStream(as.ctx, as.callHdr)
 	if err != nil {
 		err = toRPCErr(err)
@@ -1488,7 +1578,7 @@
 			Payload:  m,
 			// TODO truncate large payload.
 			Data:       payInfo.uncompressedBytes,
-			WireLength: payInfo.wireLength,
+			WireLength: payInfo.wireLength + headerLen,
 			Length:     len(payInfo.uncompressedBytes),
 		})
 	}