[VOL-2312] Logging - Integrate voltctl with new etcd-based dynamic loglevel mechanism. Testing is in progress

Change-Id: I2e13bb79008c9a49ebb6f58e575f51efebe6dbfd
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index e10e623..134a624 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -30,7 +30,6 @@
 	"golang.org/x/net/trace"
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/encoding"
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/internal/balancerload"
@@ -328,13 +327,23 @@
 	return cs, nil
 }
 
-func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) error {
-	cs.attempt = &csAttempt{
+// newAttemptLocked creates a new attempt with a transport.
+// If it succeeds, then it replaces clientStream's attempt with this new attempt.
+func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
+	newAttempt := &csAttempt{
 		cs:           cs,
 		dc:           cs.cc.dopts.dc,
 		statsHandler: sh,
 		trInfo:       trInfo,
 	}
+	defer func() {
+		if retErr != nil {
+			// This attempt is not set in the clientStream, so it's finish won't
+			// be called. Call it here for stats and trace in case they are not
+			// nil.
+			newAttempt.finish(retErr)
+		}
+	}()
 
 	if err := cs.ctx.Err(); err != nil {
 		return toRPCErr(err)
@@ -346,8 +355,9 @@
 	if trInfo != nil {
 		trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
 	}
-	cs.attempt.t = t
-	cs.attempt.done = done
+	newAttempt.t = t
+	newAttempt.done = done
+	cs.attempt = newAttempt
 	return nil
 }
 
@@ -396,11 +406,18 @@
 	serverHeaderBinlogged bool
 
 	mu                      sync.Mutex
-	firstAttempt            bool       // if true, transparent retry is valid
-	numRetries              int        // exclusive of transparent retry attempt(s)
-	numRetriesSincePushback int        // retries since pushback; to reset backoff
-	finished                bool       // TODO: replace with atomic cmpxchg or sync.Once?
-	attempt                 *csAttempt // the active client stream attempt
+	firstAttempt            bool // if true, transparent retry is valid
+	numRetries              int  // exclusive of transparent retry attempt(s)
+	numRetriesSincePushback int  // retries since pushback; to reset backoff
+	finished                bool // TODO: replace with atomic cmpxchg or sync.Once?
+	// attempt is the active client stream attempt.
+	// The only place where it is written is the newAttemptLocked method and this method never writes nil.
+	// So, attempt can be nil only inside newClientStream function when clientStream is first created.
+	// One of the first things done after clientStream's creation, is to call newAttemptLocked which either
+	// assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
+	// then newClientStream calls finish on the clientStream and returns. So, finish method is the only
+	// place where we need to check if the attempt is nil.
+	attempt *csAttempt
 	// TODO(hedging): hedging will have multiple attempts simultaneously.
 	committed  bool                       // active attempt committed for retry?
 	buffer     []func(a *csAttempt) error // operations to replay on retry
@@ -458,8 +475,8 @@
 	if cs.attempt.s != nil {
 		<-cs.attempt.s.Done()
 	}
-	if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
-		// First attempt, wait-for-ready, stream unprocessed: transparently retry.
+	if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
+		// First attempt, stream unprocessed: transparently retry.
 		cs.firstAttempt = false
 		return nil
 	}
@@ -806,11 +823,11 @@
 	}
 	if cs.attempt != nil {
 		cs.attempt.finish(err)
-	}
-	// after functions all rely upon having a stream.
-	if cs.attempt.s != nil {
-		for _, o := range cs.opts {
-			o.after(cs.callInfo)
+		// after functions all rely upon having a stream.
+		if cs.attempt.s != nil {
+			for _, o := range cs.opts {
+				o.after(cs.callInfo)
+			}
 		}
 	}
 	cs.cancel()
@@ -965,19 +982,18 @@
 	a.mu.Unlock()
 }
 
-func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) {
-	ac.mu.Lock()
-	if ac.transport != t {
-		ac.mu.Unlock()
-		return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
-	}
-	// transition to CONNECTING state when an attempt starts
-	if ac.state != connectivity.Connecting {
-		ac.updateConnectivityState(connectivity.Connecting)
-		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-	}
-	ac.mu.Unlock()
-
+// newClientStream creates a ClientStream with the specified transport, on the
+// given addrConn.
+//
+// It's expected that the given transport is either the same one in addrConn, or
+// is already closed. To avoid race, transport is specified separately, instead
+// of using ac.transpot.
+//
+// Main difference between this and ClientConn.NewStream:
+// - no retry
+// - no service config (or wait for service config)
+// - no tracing or stats
+func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
 	if t == nil {
 		// TODO: return RPC error here?
 		return nil, errors.New("transport provided is nil")
@@ -985,14 +1001,6 @@
 	// defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
 	c := &callInfo{}
 
-	for _, o := range opts {
-		if err := o.before(c); err != nil {
-			return nil, toRPCErr(err)
-		}
-	}
-	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
-	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
-
 	// Possible context leak:
 	// The cancel function for the child context we create will only be called
 	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
@@ -1005,6 +1013,13 @@
 		}
 	}()
 
+	for _, o := range opts {
+		if err := o.before(c); err != nil {
+			return nil, toRPCErr(err)
+		}
+	}
+	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
+	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
 	if err := setCallInfoCodec(c); err != nil {
 		return nil, err
 	}
@@ -1037,6 +1052,7 @@
 		callHdr.Creds = c.creds
 	}
 
+	// Use a special addrConnStream to avoid retry.
 	as := &addrConnStream{
 		callHdr:  callHdr,
 		ac:       ac,