[SEBA-930] update GRPC version to 1.27 and change kafka message producing

Change-Id: I14145a1351eb2523fa54e66381ad97abc5eedf50
diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go
index 1a5c1aa..d952f09 100644
--- a/vendor/google.golang.org/grpc/balancer/base/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go
@@ -20,6 +20,7 @@
 
 import (
 	"context"
+	"errors"
 
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/connectivity"
@@ -28,25 +29,32 @@
 )
 
 type baseBuilder struct {
-	name          string
-	pickerBuilder PickerBuilder
-	config        Config
+	name            string
+	pickerBuilder   PickerBuilder
+	v2PickerBuilder V2PickerBuilder
+	config          Config
 }
 
 func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
-	return &baseBalancer{
-		cc:            cc,
-		pickerBuilder: bb.pickerBuilder,
+	bal := &baseBalancer{
+		cc:              cc,
+		pickerBuilder:   bb.pickerBuilder,
+		v2PickerBuilder: bb.v2PickerBuilder,
 
 		subConns: make(map[resolver.Address]balancer.SubConn),
 		scStates: make(map[balancer.SubConn]connectivity.State),
 		csEvltr:  &balancer.ConnectivityStateEvaluator{},
-		// Initialize picker to a picker that always return
-		// ErrNoSubConnAvailable, because when state of a SubConn changes, we
-		// may call UpdateBalancerState with this picker.
-		picker: NewErrPicker(balancer.ErrNoSubConnAvailable),
-		config: bb.config,
+		config:   bb.config,
 	}
+	// Initialize picker to a picker that always returns
+	// ErrNoSubConnAvailable, because when state of a SubConn changes, we
+	// may call UpdateState with this picker.
+	if bb.pickerBuilder != nil {
+		bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
+	} else {
+		bal.v2Picker = NewErrPickerV2(balancer.ErrNoSubConnAvailable)
+	}
+	return bal
 }
 
 func (bb *baseBuilder) Name() string {
@@ -56,8 +64,9 @@
 var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer
 
 type baseBalancer struct {
-	cc            balancer.ClientConn
-	pickerBuilder PickerBuilder
+	cc              balancer.ClientConn
+	pickerBuilder   PickerBuilder
+	v2PickerBuilder V2PickerBuilder
 
 	csEvltr *balancer.ConnectivityStateEvaluator
 	state   connectivity.State
@@ -65,6 +74,7 @@
 	subConns map[resolver.Address]balancer.SubConn
 	scStates map[balancer.SubConn]connectivity.State
 	picker   balancer.Picker
+	v2Picker balancer.V2Picker
 	config   Config
 }
 
@@ -72,8 +82,15 @@
 	panic("not implemented")
 }
 
-func (b *baseBalancer) ResolverError(error) {
-	// Ignore
+func (b *baseBalancer) ResolverError(err error) {
+	switch b.state {
+	case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
+		if b.picker != nil {
+			b.picker = NewErrPicker(err)
+		} else {
+			b.v2Picker = NewErrPickerV2(err)
+		}
+	}
 }
 
 func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
@@ -114,20 +131,44 @@
 // from it. The picker is
 //  - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
 //  - built by the pickerBuilder with all READY SubConns otherwise.
-func (b *baseBalancer) regeneratePicker() {
+func (b *baseBalancer) regeneratePicker(err error) {
 	if b.state == connectivity.TransientFailure {
-		b.picker = NewErrPicker(balancer.ErrTransientFailure)
+		if b.pickerBuilder != nil {
+			b.picker = NewErrPicker(balancer.ErrTransientFailure)
+		} else {
+			if err != nil {
+				b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(err))
+			} else {
+				// This means the last subchannel transition was not to
+				// TransientFailure (otherwise err must be set), but the
+				// aggregate state of the balancer is TransientFailure, meaning
+				// there are no other addresses.
+				b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(errors.New("resolver returned no addresses")))
+			}
+		}
 		return
 	}
-	readySCs := make(map[resolver.Address]balancer.SubConn)
+	if b.pickerBuilder != nil {
+		readySCs := make(map[resolver.Address]balancer.SubConn)
 
-	// Filter out all ready SCs from full subConn map.
-	for addr, sc := range b.subConns {
-		if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
-			readySCs[addr] = sc
+		// Filter out all ready SCs from full subConn map.
+		for addr, sc := range b.subConns {
+			if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
+				readySCs[addr] = sc
+			}
 		}
+		b.picker = b.pickerBuilder.Build(readySCs)
+	} else {
+		readySCs := make(map[balancer.SubConn]SubConnInfo)
+
+		// Filter out all ready SCs from full subConn map.
+		for addr, sc := range b.subConns {
+			if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
+				readySCs[sc] = SubConnInfo{Address: addr}
+			}
+		}
+		b.v2Picker = b.v2PickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
 	}
-	b.picker = b.pickerBuilder.Build(readySCs)
 }
 
 func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
@@ -166,10 +207,14 @@
 	//  - the aggregated state of balancer became non-TransientFailure from TransientFailure
 	if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
 		(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
-		b.regeneratePicker()
+		b.regeneratePicker(state.ConnectionError)
 	}
 
-	b.cc.UpdateBalancerState(b.state, b.picker)
+	if b.picker != nil {
+		b.cc.UpdateBalancerState(b.state, b.picker)
+	} else {
+		b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.v2Picker})
+	}
 }
 
 // Close is a nop because base balancer doesn't have internal state to clean up,
@@ -186,6 +231,19 @@
 	err error // Pick() always returns this err.
 }
 
-func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
+func (p *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
 	return nil, nil, p.err
 }
+
+// NewErrPickerV2 returns a V2Picker that always returns err on Pick().
+func NewErrPickerV2(err error) balancer.V2Picker {
+	return &errPickerV2{err: err}
+}
+
+type errPickerV2 struct {
+	err error // Pick() always returns this err.
+}
+
+func (p *errPickerV2) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
+	return balancer.PickResult{}, p.err
+}