VOL-1497 : Add more control to kv/memory access

- Added kv locking mechanism (etcd only)
- (watch) control path access whenever possible
- (watch) use a transaction for updates and merge with memory
- cleaned up vendoring
- misc changes to fix exceptions found along the way

Amendments:

- Copyright header got removed in auto-generated file
- Changed default locking to false for KV list operation
- Updated backend api to allow the passing of locking parameter

Change-Id: Ie1a55d3ca8b9d92ae71a85ce42bb22fcf1419e2c
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 84b6dbe..56d0bf7 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -36,7 +36,6 @@
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/grpclog"
-	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/backoff"
 	"google.golang.org/grpc/internal/channelz"
 	"google.golang.org/grpc/internal/envconfig"
@@ -592,13 +591,12 @@
 // Caller needs to make sure len(addrs) > 0.
 func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
 	ac := &addrConn{
-		cc:                  cc,
-		addrs:               addrs,
-		scopts:              opts,
-		dopts:               cc.dopts,
-		czData:              new(channelzData),
-		successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1
-		resetBackoff:        make(chan struct{}),
+		cc:           cc,
+		addrs:        addrs,
+		scopts:       opts,
+		dopts:        cc.dopts,
+		czData:       new(channelzData),
+		resetBackoff: make(chan struct{}),
 	}
 	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
 	// Track ac in cc. This needs to be done before any getTransport(...) is called.
@@ -680,11 +678,10 @@
 		return nil
 	}
 	ac.updateConnectivityState(connectivity.Connecting)
-	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
 	ac.mu.Unlock()
 
 	// Start a goroutine connecting to the server asynchronously.
-	go ac.resetTransport(false)
+	go ac.resetTransport()
 	return nil
 }
 
@@ -703,6 +700,12 @@
 		return true
 	}
 
+	// Unless we're busy reconnecting already, let's reconnect from the top of
+	// the list.
+	if ac.state != connectivity.Ready {
+		return false
+	}
+
 	var curAddrFound bool
 	for _, a := range addrs {
 		if reflect.DeepEqual(ac.curAddr, a) {
@@ -713,7 +716,6 @@
 	grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
 	if curAddrFound {
 		ac.addrs = addrs
-		ac.addrIdx = 0 // Start reconnecting from beginning in the new list.
 	}
 
 	return curAddrFound
@@ -913,7 +915,6 @@
 	transport transport.ClientTransport // The current transport.
 
 	mu      sync.Mutex
-	addrIdx int                // The index in addrs list to start reconnecting from.
 	curAddr resolver.Address   // The current address.
 	addrs   []resolver.Address // All addresses that the resolver resolved to.
 
@@ -922,33 +923,30 @@
 
 	tearDownErr error // The reason this addrConn is torn down.
 
-	backoffIdx int
-	// backoffDeadline is the time until which resetTransport needs to
-	// wait before increasing backoffIdx count.
-	backoffDeadline time.Time
-	// connectDeadline is the time by which all connection
-	// negotiations must complete.
-	connectDeadline time.Time
-
+	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
 	resetBackoff chan struct{}
 
-	channelzID int64 // channelz unique identification number
-	czData     *channelzData
-
-	successfulHandshake bool
-
+	channelzID         int64 // channelz unique identification number.
+	czData             *channelzData
 	healthCheckEnabled bool
 }
 
 // Note: this requires a lock on ac.mu.
 func (ac *addrConn) updateConnectivityState(s connectivity.State) {
+	if ac.state == s {
+		return
+	}
+
+	updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
+	grpclog.Infof(updateMsg)
 	ac.state = s
 	if channelz.IsOn() {
 		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
-			Desc:     fmt.Sprintf("Subchannel Connectivity change to %v", s),
+			Desc:     updateMsg,
 			Severity: channelz.CtINFO,
 		})
 	}
+	ac.cc.handleSubConnStateChange(ac.acbw, s)
 }
 
 // adjustParams updates parameters used to create transports upon
@@ -965,173 +963,219 @@
 	}
 }
 
-// resetTransport makes sure that a healthy ac.transport exists.
-//
-// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or
-// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer
-// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to
-// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the
-// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs
-// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received.
-//
-// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received.
-func (ac *addrConn) resetTransport(resolveNow bool) {
-	for {
-		// If this is the first in a line of resets, we want to resolve immediately. The only other time we
-		// want to reset is if we have tried all the addresses handed to us.
-		if resolveNow {
-			ac.mu.Lock()
+func (ac *addrConn) resetTransport() {
+	for i := 0; ; i++ {
+		tryNextAddrFromStart := grpcsync.NewEvent()
+
+		ac.mu.Lock()
+		if i > 0 {
 			ac.cc.resolveNow(resolver.ResolveNowOption{})
-			ac.mu.Unlock()
 		}
-
-		ac.mu.Lock()
-		if ac.state == connectivity.Shutdown {
-			ac.mu.Unlock()
-			return
-		}
-
-		// The transport that was used before is no longer viable.
-		ac.transport = nil
-		// If the connection is READY, a failure must have occurred.
-		// Otherwise, we'll consider this is a transient failure when:
-		//   We've exhausted all addresses
-		//   We're in CONNECTING
-		//   And it's not the very first addr to try TODO(deklerk) find a better way to do this than checking ac.successfulHandshake
-		if ac.state == connectivity.Ready || (ac.addrIdx == len(ac.addrs)-1 && ac.state == connectivity.Connecting && !ac.successfulHandshake) {
-			ac.updateConnectivityState(connectivity.TransientFailure)
-			ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-		}
-		ac.transport = nil
+		addrs := ac.addrs
+		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
 		ac.mu.Unlock()
 
-		if err := ac.nextAddr(); err != nil {
-			return
-		}
+	addrLoop:
+		for _, addr := range addrs {
+			ac.mu.Lock()
 
-		ac.mu.Lock()
-		if ac.state == connectivity.Shutdown {
-			ac.mu.Unlock()
-			return
-		}
-
-		backoffIdx := ac.backoffIdx
-		backoffFor := ac.dopts.bs.Backoff(backoffIdx)
-
-		// This will be the duration that dial gets to finish.
-		dialDuration := getMinConnectTimeout()
-		if backoffFor > dialDuration {
-			// Give dial more time as we keep failing to connect.
-			dialDuration = backoffFor
-		}
-		start := time.Now()
-		connectDeadline := start.Add(dialDuration)
-		ac.backoffDeadline = start.Add(backoffFor)
-		ac.connectDeadline = connectDeadline
-
-		ac.mu.Unlock()
-
-		ac.cc.mu.RLock()
-		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
-		ac.cc.mu.RUnlock()
-
-		ac.mu.Lock()
-
-		if ac.state == connectivity.Shutdown {
-			ac.mu.Unlock()
-			return
-		}
-
-		if ac.state != connectivity.Connecting {
+			if ac.state == connectivity.Shutdown {
+				ac.mu.Unlock()
+				return
+			}
 			ac.updateConnectivityState(connectivity.Connecting)
-			ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+			ac.transport = nil
+			ac.mu.Unlock()
+
+			// This will be the duration that dial gets to finish.
+			dialDuration := getMinConnectTimeout()
+			if dialDuration < backoffFor {
+				// Give dial more time as we keep failing to connect.
+				dialDuration = backoffFor
+			}
+			connectDeadline := time.Now().Add(dialDuration)
+
+			ac.mu.Lock()
+			ac.cc.mu.RLock()
+			ac.dopts.copts.KeepaliveParams = ac.cc.mkp
+			ac.cc.mu.RUnlock()
+
+			if ac.state == connectivity.Shutdown {
+				ac.mu.Unlock()
+				return
+			}
+
+			copts := ac.dopts.copts
+			if ac.scopts.CredsBundle != nil {
+				copts.CredsBundle = ac.scopts.CredsBundle
+			}
+			hctx, hcancel := context.WithCancel(ac.ctx)
+			defer hcancel()
+			ac.mu.Unlock()
+
+			if channelz.IsOn() {
+				channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+					Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
+					Severity: channelz.CtINFO,
+				})
+			}
+
+			reconnect := grpcsync.NewEvent()
+			prefaceReceived := make(chan struct{})
+			newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
+			if err == nil {
+				ac.mu.Lock()
+				ac.curAddr = addr
+				ac.transport = newTr
+				ac.mu.Unlock()
+
+				healthCheckConfig := ac.cc.healthCheckConfig()
+				// LB channel health checking is only enabled when all the four requirements below are met:
+				// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
+				// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
+				// 3. a service config with non-empty healthCheckConfig field is provided,
+				// 4. the current load balancer allows it.
+				healthcheckManagingState := false
+				if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
+					if ac.cc.dopts.healthCheckFunc == nil {
+						// TODO: add a link to the health check doc in the error message.
+						grpclog.Error("the client side LB channel health check function has not been set.")
+					} else {
+						// TODO(deklerk) refactor to just return transport
+						go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
+						healthcheckManagingState = true
+					}
+				}
+				if !healthcheckManagingState {
+					ac.mu.Lock()
+					ac.updateConnectivityState(connectivity.Ready)
+					ac.mu.Unlock()
+				}
+			} else {
+				hcancel()
+				if err == errConnClosing {
+					return
+				}
+
+				if tryNextAddrFromStart.HasFired() {
+					break addrLoop
+				}
+				continue
+			}
+
+			ac.mu.Lock()
+			reqHandshake := ac.dopts.reqHandshake
+			ac.mu.Unlock()
+
+			<-reconnect.Done()
+			hcancel()
+
+			if reqHandshake == envconfig.RequireHandshakeHybrid {
+				// In RequireHandshakeHybrid mode, we must check to see whether
+				// server preface has arrived yet to decide whether to start
+				// reconnecting at the top of the list (server preface received)
+				// or continue with the next addr in the list as if the
+				// connection were not successful (server preface not received).
+				select {
+				case <-prefaceReceived:
+					// We received a server preface - huzzah! We consider this
+					// a success and restart from the top of the addr list.
+					ac.mu.Lock()
+					ac.backoffIdx = 0
+					ac.mu.Unlock()
+					break addrLoop
+				default:
+					// Despite having set state to READY, in hybrid mode we
+					// consider this a failure and continue connecting at the
+					// next addr in the list.
+					ac.mu.Lock()
+					if ac.state == connectivity.Shutdown {
+						ac.mu.Unlock()
+						return
+					}
+
+					ac.updateConnectivityState(connectivity.TransientFailure)
+					ac.mu.Unlock()
+
+					if tryNextAddrFromStart.HasFired() {
+						break addrLoop
+					}
+				}
+			} else {
+				// In RequireHandshakeOn mode, we would have already waited for
+				// the server preface, so we consider this a success and restart
+				// from the top of the addr list. In RequireHandshakeOff mode,
+				// we don't care to wait for the server preface before
+				// considering this a success, so we also restart from the top
+				// of the addr list.
+				ac.mu.Lock()
+				ac.backoffIdx = 0
+				ac.mu.Unlock()
+				break addrLoop
+			}
 		}
 
-		addr := ac.addrs[ac.addrIdx]
-		copts := ac.dopts.copts
-		if ac.scopts.CredsBundle != nil {
-			copts.CredsBundle = ac.scopts.CredsBundle
+		// After exhausting all addresses, or after need to reconnect after a
+		// READY, the addrConn enters TRANSIENT_FAILURE.
+		ac.mu.Lock()
+		if ac.state == connectivity.Shutdown {
+			ac.mu.Unlock()
+			return
 		}
+		ac.updateConnectivityState(connectivity.TransientFailure)
+
+		// Backoff.
+		b := ac.resetBackoff
+		timer := time.NewTimer(backoffFor)
+		acctx := ac.ctx
 		ac.mu.Unlock()
 
-		if channelz.IsOn() {
-			channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
-				Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
-				Severity: channelz.CtINFO,
-			})
+		select {
+		case <-timer.C:
+			ac.mu.Lock()
+			ac.backoffIdx++
+			ac.mu.Unlock()
+		case <-b:
+			timer.Stop()
+		case <-acctx.Done():
+			timer.Stop()
+			return
 		}
-
-		if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil {
-			continue
-		}
-
-		return
 	}
 }
 
-// createTransport creates a connection to one of the backends in addrs.
-func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
-	oneReset := sync.Once{}
-	skipReset := make(chan struct{})
-	allowedToReset := make(chan struct{})
-	prefaceReceived := make(chan struct{})
+// createTransport creates a connection to one of the backends in addrs. It
+// sets ac.transport in the success case, or it returns an error if it was
+// unable to successfully create a transport.
+//
+// If waitForHandshake is enabled, it blocks until server preface arrives.
+func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
 	onCloseCalled := make(chan struct{})
 
-	var prefaceMu sync.Mutex
-	var serverPrefaceReceived bool
-	var clientPrefaceWrote bool
-
-	hcCtx, hcCancel := context.WithCancel(ac.ctx)
-
-	onGoAway := func(r transport.GoAwayReason) {
-		hcCancel()
-		ac.mu.Lock()
-		ac.adjustParams(r)
-		ac.mu.Unlock()
-		select {
-		case <-skipReset: // The outer resetTransport loop will handle reconnection.
-			return
-		case <-allowedToReset: // We're in the clear to reset.
-			go oneReset.Do(func() { ac.resetTransport(false) })
-		}
-	}
-
-	prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
-
-	onClose := func() {
-		hcCancel()
-		close(onCloseCalled)
-		prefaceTimer.Stop()
-
-		select {
-		case <-skipReset: // The outer resetTransport loop will handle reconnection.
-			return
-		case <-allowedToReset: // We're in the clear to reset.
-			oneReset.Do(func() { ac.resetTransport(false) })
-		}
-	}
-
 	target := transport.TargetInfo{
 		Addr:      addr.Addr,
 		Metadata:  addr.Metadata,
 		Authority: ac.cc.authority,
 	}
 
+	prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
+
+	onGoAway := func(r transport.GoAwayReason) {
+		ac.mu.Lock()
+		ac.adjustParams(r)
+		ac.mu.Unlock()
+		reconnect.Fire()
+	}
+
+	onClose := func() {
+		close(onCloseCalled)
+		prefaceTimer.Stop()
+		reconnect.Fire()
+	}
+
 	onPrefaceReceipt := func() {
 		close(prefaceReceived)
 		prefaceTimer.Stop()
-
-		// TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
-		ac.mu.Lock()
-
-		prefaceMu.Lock()
-		serverPrefaceReceived = true
-		if clientPrefaceWrote {
-			ac.successfulHandshake = true
-		}
-		prefaceMu.Unlock()
-
-		ac.mu.Unlock()
 	}
 
 	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
@@ -1143,13 +1187,6 @@
 	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
 
 	if err == nil {
-		prefaceMu.Lock()
-		clientPrefaceWrote = true
-		if serverPrefaceReceived || ac.dopts.reqHandshake == envconfig.RequireHandshakeOff {
-			ac.successfulHandshake = true
-		}
-		prefaceMu.Unlock()
-
 		if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
 			select {
 			case <-prefaceTimer.C:
@@ -1160,8 +1197,7 @@
 				// We got the preface - huzzah! things are good.
 			case <-onCloseCalled:
 				// The transport has already closed - noop.
-				close(allowedToReset)
-				return nil
+				return nil, errors.New("connection closed")
 			}
 		} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
 			go func() {
@@ -1186,70 +1222,32 @@
 			// ac.tearDown(...) has been invoked.
 			ac.mu.Unlock()
 
-			// We don't want to reset during this close because we prefer to kick out of this function and let the loop
-			// in resetTransport take care of reconnecting.
-			close(skipReset)
-
-			return errConnClosing
+			return nil, errConnClosing
 		}
 		ac.mu.Unlock()
 		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
-
-		// We don't want to reset during this close because we prefer to kick out of this function and let the loop
-		// in resetTransport take care of reconnecting.
-		close(skipReset)
-
-		return err
+		return nil, err
 	}
 
 	// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
 	ac.mu.Lock()
 	if ac.state == connectivity.Shutdown {
 		ac.mu.Unlock()
-		close(skipReset)
 		newTr.Close()
-		return nil
+		return nil, errConnClosing
 	}
-	ac.transport = newTr
 	ac.mu.Unlock()
 
-	healthCheckConfig := ac.cc.healthCheckConfig()
-	// LB channel health checking is only enabled when all the four requirements below are met:
-	// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
-	// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
-	// 3. a service config with non-empty healthCheckConfig field is provided,
-	// 4. the current load balancer allows it.
-	if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
-		if internal.HealthCheckFunc != nil {
-			go ac.startHealthCheck(hcCtx, newTr, addr, healthCheckConfig.ServiceName)
-			close(allowedToReset)
-			return nil
-		}
-		// TODO: add a link to the health check doc in the error message.
-		grpclog.Error("the client side LB channel health check function has not been set.")
-	}
-
-	// No LB channel health check case
+	// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
 	ac.mu.Lock()
-
 	if ac.state == connectivity.Shutdown {
 		ac.mu.Unlock()
-
-		// unblock onGoAway/onClose callback.
-		close(skipReset)
-		return errConnClosing
+		newTr.Close()
+		return nil, errConnClosing
 	}
-
-	ac.updateConnectivityState(connectivity.Ready)
-	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-	ac.curAddr = addr
-
 	ac.mu.Unlock()
 
-	// Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader
-	// goroutine failing races with all the code in this method that sets the connection to "ready".
-	close(allowedToReset)
-	return nil
+	return newTr, nil
 }
 
 func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
@@ -1269,19 +1267,12 @@
 				firstReady = false
 				ac.curAddr = addr
 			}
-			if ac.state != connectivity.Ready {
-				ac.updateConnectivityState(connectivity.Ready)
-				ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-			}
+			ac.updateConnectivityState(connectivity.Ready)
 		} else {
-			if ac.state != connectivity.TransientFailure {
-				ac.updateConnectivityState(connectivity.TransientFailure)
-				ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-			}
+			ac.updateConnectivityState(connectivity.TransientFailure)
 		}
 	}
-
-	err := internal.HealthCheckFunc(ctx, newStream, reportHealth, serviceName)
+	err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
 	if err != nil {
 		if status.Code(err) == codes.Unimplemented {
 			if channelz.IsOn() {
@@ -1297,55 +1288,6 @@
 	}
 }
 
-// nextAddr increments the addrIdx if there are more addresses to try. If
-// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
-// increment the backoffIdx.
-//
-// nextAddr must be called without ac.mu being held.
-func (ac *addrConn) nextAddr() error {
-	ac.mu.Lock()
-
-	// If a handshake has been observed, we want the next usage to start at
-	// index 0 immediately.
-	if ac.successfulHandshake {
-		ac.successfulHandshake = false
-		ac.backoffDeadline = time.Time{}
-		ac.connectDeadline = time.Time{}
-		ac.addrIdx = 0
-		ac.backoffIdx = 0
-		ac.mu.Unlock()
-		return nil
-	}
-
-	if ac.addrIdx < len(ac.addrs)-1 {
-		ac.addrIdx++
-		ac.mu.Unlock()
-		return nil
-	}
-
-	ac.addrIdx = 0
-	ac.backoffIdx++
-
-	if ac.state == connectivity.Shutdown {
-		ac.mu.Unlock()
-		return errConnClosing
-	}
-	ac.cc.resolveNow(resolver.ResolveNowOption{})
-	backoffDeadline := ac.backoffDeadline
-	b := ac.resetBackoff
-	ac.mu.Unlock()
-	timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
-	select {
-	case <-timer.C:
-	case <-b:
-		timer.Stop()
-	case <-ac.ctx.Done():
-		timer.Stop()
-		return ac.ctx.Err()
-	}
-	return nil
-}
-
 func (ac *addrConn) resetConnectBackoff() {
 	ac.mu.Lock()
 	close(ac.resetBackoff)
@@ -1394,7 +1336,6 @@
 	ac.updateConnectivityState(connectivity.Shutdown)
 	ac.cancel()
 	ac.tearDownErr = err
-	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
 	ac.curAddr = resolver.Address{}
 	if err == errConnDrain && curTr != nil {
 		// GracefulClose(...) may be executed multiple times when