[SEBA-930] Update gRPC to version 1.27 and change how Kafka messages are produced

Change-Id: I14145a1351eb2523fa54e66381ad97abc5eedf50
diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go
index 917c242..9258858 100644
--- a/vendor/google.golang.org/grpc/balancer/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/balancer.go
@@ -117,6 +117,15 @@
 	HealthCheckEnabled bool
 }
 
+// State contains the balancer's state relevant to the gRPC ClientConn.
+type State struct {
+	// ConnectivityState is the connectivity state of the balancer, which is
+	// used to determine the state of the ClientConn.
+	ConnectivityState connectivity.State
+	// Picker is used to choose connections (SubConns) for RPCs.
+	Picker V2Picker
+}
+
 // ClientConn represents a gRPC ClientConn.
 //
 // This interface is to be implemented by gRPC. Users should not need a
@@ -137,10 +146,19 @@
 	//
 	// gRPC will update the connectivity state of the ClientConn, and will call pick
 	// on the new picker to pick new SubConn.
+	//
+	// Deprecated: use UpdateState instead.
 	UpdateBalancerState(s connectivity.State, p Picker)
 
+	// UpdateState notifies gRPC that the balancer's internal state has
+	// changed.
+	//
+	// gRPC will update the connectivity state of the ClientConn, and will call pick
+	// on the new picker to pick new SubConns.
+	UpdateState(State)
+
 	// ResolveNow is called by balancer to notify gRPC to do a name resolving.
-	ResolveNow(resolver.ResolveNowOption)
+	ResolveNow(resolver.ResolveNowOptions)
 
 	// Target returns the dial target for this ClientConn.
 	//
@@ -185,11 +203,14 @@
 	ParseConfig(LoadBalancingConfigJSON json.RawMessage) (serviceconfig.LoadBalancingConfig, error)
 }
 
-// PickOptions contains addition information for the Pick operation.
-type PickOptions struct {
+// PickInfo contains additional information for the Pick operation.
+type PickInfo struct {
 	// FullMethodName is the method name that NewClientStream() is called
 	// with. The canonical format is /service/Method.
 	FullMethodName string
+	// Ctx is the RPC's context, and may contain relevant RPC-level information
+	// like the outgoing header metadata.
+	Ctx context.Context
 }
 
 // DoneInfo contains additional information for done.
@@ -215,7 +236,7 @@
 	ErrNoSubConnAvailable = errors.New("no SubConn is available")
 	// ErrTransientFailure indicates all SubConns are in TransientFailure.
 	// WaitForReady RPCs will block, non-WaitForReady RPCs will fail.
-	ErrTransientFailure = errors.New("all SubConns are in TransientFailure")
+	ErrTransientFailure = TransientFailureError(errors.New("all SubConns are in TransientFailure"))
 )
 
 // Picker is used by gRPC to pick a SubConn to send an RPC.
@@ -223,6 +244,8 @@
 // internal state has changed.
 //
 // The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
+//
+// Deprecated: use V2Picker instead.
 type Picker interface {
 	// Pick returns the SubConn to be used to send the RPC.
 	// The returned SubConn must be one returned by NewSubConn().
@@ -243,18 +266,76 @@
 	//
 	// If the returned error is not nil:
 	// - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState()
-	// - If the error is ErrTransientFailure:
+	// - If the error is ErrTransientFailure or implements IsTransientFailure()
+	//   bool, returning true:
 	//   - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState()
 	//     is called to pick again;
 	//   - Otherwise, RPC will fail with unavailable error.
 	// - Else (error is other non-nil error):
-	//   - The RPC will fail with unavailable error.
+	//   - The RPC will fail with the error's status code, or Unknown if it is
+	//     not a status error.
 	//
 	// The returned done() function will be called once the rpc has finished,
 	// with the final status of that RPC.  If the SubConn returned is not a
 	// valid SubConn type, done may not be called.  done may be nil if balancer
 	// doesn't care about the RPC status.
-	Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
+	Pick(ctx context.Context, info PickInfo) (conn SubConn, done func(DoneInfo), err error)
+}
+
+// PickResult contains information related to a connection chosen for an RPC.
+type PickResult struct {
+	// SubConn is the connection to use for this pick, if its state is Ready.
+	// If the state is not Ready, gRPC will block the RPC until a new Picker is
+	// provided by the balancer (using ClientConn.UpdateState).  The SubConn
+	// must be one returned by ClientConn.NewSubConn.
+	SubConn SubConn
+
+	// Done is called when the RPC is completed.  If the SubConn is not ready,
+	// this will be called with a nil parameter.  If the SubConn is not a valid
+	// type, Done may not be called.  May be nil if the balancer does not wish
+	// to be notified when the RPC completes.
+	Done func(DoneInfo)
+}
+
+type transientFailureError struct {
+	error
+}
+
+func (e *transientFailureError) IsTransientFailure() bool { return true }
+
+// TransientFailureError wraps err in an error implementing
+// IsTransientFailure() bool, returning true.
+func TransientFailureError(err error) error {
+	return &transientFailureError{error: err}
+}
+
+// V2Picker is used by gRPC to pick a SubConn to send an RPC.
+// Balancer is expected to generate a new picker from its snapshot every time its
+// internal state has changed.
+//
+// The pickers used by gRPC can be updated by ClientConn.UpdateState().
+type V2Picker interface {
+	// Pick returns the connection to use for this RPC and related information.
+	//
+	// Pick should not block.  If the balancer needs to do I/O or any blocking
+	// or time-consuming work to service this call, it should return
+	// ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
+	// the Picker is updated (using ClientConn.UpdateState).
+	//
+	// If an error is returned:
+	//
+	// - If the error is ErrNoSubConnAvailable, gRPC will block until a new
+	//   Picker is provided by the balancer (using ClientConn.UpdateState).
+	//
+	// - If the error implements IsTransientFailure() bool, returning true,
+	//   wait-for-ready RPCs will wait, but non-wait-for-ready RPCs will be
+	//   terminated with this error's Error() string and status code
+	//   Unavailable.
+	//
+	// - Any other errors terminate all RPCs with the code and message
+	//   provided.  If the error is not a status error, it will be converted by
+	//   gRPC to a status error with code Unknown.
+	Pick(info PickInfo) (PickResult, error)
 }
 
 // Balancer takes input from gRPC, manages SubConns, and collects and aggregates
@@ -292,8 +373,11 @@
 
 // SubConnState describes the state of a SubConn.
 type SubConnState struct {
+	// ConnectivityState is the connectivity state of the SubConn.
 	ConnectivityState connectivity.State
-	// TODO: add last connection error
+	// ConnectionError is set if the ConnectivityState is TransientFailure,
+	// describing the reason the SubConn failed.  Otherwise, it is nil.
+	ConnectionError error
 }
 
 // ClientConnState describes the state of a ClientConn relevant to the
@@ -335,9 +419,8 @@
 //
 // It's not thread safe.
 type ConnectivityStateEvaluator struct {
-	numReady            uint64 // Number of addrConns in ready state.
-	numConnecting       uint64 // Number of addrConns in connecting state.
-	numTransientFailure uint64 // Number of addrConns in transientFailure.
+	numReady      uint64 // Number of addrConns in ready state.
+	numConnecting uint64 // Number of addrConns in connecting state.
 }
 
 // RecordTransition records state change happening in subConn and based on that
@@ -357,8 +440,6 @@
 			cse.numReady += updateVal
 		case connectivity.Connecting:
 			cse.numConnecting += updateVal
-		case connectivity.TransientFailure:
-			cse.numTransientFailure += updateVal
 		}
 	}
 
diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go
index 1a5c1aa..d952f09 100644
--- a/vendor/google.golang.org/grpc/balancer/base/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go
@@ -20,6 +20,7 @@
 
 import (
 	"context"
+	"errors"
 
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/connectivity"
@@ -28,25 +29,32 @@
 )
 
 type baseBuilder struct {
-	name          string
-	pickerBuilder PickerBuilder
-	config        Config
+	name            string
+	pickerBuilder   PickerBuilder
+	v2PickerBuilder V2PickerBuilder
+	config          Config
 }
 
 func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
-	return &baseBalancer{
-		cc:            cc,
-		pickerBuilder: bb.pickerBuilder,
+	bal := &baseBalancer{
+		cc:              cc,
+		pickerBuilder:   bb.pickerBuilder,
+		v2PickerBuilder: bb.v2PickerBuilder,
 
 		subConns: make(map[resolver.Address]balancer.SubConn),
 		scStates: make(map[balancer.SubConn]connectivity.State),
 		csEvltr:  &balancer.ConnectivityStateEvaluator{},
-		// Initialize picker to a picker that always return
-		// ErrNoSubConnAvailable, because when state of a SubConn changes, we
-		// may call UpdateBalancerState with this picker.
-		picker: NewErrPicker(balancer.ErrNoSubConnAvailable),
-		config: bb.config,
+		config:   bb.config,
 	}
+	// Initialize the picker to one that always returns ErrNoSubConnAvailable,
+	// because when the state of a SubConn changes, this picker may be passed
+	// to UpdateBalancerState (v1 picker) or UpdateState (v2 picker).
+	if bb.pickerBuilder != nil {
+		bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
+	} else {
+		bal.v2Picker = NewErrPickerV2(balancer.ErrNoSubConnAvailable)
+	}
+	return bal
 }
 
 func (bb *baseBuilder) Name() string {
@@ -56,8 +64,9 @@
 var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer
 
 type baseBalancer struct {
-	cc            balancer.ClientConn
-	pickerBuilder PickerBuilder
+	cc              balancer.ClientConn
+	pickerBuilder   PickerBuilder
+	v2PickerBuilder V2PickerBuilder
 
 	csEvltr *balancer.ConnectivityStateEvaluator
 	state   connectivity.State
@@ -65,6 +74,7 @@
 	subConns map[resolver.Address]balancer.SubConn
 	scStates map[balancer.SubConn]connectivity.State
 	picker   balancer.Picker
+	v2Picker balancer.V2Picker
 	config   Config
 }
 
@@ -72,8 +82,15 @@
 	panic("not implemented")
 }
 
-func (b *baseBalancer) ResolverError(error) {
-	// Ignore
+func (b *baseBalancer) ResolverError(err error) {
+	switch b.state {
+	case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
+		if b.picker != nil {
+			b.picker = NewErrPicker(err)
+		} else {
+			b.v2Picker = NewErrPickerV2(err)
+		}
+	}
 }
 
 func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
@@ -114,20 +131,44 @@
 // from it. The picker is
 //  - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
 //  - built by the pickerBuilder with all READY SubConns otherwise.
-func (b *baseBalancer) regeneratePicker() {
+func (b *baseBalancer) regeneratePicker(err error) {
 	if b.state == connectivity.TransientFailure {
-		b.picker = NewErrPicker(balancer.ErrTransientFailure)
+		if b.pickerBuilder != nil {
+			b.picker = NewErrPicker(balancer.ErrTransientFailure)
+		} else {
+			if err != nil {
+				b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(err))
+			} else {
+				// This means the last subchannel transition was not to
+				// TransientFailure (otherwise err must be set), but the
+				// aggregate state of the balancer is TransientFailure, meaning
+				// there are no other addresses.
+				b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(errors.New("resolver returned no addresses")))
+			}
+		}
 		return
 	}
-	readySCs := make(map[resolver.Address]balancer.SubConn)
+	if b.pickerBuilder != nil {
+		readySCs := make(map[resolver.Address]balancer.SubConn)
 
-	// Filter out all ready SCs from full subConn map.
-	for addr, sc := range b.subConns {
-		if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
-			readySCs[addr] = sc
+		// Filter out all ready SCs from full subConn map.
+		for addr, sc := range b.subConns {
+			if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
+				readySCs[addr] = sc
+			}
 		}
+		b.picker = b.pickerBuilder.Build(readySCs)
+	} else {
+		readySCs := make(map[balancer.SubConn]SubConnInfo)
+
+		// Filter out all ready SCs from full subConn map.
+		for addr, sc := range b.subConns {
+			if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
+				readySCs[sc] = SubConnInfo{Address: addr}
+			}
+		}
+		b.v2Picker = b.v2PickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
 	}
-	b.picker = b.pickerBuilder.Build(readySCs)
 }
 
 func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
@@ -166,10 +207,14 @@
 	//  - the aggregated state of balancer became non-TransientFailure from TransientFailure
 	if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
 		(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
-		b.regeneratePicker()
+		b.regeneratePicker(state.ConnectionError)
 	}
 
-	b.cc.UpdateBalancerState(b.state, b.picker)
+	if b.picker != nil {
+		b.cc.UpdateBalancerState(b.state, b.picker)
+	} else {
+		b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.v2Picker})
+	}
 }
 
 // Close is a nop because base balancer doesn't have internal state to clean up,
@@ -186,6 +231,19 @@
 	err error // Pick() always returns this err.
 }
 
-func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
+func (p *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
 	return nil, nil, p.err
 }
+
+// NewErrPickerV2 returns a V2Picker that always returns err on Pick().
+func NewErrPickerV2(err error) balancer.V2Picker {
+	return &errPickerV2{err: err}
+}
+
+type errPickerV2 struct {
+	err error // Pick() always returns this err.
+}
+
+func (p *errPickerV2) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
+	return balancer.PickResult{}, p.err
+}
diff --git a/vendor/google.golang.org/grpc/balancer/base/base.go b/vendor/google.golang.org/grpc/balancer/base/base.go
index 34b1f29..4192918 100644
--- a/vendor/google.golang.org/grpc/balancer/base/base.go
+++ b/vendor/google.golang.org/grpc/balancer/base/base.go
@@ -42,6 +42,26 @@
 	Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker
 }
 
+// V2PickerBuilder creates balancer.V2Picker.
+type V2PickerBuilder interface {
+	// Build returns a picker that will be used by gRPC to pick a SubConn.
+	Build(info PickerBuildInfo) balancer.V2Picker
+}
+
+// PickerBuildInfo contains information needed by the picker builder to
+// construct a picker.
+type PickerBuildInfo struct {
+	// ReadySCs is a map from all ready SubConns to the Addresses used to
+	// create them.
+	ReadySCs map[balancer.SubConn]SubConnInfo
+}
+
+// SubConnInfo contains information about a SubConn created by the base
+// balancer.
+type SubConnInfo struct {
+	Address resolver.Address // the address used to create this SubConn
+}
+
 // NewBalancerBuilder returns a balancer builder. The balancers
 // built by this builder will use the picker builder to build pickers.
 func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
@@ -62,3 +82,12 @@
 		config:        config,
 	}
 }
+
+// NewBalancerBuilderV2 returns a base balancer builder configured by the provided config.
+func NewBalancerBuilderV2(name string, pb V2PickerBuilder, config Config) balancer.Builder {
+	return &baseBuilder{
+		name:            name,
+		v2PickerBuilder: pb,
+		config:          config,
+	}
+}
diff --git a/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go b/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go
index 29f7a4d..d4d6455 100644
--- a/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go
+++ b/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go
@@ -22,14 +22,12 @@
 package roundrobin
 
 import (
-	"context"
 	"sync"
 
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/balancer/base"
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/internal/grpcrand"
-	"google.golang.org/grpc/resolver"
 )
 
 // Name is the name of round_robin balancer.
@@ -37,7 +35,7 @@
 
 // newBuilder creates a new roundrobin balancer builder.
 func newBuilder() balancer.Builder {
-	return base.NewBalancerBuilderWithConfig(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
+	return base.NewBalancerBuilderV2(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
 }
 
 func init() {
@@ -46,13 +44,13 @@
 
 type rrPickerBuilder struct{}
 
-func (*rrPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
-	grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)
-	if len(readySCs) == 0 {
-		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
+func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.V2Picker {
+	grpclog.Infof("roundrobinPicker: newPicker called with info: %v", info)
+	if len(info.ReadySCs) == 0 {
+		return base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)
 	}
 	var scs []balancer.SubConn
-	for _, sc := range readySCs {
+	for sc := range info.ReadySCs {
 		scs = append(scs, sc)
 	}
 	return &rrPicker{
@@ -74,10 +72,10 @@
 	next int
 }
 
-func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
+func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
 	p.mu.Lock()
 	sc := p.subConns[p.next]
 	p.next = (p.next + 1) % len(p.subConns)
 	p.mu.Unlock()
-	return sc, nil, nil
+	return balancer.PickResult{SubConn: sc}, nil
 }