[SEBA-660]: Adding IGMP support in BbSim

Change-Id: I9f5c7d8ad39ac82850b04e2c997996d6c47b32d2
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index 7bc6621..824f28e 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -25,6 +25,8 @@
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal/buffer"
+	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/resolver"
 )
 
@@ -32,64 +34,17 @@
 type scStateUpdate struct {
 	sc    balancer.SubConn
 	state connectivity.State
-}
-
-// scStateUpdateBuffer is an unbounded channel for scStateChangeTuple.
-// TODO make a general purpose buffer that uses interface{}.
-type scStateUpdateBuffer struct {
-	c       chan *scStateUpdate
-	mu      sync.Mutex
-	backlog []*scStateUpdate
-}
-
-func newSCStateUpdateBuffer() *scStateUpdateBuffer {
-	return &scStateUpdateBuffer{
-		c: make(chan *scStateUpdate, 1),
-	}
-}
-
-func (b *scStateUpdateBuffer) put(t *scStateUpdate) {
-	b.mu.Lock()
-	defer b.mu.Unlock()
-	if len(b.backlog) == 0 {
-		select {
-		case b.c <- t:
-			return
-		default:
-		}
-	}
-	b.backlog = append(b.backlog, t)
-}
-
-func (b *scStateUpdateBuffer) load() {
-	b.mu.Lock()
-	defer b.mu.Unlock()
-	if len(b.backlog) > 0 {
-		select {
-		case b.c <- b.backlog[0]:
-			b.backlog[0] = nil
-			b.backlog = b.backlog[1:]
-		default:
-		}
-	}
-}
-
-// get returns the channel that the scStateUpdate will be sent to.
-//
-// Upon receiving, the caller should call load to send another
-// scStateChangeTuple onto the channel if there is any.
-func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
-	return b.c
+	err   error
 }
 
 // ccBalancerWrapper is a wrapper on top of cc for balancers.
 // It implements balancer.ClientConn interface.
 type ccBalancerWrapper struct {
-	cc               *ClientConn
-	balancer         balancer.Balancer
-	stateChangeQueue *scStateUpdateBuffer
-	ccUpdateCh       chan *balancer.ClientConnState
-	done             chan struct{}
+	cc         *ClientConn
+	balancerMu sync.Mutex // synchronizes calls to the balancer
+	balancer   balancer.Balancer
+	scBuffer   *buffer.Unbounded
+	done       *grpcsync.Event
 
 	mu       sync.Mutex
 	subConns map[*acBalancerWrapper]struct{}
@@ -97,11 +52,10 @@
 
 func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
 	ccb := &ccBalancerWrapper{
-		cc:               cc,
-		stateChangeQueue: newSCStateUpdateBuffer(),
-		ccUpdateCh:       make(chan *balancer.ClientConnState, 1),
-		done:             make(chan struct{}),
-		subConns:         make(map[*acBalancerWrapper]struct{}),
+		cc:       cc,
+		scBuffer: buffer.NewUnbounded(),
+		done:     grpcsync.NewEvent(),
+		subConns: make(map[*acBalancerWrapper]struct{}),
 	}
 	go ccb.watcher()
 	ccb.balancer = b.Build(ccb, bopts)
@@ -113,36 +67,23 @@
 func (ccb *ccBalancerWrapper) watcher() {
 	for {
 		select {
-		case t := <-ccb.stateChangeQueue.get():
-			ccb.stateChangeQueue.load()
-			select {
-			case <-ccb.done:
-				ccb.balancer.Close()
-				return
-			default:
+		case t := <-ccb.scBuffer.Get():
+			ccb.scBuffer.Load()
+			if ccb.done.HasFired() {
+				break
 			}
+			ccb.balancerMu.Lock()
+			su := t.(*scStateUpdate)
 			if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
-				ub.UpdateSubConnState(t.sc, balancer.SubConnState{ConnectivityState: t.state})
+				ub.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err})
 			} else {
-				ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
+				ccb.balancer.HandleSubConnStateChange(su.sc, su.state)
 			}
-		case s := <-ccb.ccUpdateCh:
-			select {
-			case <-ccb.done:
-				ccb.balancer.Close()
-				return
-			default:
-			}
-			if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
-				ub.UpdateClientConnState(*s)
-			} else {
-				ccb.balancer.HandleResolvedAddrs(s.ResolverState.Addresses, nil)
-			}
-		case <-ccb.done:
+			ccb.balancerMu.Unlock()
+		case <-ccb.done.Done():
 		}
 
-		select {
-		case <-ccb.done:
+		if ccb.done.HasFired() {
 			ccb.balancer.Close()
 			ccb.mu.Lock()
 			scs := ccb.subConns
@@ -151,19 +92,17 @@
 			for acbw := range scs {
 				ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
 			}
-			ccb.UpdateBalancerState(connectivity.Connecting, nil)
+			ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
 			return
-		default:
 		}
-		ccb.cc.firstResolveEvent.Fire()
 	}
 }
 
 func (ccb *ccBalancerWrapper) close() {
-	close(ccb.done)
+	ccb.done.Fire()
 }
 
-func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
 	// When updating addresses for a SubConn, if the address in use is not in
 	// the new addresses, the old ac will be tearDown() and a new ac will be
 	// created. tearDown() generates a state change with Shutdown state, we
@@ -174,30 +113,29 @@
 	if sc == nil {
 		return
 	}
-	ccb.stateChangeQueue.put(&scStateUpdate{
+	ccb.scBuffer.Put(&scStateUpdate{
 		sc:    sc,
 		state: s,
+		err:   err,
 	})
 }
 
-func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) {
-	if ccb.cc.curBalancerName != grpclbName {
-		// Filter any grpclb addresses since we don't have the grpclb balancer.
-		s := ccs.ResolverState
-		for i := 0; i < len(s.Addresses); {
-			if s.Addresses[i].Type == resolver.GRPCLB {
-				copy(s.Addresses[i:], s.Addresses[i+1:])
-				s.Addresses = s.Addresses[:len(s.Addresses)-1]
-				continue
-			}
-			i++
-		}
+func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
+	ccb.balancerMu.Lock()
+	defer ccb.balancerMu.Unlock()
+	if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
+		return ub.UpdateClientConnState(*ccs)
 	}
-	select {
-	case <-ccb.ccUpdateCh:
-	default:
+	ccb.balancer.HandleResolvedAddrs(ccs.ResolverState.Addresses, nil)
+	return nil
+}
+
+func (ccb *ccBalancerWrapper) resolverError(err error) {
+	if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
+		ccb.balancerMu.Lock()
+		ub.ResolverError(err)
+		ccb.balancerMu.Unlock()
 	}
-	ccb.ccUpdateCh <- ccs
 }
 
 func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
@@ -250,7 +188,22 @@
 	ccb.cc.csMgr.updateState(s)
 }
 
-func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOption) {
+func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
+	ccb.mu.Lock()
+	defer ccb.mu.Unlock()
+	if ccb.subConns == nil {
+		return
+	}
+	// Update picker before updating state.  Even though the ordering here does
+	// not matter, it can lead to multiple calls of Pick in the common start-up
+	// case where we wait for ready and then perform an RPC.  If the picker is
+	// updated later, we could call the "connecting" picker when the state is
+	// updated, and then call the "ready" picker after the picker gets updated.
+	ccb.cc.blockingpicker.updatePickerV2(s.Picker)
+	ccb.cc.csMgr.updateState(s.ConnectivityState)
+}
+
+func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
 	ccb.cc.resolveNow(o)
 }
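
Note on this vendored hunk: the hand-rolled scStateUpdateBuffer deleted above is replaced by the generic google.golang.org/grpc/internal/buffer.Unbounded, which realizes the removed TODO ("make a general purpose buffer that uses interface{}"). The sketch below is a minimal standalone version of that Put/Load/Get pattern; it mirrors the deleted code with interface{} elements and is illustrative only, since gRPC's internal/buffer package cannot be imported from outside the grpc module.

```go
package main

import (
	"fmt"
	"sync"
)

// Unbounded sketches the channel-plus-backlog pattern used by
// google.golang.org/grpc/internal/buffer.Unbounded: Put never blocks,
// Get exposes a one-slot channel, and the single consumer calls Load
// after each receive to move the next backlogged item onto the channel.
type Unbounded struct {
	c       chan interface{}
	mu      sync.Mutex
	backlog []interface{}
}

func NewUnbounded() *Unbounded {
	return &Unbounded{c: make(chan interface{}, 1)}
}

// Put adds t to the buffer without ever blocking the producer.
func (b *Unbounded) Put(t interface{}) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.backlog) == 0 {
		select {
		case b.c <- t:
			return
		default:
		}
	}
	b.backlog = append(b.backlog, t)
}

// Load moves the oldest backlogged item onto the channel, if any.
func (b *Unbounded) Load() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.backlog) > 0 {
		select {
		case b.c <- b.backlog[0]:
			b.backlog[0] = nil // allow the element to be garbage-collected
			b.backlog = b.backlog[1:]
		default:
		}
	}
}

// Get returns the receive channel; callers must call Load after each receive.
func (b *Unbounded) Get() <-chan interface{} {
	return b.c
}

func main() {
	b := NewUnbounded()
	for i := 0; i < 3; i++ {
		b.Put(i) // never blocks, like handleSubConnStateChange -> scBuffer.Put
	}
	for i := 0; i < 3; i++ {
		v := <-b.Get()
		b.Load() // mirrors ccb.scBuffer.Load() in watcher()
		fmt.Println(v)
	}
}
```

The one-slot channel plus mutex-guarded backlog lets producers such as handleSubConnStateChange never block, while the single consumer (watcher) serializes delivery by calling Load after each receive.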