[SEBA-930] update GRPC version to 1.27 and change kafka message producing
Change-Id: I14145a1351eb2523fa54e66381ad97abc5eedf50
diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go
index 917c242..9258858 100644
--- a/vendor/google.golang.org/grpc/balancer/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/balancer.go
@@ -117,6 +117,15 @@
HealthCheckEnabled bool
}
+// State contains the balancer's state relevant to the gRPC ClientConn.
+type State struct {
+ // State contains the connectivity state of the balancer, which is used to
+ // determine the state of the ClientConn.
+ ConnectivityState connectivity.State
+ // Picker is used to choose connections (SubConns) for RPCs.
+ Picker V2Picker
+}
+
// ClientConn represents a gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
@@ -137,10 +146,19 @@
//
// gRPC will update the connectivity state of the ClientConn, and will call pick
// on the new picker to pick new SubConn.
+ //
+ // Deprecated: use UpdateState instead
UpdateBalancerState(s connectivity.State, p Picker)
+ // UpdateState notifies gRPC that the balancer's internal state has
+ // changed.
+ //
+ // gRPC will update the connectivity state of the ClientConn, and will call pick
+ // on the new picker to pick new SubConns.
+ UpdateState(State)
+
// ResolveNow is called by balancer to notify gRPC to do a name resolving.
- ResolveNow(resolver.ResolveNowOption)
+ ResolveNow(resolver.ResolveNowOptions)
// Target returns the dial target for this ClientConn.
//
@@ -185,11 +203,14 @@
ParseConfig(LoadBalancingConfigJSON json.RawMessage) (serviceconfig.LoadBalancingConfig, error)
}
-// PickOptions contains addition information for the Pick operation.
-type PickOptions struct {
+// PickInfo contains additional information for the Pick operation.
+type PickInfo struct {
// FullMethodName is the method name that NewClientStream() is called
// with. The canonical format is /service/Method.
FullMethodName string
+ // Ctx is the RPC's context, and may contain relevant RPC-level information
+ // like the outgoing header metadata.
+ Ctx context.Context
}
// DoneInfo contains additional information for done.
@@ -215,7 +236,7 @@
ErrNoSubConnAvailable = errors.New("no SubConn is available")
// ErrTransientFailure indicates all SubConns are in TransientFailure.
// WaitForReady RPCs will block, non-WaitForReady RPCs will fail.
- ErrTransientFailure = errors.New("all SubConns are in TransientFailure")
+ ErrTransientFailure = TransientFailureError(errors.New("all SubConns are in TransientFailure"))
)
// Picker is used by gRPC to pick a SubConn to send an RPC.
@@ -223,6 +244,8 @@
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
+//
+// Deprecated: use V2Picker instead
type Picker interface {
// Pick returns the SubConn to be used to send the RPC.
// The returned SubConn must be one returned by NewSubConn().
@@ -243,18 +266,76 @@
//
// If the returned error is not nil:
// - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState()
- // - If the error is ErrTransientFailure:
+ // - If the error is ErrTransientFailure or implements IsTransientFailure()
+ // bool, returning true:
// - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState()
// is called to pick again;
// - Otherwise, RPC will fail with unavailable error.
// - Else (error is other non-nil error):
- // - The RPC will fail with unavailable error.
+ // - The RPC will fail with the error's status code, or Unknown if it is
+ // not a status error.
//
// The returned done() function will be called once the rpc has finished,
// with the final status of that RPC. If the SubConn returned is not a
// valid SubConn type, done may not be called. done may be nil if balancer
// doesn't care about the RPC status.
- Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
+ Pick(ctx context.Context, info PickInfo) (conn SubConn, done func(DoneInfo), err error)
+}
+
+// PickResult contains information related to a connection chosen for an RPC.
+type PickResult struct {
+ // SubConn is the connection to use for this pick, if its state is Ready.
+ // If the state is not Ready, gRPC will block the RPC until a new Picker is
+ // provided by the balancer (using ClientConn.UpdateState). The SubConn
+ // must be one returned by ClientConn.NewSubConn.
+ SubConn SubConn
+
+ // Done is called when the RPC is completed. If the SubConn is not ready,
+ // this will be called with a nil parameter. If the SubConn is not a valid
+ // type, Done may not be called. May be nil if the balancer does not wish
+ // to be notified when the RPC completes.
+ Done func(DoneInfo)
+}
+
+type transientFailureError struct {
+ error
+}
+
+func (e *transientFailureError) IsTransientFailure() bool { return true }
+
+// TransientFailureError wraps err in an error implementing
+// IsTransientFailure() bool, returning true.
+func TransientFailureError(err error) error {
+ return &transientFailureError{error: err}
+}
+
+// V2Picker is used by gRPC to pick a SubConn to send an RPC.
+// Balancer is expected to generate a new picker from its snapshot every time its
+// internal state has changed.
+//
+// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
+type V2Picker interface {
+ // Pick returns the connection to use for this RPC and related information.
+ //
+ // Pick should not block. If the balancer needs to do I/O or any blocking
+ // or time-consuming work to service this call, it should return
+ // ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
+ // the Picker is updated (using ClientConn.UpdateState).
+ //
+ // If an error is returned:
+ //
+ // - If the error is ErrNoSubConnAvailable, gRPC will block until a new
+ // Picker is provided by the balancer (using ClientConn.UpdateState).
+ //
+ // - If the error implements IsTransientFailure() bool, returning true,
+ // wait for ready RPCs will wait, but non-wait for ready RPCs will be
+ // terminated with this error's Error() string and status code
+ // Unavailable.
+ //
+ // - Any other errors terminate all RPCs with the code and message
+ // provided. If the error is not a status error, it will be converted by
+ // gRPC to a status error with code Unknown.
+ Pick(info PickInfo) (PickResult, error)
}
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
@@ -292,8 +373,11 @@
// SubConnState describes the state of a SubConn.
type SubConnState struct {
+ // ConnectivityState is the connectivity state of the SubConn.
ConnectivityState connectivity.State
- // TODO: add last connection error
+ // ConnectionError is set if the ConnectivityState is TransientFailure,
+ // describing the reason the SubConn failed. Otherwise, it is nil.
+ ConnectionError error
}
// ClientConnState describes the state of a ClientConn relevant to the
@@ -335,9 +419,8 @@
//
// It's not thread safe.
type ConnectivityStateEvaluator struct {
- numReady uint64 // Number of addrConns in ready state.
- numConnecting uint64 // Number of addrConns in connecting state.
- numTransientFailure uint64 // Number of addrConns in transientFailure.
+ numReady uint64 // Number of addrConns in ready state.
+ numConnecting uint64 // Number of addrConns in connecting state.
}
// RecordTransition records state change happening in subConn and based on that
@@ -357,8 +440,6 @@
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
- case connectivity.TransientFailure:
- cse.numTransientFailure += updateVal
}
}
diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go
index 1a5c1aa..d952f09 100644
--- a/vendor/google.golang.org/grpc/balancer/base/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go
@@ -20,6 +20,7 @@
import (
"context"
+ "errors"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
@@ -28,25 +29,32 @@
)
type baseBuilder struct {
- name string
- pickerBuilder PickerBuilder
- config Config
+ name string
+ pickerBuilder PickerBuilder
+ v2PickerBuilder V2PickerBuilder
+ config Config
}
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
- return &baseBalancer{
- cc: cc,
- pickerBuilder: bb.pickerBuilder,
+ bal := &baseBalancer{
+ cc: cc,
+ pickerBuilder: bb.pickerBuilder,
+ v2PickerBuilder: bb.v2PickerBuilder,
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
- // Initialize picker to a picker that always return
- // ErrNoSubConnAvailable, because when state of a SubConn changes, we
- // may call UpdateBalancerState with this picker.
- picker: NewErrPicker(balancer.ErrNoSubConnAvailable),
- config: bb.config,
+ config: bb.config,
}
+ // Initialize picker to a picker that always returns
+ // ErrNoSubConnAvailable, because when state of a SubConn changes, we
+ // may call UpdateState with this picker.
+ if bb.pickerBuilder != nil {
+ bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
+ } else {
+ bal.v2Picker = NewErrPickerV2(balancer.ErrNoSubConnAvailable)
+ }
+ return bal
}
func (bb *baseBuilder) Name() string {
@@ -56,8 +64,9 @@
var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer
type baseBalancer struct {
- cc balancer.ClientConn
- pickerBuilder PickerBuilder
+ cc balancer.ClientConn
+ pickerBuilder PickerBuilder
+ v2PickerBuilder V2PickerBuilder
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
@@ -65,6 +74,7 @@
subConns map[resolver.Address]balancer.SubConn
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
+ v2Picker balancer.V2Picker
config Config
}
@@ -72,8 +82,15 @@
panic("not implemented")
}
-func (b *baseBalancer) ResolverError(error) {
- // Ignore
+func (b *baseBalancer) ResolverError(err error) {
+ switch b.state {
+ case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
+ if b.picker != nil {
+ b.picker = NewErrPicker(err)
+ } else {
+ b.v2Picker = NewErrPickerV2(err)
+ }
+ }
}
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
@@ -114,20 +131,44 @@
// from it. The picker is
// - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
// - built by the pickerBuilder with all READY SubConns otherwise.
-func (b *baseBalancer) regeneratePicker() {
+func (b *baseBalancer) regeneratePicker(err error) {
if b.state == connectivity.TransientFailure {
- b.picker = NewErrPicker(balancer.ErrTransientFailure)
+ if b.pickerBuilder != nil {
+ b.picker = NewErrPicker(balancer.ErrTransientFailure)
+ } else {
+ if err != nil {
+ b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(err))
+ } else {
+ // This means the last subchannel transition was not to
+ // TransientFailure (otherwise err must be set), but the
+ // aggregate state of the balancer is TransientFailure, meaning
+ // there are no other addresses.
+ b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(errors.New("resolver returned no addresses")))
+ }
+ }
return
}
- readySCs := make(map[resolver.Address]balancer.SubConn)
+ if b.pickerBuilder != nil {
+ readySCs := make(map[resolver.Address]balancer.SubConn)
- // Filter out all ready SCs from full subConn map.
- for addr, sc := range b.subConns {
- if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
- readySCs[addr] = sc
+ // Filter out all ready SCs from full subConn map.
+ for addr, sc := range b.subConns {
+ if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
+ readySCs[addr] = sc
+ }
}
+ b.picker = b.pickerBuilder.Build(readySCs)
+ } else {
+ readySCs := make(map[balancer.SubConn]SubConnInfo)
+
+ // Filter out all ready SCs from full subConn map.
+ for addr, sc := range b.subConns {
+ if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
+ readySCs[sc] = SubConnInfo{Address: addr}
+ }
+ }
+ b.v2Picker = b.v2PickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
}
- b.picker = b.pickerBuilder.Build(readySCs)
}
func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
@@ -166,10 +207,14 @@
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
- b.regeneratePicker()
+ b.regeneratePicker(state.ConnectionError)
}
- b.cc.UpdateBalancerState(b.state, b.picker)
+ if b.picker != nil {
+ b.cc.UpdateBalancerState(b.state, b.picker)
+ } else {
+ b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.v2Picker})
+ }
}
// Close is a nop because base balancer doesn't have internal state to clean up,
@@ -186,6 +231,19 @@
err error // Pick() always returns this err.
}
-func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
+func (p *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, p.err
}
+
+// NewErrPickerV2 returns a V2Picker that always returns err on Pick().
+func NewErrPickerV2(err error) balancer.V2Picker {
+ return &errPickerV2{err: err}
+}
+
+type errPickerV2 struct {
+ err error // Pick() always returns this err.
+}
+
+func (p *errPickerV2) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
+ return balancer.PickResult{}, p.err
+}
diff --git a/vendor/google.golang.org/grpc/balancer/base/base.go b/vendor/google.golang.org/grpc/balancer/base/base.go
index 34b1f29..4192918 100644
--- a/vendor/google.golang.org/grpc/balancer/base/base.go
+++ b/vendor/google.golang.org/grpc/balancer/base/base.go
@@ -42,6 +42,26 @@
Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker
}
+// V2PickerBuilder creates balancer.V2Picker.
+type V2PickerBuilder interface {
+ // Build returns a picker that will be used by gRPC to pick a SubConn.
+ Build(info PickerBuildInfo) balancer.V2Picker
+}
+
+// PickerBuildInfo contains information needed by the picker builder to
+// construct a picker.
+type PickerBuildInfo struct {
+ // ReadySCs is a map from all ready SubConns to the Addresses used to
+ // create them.
+ ReadySCs map[balancer.SubConn]SubConnInfo
+}
+
+// SubConnInfo contains information about a SubConn created by the base
+// balancer.
+type SubConnInfo struct {
+ Address resolver.Address // the address used to create this SubConn
+}
+
// NewBalancerBuilder returns a balancer builder. The balancers
// built by this builder will use the picker builder to build pickers.
func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
@@ -62,3 +82,12 @@
config: config,
}
}
+
+// NewBalancerBuilderV2 returns a base balancer builder configured by the provided config.
+func NewBalancerBuilderV2(name string, pb V2PickerBuilder, config Config) balancer.Builder {
+ return &baseBuilder{
+ name: name,
+ v2PickerBuilder: pb,
+ config: config,
+ }
+}
diff --git a/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go b/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go
index 29f7a4d..d4d6455 100644
--- a/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go
+++ b/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go
@@ -22,14 +22,12 @@
package roundrobin
import (
- "context"
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcrand"
- "google.golang.org/grpc/resolver"
)
// Name is the name of round_robin balancer.
@@ -37,7 +35,7 @@
// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
- return base.NewBalancerBuilderWithConfig(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
+ return base.NewBalancerBuilderV2(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
}
func init() {
@@ -46,13 +44,13 @@
type rrPickerBuilder struct{}
-func (*rrPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
- grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)
- if len(readySCs) == 0 {
- return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
+func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.V2Picker {
+ grpclog.Infof("roundrobinPicker: newPicker called with info: %v", info)
+ if len(info.ReadySCs) == 0 {
+ return base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)
}
var scs []balancer.SubConn
- for _, sc := range readySCs {
+ for sc := range info.ReadySCs {
scs = append(scs, sc)
}
return &rrPicker{
@@ -74,10 +72,10 @@
next int
}
-func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
+func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
p.mu.Lock()
sc := p.subConns[p.next]
p.next = (p.next + 1) % len(p.subConns)
p.mu.Unlock()
- return sc, nil, nil
+ return balancer.PickResult{SubConn: sc}, nil
}