[SEBA-930] Update gRPC version to 1.27 and change how Kafka messages are produced
Change-Id: I14145a1351eb2523fa54e66381ad97abc5eedf50
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index 5356194..824f28e 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -34,6 +34,7 @@
type scStateUpdate struct {
sc balancer.SubConn
state connectivity.State
+ err error
}
// ccBalancerWrapper is a wrapper on top of cc for balancers.
@@ -74,7 +75,7 @@
ccb.balancerMu.Lock()
su := t.(*scStateUpdate)
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
- ub.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state})
+ ub.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err})
} else {
ccb.balancer.HandleSubConnStateChange(su.sc, su.state)
}
@@ -91,7 +92,7 @@
for acbw := range scs {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
- ccb.UpdateBalancerState(connectivity.Connecting, nil)
+ ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
return
}
}
@@ -101,7 +102,7 @@
ccb.done.Fire()
}
-func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
// When updating addresses for a SubConn, if the address in use is not in
// the new addresses, the old ac will be tearDown() and a new ac will be
// created. tearDown() generates a state change with Shutdown state, we
@@ -115,6 +116,7 @@
ccb.scBuffer.Put(&scStateUpdate{
sc: sc,
state: s,
+ err: err,
})
}
@@ -186,7 +188,22 @@
ccb.cc.csMgr.updateState(s)
}
-func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOption) {
+func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
+ ccb.mu.Lock()
+ defer ccb.mu.Unlock()
+ if ccb.subConns == nil {
+ return
+ }
+ // Update the picker before updating the state. The ordering does not affect
+ // correctness, but updating the state first can lead to multiple calls of
+ // Pick in the common start-up case where we wait for ready and then perform
+ // an RPC: we could call the "connecting" picker when the state is updated,
+ // and then call the "ready" picker after the picker gets updated.
+ ccb.cc.blockingpicker.updatePickerV2(s.Picker)
+ ccb.cc.csMgr.updateState(s.ConnectivityState)
+}
+
+func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
ccb.cc.resolveNow(o)
}