VOL-1497: Add more control over kv/memory access
- Added kv locking mechanism (etcd only)
- (watch) control path access whenever possible
- (watch) use a transaction for updates and merge with memory
- cleaned up vendoring
- misc changes to fix exceptions found along the way
Amendments:
- Copyright header was removed from an auto-generated file
- Changed default locking to false for KV list operation
- Updated backend API to allow passing a locking parameter
Change-Id: Ie1a55d3ca8b9d92ae71a85ce42bb22fcf1419e2c
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 84b6dbe..56d0bf7 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -36,7 +36,6 @@
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/envconfig"
@@ -592,13 +591,12 @@
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
ac := &addrConn{
- cc: cc,
- addrs: addrs,
- scopts: opts,
- dopts: cc.dopts,
- czData: new(channelzData),
- successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1
- resetBackoff: make(chan struct{}),
+ cc: cc,
+ addrs: addrs,
+ scopts: opts,
+ dopts: cc.dopts,
+ czData: new(channelzData),
+ resetBackoff: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Track ac in cc. This needs to be done before any getTransport(...) is called.
@@ -680,11 +678,10 @@
return nil
}
ac.updateConnectivityState(connectivity.Connecting)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.mu.Unlock()
// Start a goroutine connecting to the server asynchronously.
- go ac.resetTransport(false)
+ go ac.resetTransport()
return nil
}
@@ -703,6 +700,12 @@
return true
}
+ // Unless we're busy reconnecting already, let's reconnect from the top of
+ // the list.
+ if ac.state != connectivity.Ready {
+ return false
+ }
+
var curAddrFound bool
for _, a := range addrs {
if reflect.DeepEqual(ac.curAddr, a) {
@@ -713,7 +716,6 @@
grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
if curAddrFound {
ac.addrs = addrs
- ac.addrIdx = 0 // Start reconnecting from beginning in the new list.
}
return curAddrFound
@@ -913,7 +915,6 @@
transport transport.ClientTransport // The current transport.
mu sync.Mutex
- addrIdx int // The index in addrs list to start reconnecting from.
curAddr resolver.Address // The current address.
addrs []resolver.Address // All addresses that the resolver resolved to.
@@ -922,33 +923,30 @@
tearDownErr error // The reason this addrConn is torn down.
- backoffIdx int
- // backoffDeadline is the time until which resetTransport needs to
- // wait before increasing backoffIdx count.
- backoffDeadline time.Time
- // connectDeadline is the time by which all connection
- // negotiations must complete.
- connectDeadline time.Time
-
+ backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}
- channelzID int64 // channelz unique identification number
- czData *channelzData
-
- successfulHandshake bool
-
+ channelzID int64 // channelz unique identification number.
+ czData *channelzData
healthCheckEnabled bool
}
// Note: this requires a lock on ac.mu.
func (ac *addrConn) updateConnectivityState(s connectivity.State) {
+ if ac.state == s {
+ return
+ }
+
+ updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
+ grpclog.Infof(updateMsg)
ac.state = s
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Subchannel Connectivity change to %v", s),
+ Desc: updateMsg,
Severity: channelz.CtINFO,
})
}
+ ac.cc.handleSubConnStateChange(ac.acbw, s)
}
// adjustParams updates parameters used to create transports upon
@@ -965,173 +963,219 @@
}
}
-// resetTransport makes sure that a healthy ac.transport exists.
-//
-// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or
-// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer
-// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to
-// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the
-// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs
-// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received.
-//
-// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received.
-func (ac *addrConn) resetTransport(resolveNow bool) {
- for {
- // If this is the first in a line of resets, we want to resolve immediately. The only other time we
- // want to reset is if we have tried all the addresses handed to us.
- if resolveNow {
- ac.mu.Lock()
+func (ac *addrConn) resetTransport() {
+ for i := 0; ; i++ {
+ tryNextAddrFromStart := grpcsync.NewEvent()
+
+ ac.mu.Lock()
+ if i > 0 {
ac.cc.resolveNow(resolver.ResolveNowOption{})
- ac.mu.Unlock()
}
-
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
-
- // The transport that was used before is no longer viable.
- ac.transport = nil
- // If the connection is READY, a failure must have occurred.
- // Otherwise, we'll consider this is a transient failure when:
- // We've exhausted all addresses
- // We're in CONNECTING
- // And it's not the very first addr to try TODO(deklerk) find a better way to do this than checking ac.successfulHandshake
- if ac.state == connectivity.Ready || (ac.addrIdx == len(ac.addrs)-1 && ac.state == connectivity.Connecting && !ac.successfulHandshake) {
- ac.updateConnectivityState(connectivity.TransientFailure)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
- }
- ac.transport = nil
+ addrs := ac.addrs
+ backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
ac.mu.Unlock()
- if err := ac.nextAddr(); err != nil {
- return
- }
+ addrLoop:
+ for _, addr := range addrs {
+ ac.mu.Lock()
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
-
- backoffIdx := ac.backoffIdx
- backoffFor := ac.dopts.bs.Backoff(backoffIdx)
-
- // This will be the duration that dial gets to finish.
- dialDuration := getMinConnectTimeout()
- if backoffFor > dialDuration {
- // Give dial more time as we keep failing to connect.
- dialDuration = backoffFor
- }
- start := time.Now()
- connectDeadline := start.Add(dialDuration)
- ac.backoffDeadline = start.Add(backoffFor)
- ac.connectDeadline = connectDeadline
-
- ac.mu.Unlock()
-
- ac.cc.mu.RLock()
- ac.dopts.copts.KeepaliveParams = ac.cc.mkp
- ac.cc.mu.RUnlock()
-
- ac.mu.Lock()
-
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
-
- if ac.state != connectivity.Connecting {
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
ac.updateConnectivityState(connectivity.Connecting)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ ac.transport = nil
+ ac.mu.Unlock()
+
+ // This will be the duration that dial gets to finish.
+ dialDuration := getMinConnectTimeout()
+ if dialDuration < backoffFor {
+ // Give dial more time as we keep failing to connect.
+ dialDuration = backoffFor
+ }
+ connectDeadline := time.Now().Add(dialDuration)
+
+ ac.mu.Lock()
+ ac.cc.mu.RLock()
+ ac.dopts.copts.KeepaliveParams = ac.cc.mkp
+ ac.cc.mu.RUnlock()
+
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
+
+ copts := ac.dopts.copts
+ if ac.scopts.CredsBundle != nil {
+ copts.CredsBundle = ac.scopts.CredsBundle
+ }
+ hctx, hcancel := context.WithCancel(ac.ctx)
+ defer hcancel()
+ ac.mu.Unlock()
+
+ if channelz.IsOn() {
+ channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
+ Severity: channelz.CtINFO,
+ })
+ }
+
+ reconnect := grpcsync.NewEvent()
+ prefaceReceived := make(chan struct{})
+ newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
+ if err == nil {
+ ac.mu.Lock()
+ ac.curAddr = addr
+ ac.transport = newTr
+ ac.mu.Unlock()
+
+ healthCheckConfig := ac.cc.healthCheckConfig()
+ // LB channel health checking is only enabled when all the four requirements below are met:
+ // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
+ // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
+ // 3. a service config with non-empty healthCheckConfig field is provided,
+ // 4. the current load balancer allows it.
+ healthcheckManagingState := false
+ if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
+ if ac.cc.dopts.healthCheckFunc == nil {
+ // TODO: add a link to the health check doc in the error message.
+ grpclog.Error("the client side LB channel health check function has not been set.")
+ } else {
+ // TODO(deklerk) refactor to just return transport
+ go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
+ healthcheckManagingState = true
+ }
+ }
+ if !healthcheckManagingState {
+ ac.mu.Lock()
+ ac.updateConnectivityState(connectivity.Ready)
+ ac.mu.Unlock()
+ }
+ } else {
+ hcancel()
+ if err == errConnClosing {
+ return
+ }
+
+ if tryNextAddrFromStart.HasFired() {
+ break addrLoop
+ }
+ continue
+ }
+
+ ac.mu.Lock()
+ reqHandshake := ac.dopts.reqHandshake
+ ac.mu.Unlock()
+
+ <-reconnect.Done()
+ hcancel()
+
+ if reqHandshake == envconfig.RequireHandshakeHybrid {
+ // In RequireHandshakeHybrid mode, we must check to see whether
+ // server preface has arrived yet to decide whether to start
+ // reconnecting at the top of the list (server preface received)
+ // or continue with the next addr in the list as if the
+ // connection were not successful (server preface not received).
+ select {
+ case <-prefaceReceived:
+ // We received a server preface - huzzah! We consider this
+ // a success and restart from the top of the addr list.
+ ac.mu.Lock()
+ ac.backoffIdx = 0
+ ac.mu.Unlock()
+ break addrLoop
+ default:
+ // Despite having set state to READY, in hybrid mode we
+ // consider this a failure and continue connecting at the
+ // next addr in the list.
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
+
+ ac.updateConnectivityState(connectivity.TransientFailure)
+ ac.mu.Unlock()
+
+ if tryNextAddrFromStart.HasFired() {
+ break addrLoop
+ }
+ }
+ } else {
+ // In RequireHandshakeOn mode, we would have already waited for
+ // the server preface, so we consider this a success and restart
+ // from the top of the addr list. In RequireHandshakeOff mode,
+ // we don't care to wait for the server preface before
+ // considering this a success, so we also restart from the top
+ // of the addr list.
+ ac.mu.Lock()
+ ac.backoffIdx = 0
+ ac.mu.Unlock()
+ break addrLoop
+ }
}
- addr := ac.addrs[ac.addrIdx]
- copts := ac.dopts.copts
- if ac.scopts.CredsBundle != nil {
- copts.CredsBundle = ac.scopts.CredsBundle
+ // After exhausting all addresses, or after need to reconnect after a
+ // READY, the addrConn enters TRANSIENT_FAILURE.
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
}
+ ac.updateConnectivityState(connectivity.TransientFailure)
+
+ // Backoff.
+ b := ac.resetBackoff
+ timer := time.NewTimer(backoffFor)
+ acctx := ac.ctx
ac.mu.Unlock()
- if channelz.IsOn() {
- channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
- Severity: channelz.CtINFO,
- })
+ select {
+ case <-timer.C:
+ ac.mu.Lock()
+ ac.backoffIdx++
+ ac.mu.Unlock()
+ case <-b:
+ timer.Stop()
+ case <-acctx.Done():
+ timer.Stop()
+ return
}
-
- if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil {
- continue
- }
-
- return
}
}
-// createTransport creates a connection to one of the backends in addrs.
-func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
- oneReset := sync.Once{}
- skipReset := make(chan struct{})
- allowedToReset := make(chan struct{})
- prefaceReceived := make(chan struct{})
+// createTransport creates a connection to one of the backends in addrs. It
+// sets ac.transport in the success case, or it returns an error if it was
+// unable to successfully create a transport.
+//
+// If waitForHandshake is enabled, it blocks until server preface arrives.
+func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
onCloseCalled := make(chan struct{})
- var prefaceMu sync.Mutex
- var serverPrefaceReceived bool
- var clientPrefaceWrote bool
-
- hcCtx, hcCancel := context.WithCancel(ac.ctx)
-
- onGoAway := func(r transport.GoAwayReason) {
- hcCancel()
- ac.mu.Lock()
- ac.adjustParams(r)
- ac.mu.Unlock()
- select {
- case <-skipReset: // The outer resetTransport loop will handle reconnection.
- return
- case <-allowedToReset: // We're in the clear to reset.
- go oneReset.Do(func() { ac.resetTransport(false) })
- }
- }
-
- prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
-
- onClose := func() {
- hcCancel()
- close(onCloseCalled)
- prefaceTimer.Stop()
-
- select {
- case <-skipReset: // The outer resetTransport loop will handle reconnection.
- return
- case <-allowedToReset: // We're in the clear to reset.
- oneReset.Do(func() { ac.resetTransport(false) })
- }
- }
-
target := transport.TargetInfo{
Addr: addr.Addr,
Metadata: addr.Metadata,
Authority: ac.cc.authority,
}
+ prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
+
+ onGoAway := func(r transport.GoAwayReason) {
+ ac.mu.Lock()
+ ac.adjustParams(r)
+ ac.mu.Unlock()
+ reconnect.Fire()
+ }
+
+ onClose := func() {
+ close(onCloseCalled)
+ prefaceTimer.Stop()
+ reconnect.Fire()
+ }
+
onPrefaceReceipt := func() {
close(prefaceReceived)
prefaceTimer.Stop()
-
- // TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
- ac.mu.Lock()
-
- prefaceMu.Lock()
- serverPrefaceReceived = true
- if clientPrefaceWrote {
- ac.successfulHandshake = true
- }
- prefaceMu.Unlock()
-
- ac.mu.Unlock()
}
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
@@ -1143,13 +1187,6 @@
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
if err == nil {
- prefaceMu.Lock()
- clientPrefaceWrote = true
- if serverPrefaceReceived || ac.dopts.reqHandshake == envconfig.RequireHandshakeOff {
- ac.successfulHandshake = true
- }
- prefaceMu.Unlock()
-
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
select {
case <-prefaceTimer.C:
@@ -1160,8 +1197,7 @@
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
- close(allowedToReset)
- return nil
+ return nil, errors.New("connection closed")
}
} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
go func() {
@@ -1186,70 +1222,32 @@
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
- // We don't want to reset during this close because we prefer to kick out of this function and let the loop
- // in resetTransport take care of reconnecting.
- close(skipReset)
-
- return errConnClosing
+ return nil, errConnClosing
}
ac.mu.Unlock()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
-
- // We don't want to reset during this close because we prefer to kick out of this function and let the loop
- // in resetTransport take care of reconnecting.
- close(skipReset)
-
- return err
+ return nil, err
}
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
- close(skipReset)
newTr.Close()
- return nil
+ return nil, errConnClosing
}
- ac.transport = newTr
ac.mu.Unlock()
- healthCheckConfig := ac.cc.healthCheckConfig()
- // LB channel health checking is only enabled when all the four requirements below are met:
- // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
- // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
- // 3. a service config with non-empty healthCheckConfig field is provided,
- // 4. the current load balancer allows it.
- if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
- if internal.HealthCheckFunc != nil {
- go ac.startHealthCheck(hcCtx, newTr, addr, healthCheckConfig.ServiceName)
- close(allowedToReset)
- return nil
- }
- // TODO: add a link to the health check doc in the error message.
- grpclog.Error("the client side LB channel health check function has not been set.")
- }
-
- // No LB channel health check case
+ // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
-
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
-
- // unblock onGoAway/onClose callback.
- close(skipReset)
- return errConnClosing
+ newTr.Close()
+ return nil, errConnClosing
}
-
- ac.updateConnectivityState(connectivity.Ready)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
- ac.curAddr = addr
-
ac.mu.Unlock()
- // Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader
- // goroutine failing races with all the code in this method that sets the connection to "ready".
- close(allowedToReset)
- return nil
+ return newTr, nil
}
func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
@@ -1269,19 +1267,12 @@
firstReady = false
ac.curAddr = addr
}
- if ac.state != connectivity.Ready {
- ac.updateConnectivityState(connectivity.Ready)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
- }
+ ac.updateConnectivityState(connectivity.Ready)
} else {
- if ac.state != connectivity.TransientFailure {
- ac.updateConnectivityState(connectivity.TransientFailure)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
- }
+ ac.updateConnectivityState(connectivity.TransientFailure)
}
}
-
- err := internal.HealthCheckFunc(ctx, newStream, reportHealth, serviceName)
+ err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
if err != nil {
if status.Code(err) == codes.Unimplemented {
if channelz.IsOn() {
@@ -1297,55 +1288,6 @@
}
}
-// nextAddr increments the addrIdx if there are more addresses to try. If
-// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
-// increment the backoffIdx.
-//
-// nextAddr must be called without ac.mu being held.
-func (ac *addrConn) nextAddr() error {
- ac.mu.Lock()
-
- // If a handshake has been observed, we want the next usage to start at
- // index 0 immediately.
- if ac.successfulHandshake {
- ac.successfulHandshake = false
- ac.backoffDeadline = time.Time{}
- ac.connectDeadline = time.Time{}
- ac.addrIdx = 0
- ac.backoffIdx = 0
- ac.mu.Unlock()
- return nil
- }
-
- if ac.addrIdx < len(ac.addrs)-1 {
- ac.addrIdx++
- ac.mu.Unlock()
- return nil
- }
-
- ac.addrIdx = 0
- ac.backoffIdx++
-
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return errConnClosing
- }
- ac.cc.resolveNow(resolver.ResolveNowOption{})
- backoffDeadline := ac.backoffDeadline
- b := ac.resetBackoff
- ac.mu.Unlock()
- timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
- select {
- case <-timer.C:
- case <-b:
- timer.Stop()
- case <-ac.ctx.Done():
- timer.Stop()
- return ac.ctx.Err()
- }
- return nil
-}
-
func (ac *addrConn) resetConnectBackoff() {
ac.mu.Lock()
close(ac.resetBackoff)
@@ -1394,7 +1336,6 @@
ac.updateConnectivityState(connectivity.Shutdown)
ac.cancel()
ac.tearDownErr = err
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.curAddr = resolver.Address{}
if err == errConnDrain && curTr != nil {
// GracefulClose(...) may be executed multiple times when