Fix transitions for image_download and activation
Change-Id: I713ada52eef9d275c7d7596026e178c7382e8335
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index 8df4095..5356194 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -25,6 +25,8 @@
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal/buffer"
+ "google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
)
@@ -34,62 +36,14 @@
state connectivity.State
}
-// scStateUpdateBuffer is an unbounded channel for scStateChangeTuple.
-// TODO make a general purpose buffer that uses interface{}.
-type scStateUpdateBuffer struct {
- c chan *scStateUpdate
- mu sync.Mutex
- backlog []*scStateUpdate
-}
-
-func newSCStateUpdateBuffer() *scStateUpdateBuffer {
- return &scStateUpdateBuffer{
- c: make(chan *scStateUpdate, 1),
- }
-}
-
-func (b *scStateUpdateBuffer) put(t *scStateUpdate) {
- b.mu.Lock()
- defer b.mu.Unlock()
- if len(b.backlog) == 0 {
- select {
- case b.c <- t:
- return
- default:
- }
- }
- b.backlog = append(b.backlog, t)
-}
-
-func (b *scStateUpdateBuffer) load() {
- b.mu.Lock()
- defer b.mu.Unlock()
- if len(b.backlog) > 0 {
- select {
- case b.c <- b.backlog[0]:
- b.backlog[0] = nil
- b.backlog = b.backlog[1:]
- default:
- }
- }
-}
-
-// get returns the channel that the scStateUpdate will be sent to.
-//
-// Upon receiving, the caller should call load to send another
-// scStateChangeTuple onto the channel if there is any.
-func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
- return b.c
-}
-
// ccBalancerWrapper is a wrapper on top of cc for balancers.
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
- cc *ClientConn
- balancer balancer.Balancer
- stateChangeQueue *scStateUpdateBuffer
- ccUpdateCh chan *balancer.ClientConnState
- done chan struct{}
+ cc *ClientConn
+ balancerMu sync.Mutex // synchronizes calls to the balancer
+ balancer balancer.Balancer
+ scBuffer *buffer.Unbounded
+ done *grpcsync.Event
mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
@@ -97,11 +51,10 @@
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
ccb := &ccBalancerWrapper{
- cc: cc,
- stateChangeQueue: newSCStateUpdateBuffer(),
- ccUpdateCh: make(chan *balancer.ClientConnState, 1),
- done: make(chan struct{}),
- subConns: make(map[*acBalancerWrapper]struct{}),
+ cc: cc,
+ scBuffer: buffer.NewUnbounded(),
+ done: grpcsync.NewEvent(),
+ subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
@@ -113,36 +66,23 @@
func (ccb *ccBalancerWrapper) watcher() {
for {
select {
- case t := <-ccb.stateChangeQueue.get():
- ccb.stateChangeQueue.load()
- select {
- case <-ccb.done:
- ccb.balancer.Close()
- return
- default:
+ case t := <-ccb.scBuffer.Get():
+ ccb.scBuffer.Load()
+ if ccb.done.HasFired() {
+ break
}
+ ccb.balancerMu.Lock()
+ su := t.(*scStateUpdate)
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
- ub.UpdateSubConnState(t.sc, balancer.SubConnState{ConnectivityState: t.state})
+ ub.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state})
} else {
- ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
+ ccb.balancer.HandleSubConnStateChange(su.sc, su.state)
}
- case s := <-ccb.ccUpdateCh:
- select {
- case <-ccb.done:
- ccb.balancer.Close()
- return
- default:
- }
- if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
- ub.UpdateClientConnState(*s)
- } else {
- ccb.balancer.HandleResolvedAddrs(s.ResolverState.Addresses, nil)
- }
- case <-ccb.done:
+ ccb.balancerMu.Unlock()
+ case <-ccb.done.Done():
}
- select {
- case <-ccb.done:
+ if ccb.done.HasFired() {
ccb.balancer.Close()
ccb.mu.Lock()
scs := ccb.subConns
@@ -153,14 +93,12 @@
}
ccb.UpdateBalancerState(connectivity.Connecting, nil)
return
- default:
}
- ccb.cc.firstResolveEvent.Fire()
}
}
func (ccb *ccBalancerWrapper) close() {
- close(ccb.done)
+ ccb.done.Fire()
}
func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
@@ -174,30 +112,28 @@
if sc == nil {
return
}
- ccb.stateChangeQueue.put(&scStateUpdate{
+ ccb.scBuffer.Put(&scStateUpdate{
sc: sc,
state: s,
})
}
-func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) {
- if ccb.cc.curBalancerName != grpclbName {
- // Filter any grpclb addresses since we don't have the grpclb balancer.
- s := &ccs.ResolverState
- for i := 0; i < len(s.Addresses); {
- if s.Addresses[i].Type == resolver.GRPCLB {
- copy(s.Addresses[i:], s.Addresses[i+1:])
- s.Addresses = s.Addresses[:len(s.Addresses)-1]
- continue
- }
- i++
- }
+func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
+ ccb.balancerMu.Lock()
+ defer ccb.balancerMu.Unlock()
+ if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
+ return ub.UpdateClientConnState(*ccs)
}
- select {
- case <-ccb.ccUpdateCh:
- default:
+ ccb.balancer.HandleResolvedAddrs(ccs.ResolverState.Addresses, nil)
+ return nil
+}
+
+func (ccb *ccBalancerWrapper) resolverError(err error) {
+ if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
+ ccb.balancerMu.Lock()
+ ub.ResolverError(err)
+ ccb.balancerMu.Unlock()
}
- ccb.ccUpdateCh <- ccs
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {