VOL-1845 : Support for delete device in openolt adapter
This commit is for the handling of delete device.
The changes are done to handle the request for delete
device. This includes the clearing of all data related
to the device in KV store and reboot of device to reset
the device.
This commit has dependency in voltha-go so that needs to
be merged first. Please refer this review link
https://gerrit.opencord.org/#/c/15084/
Updated to dep ensure above voltha-go patch set. Also typo
and make lint/sca fixes.
Change-Id: I53f16022c6902d498dad30e9b7d0ff50bf156347
diff --git a/vendor/google.golang.org/grpc/balancer.go b/vendor/google.golang.org/grpc/balancer.go
index a78e702..a8eb0f4 100644
--- a/vendor/google.golang.org/grpc/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer.go
@@ -43,7 +43,7 @@
// BalancerConfig specifies the configurations for Balancer.
//
-// Deprecated: please use package balancer.
+// Deprecated: please use package balancer. May be removed in a future 1.x release.
type BalancerConfig struct {
// DialCreds is the transport credential the Balancer implementation can
// use to dial to a remote load balancer server. The Balancer implementations
@@ -57,7 +57,7 @@
// BalancerGetOptions configures a Get call.
//
-// Deprecated: please use package balancer.
+// Deprecated: please use package balancer. May be removed in a future 1.x release.
type BalancerGetOptions struct {
// BlockingWait specifies whether Get should block when there is no
// connected address.
@@ -66,7 +66,7 @@
// Balancer chooses network addresses for RPCs.
//
-// Deprecated: please use package balancer.
+// Deprecated: please use package balancer. May be removed in a future 1.x release.
type Balancer interface {
// Start does the initialization work to bootstrap a Balancer. For example,
// this function may start the name resolution and watch the updates. It will
@@ -120,7 +120,7 @@
// RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch
// the name resolution updates and updates the addresses available correspondingly.
//
-// Deprecated: please use package balancer/roundrobin.
+// Deprecated: please use package balancer/roundrobin. May be removed in a future 1.x release.
func RoundRobin(r naming.Resolver) Balancer {
return &roundRobin{r: r}
}
diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go
index e587d8d..1af88f0 100644
--- a/vendor/google.golang.org/grpc/balancer/base/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go
@@ -73,7 +73,9 @@
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
// TODO: handle s.ResolverState.Err (log if not nil) once implemented.
// TODO: handle s.ResolverState.ServiceConfig?
- grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
+ if grpclog.V(2) {
+ grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
+ }
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range s.ResolverState.Addresses {
@@ -127,10 +129,14 @@
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
- grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
+ if grpclog.V(2) {
+ grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
+ }
oldS, ok := b.scStates[sc]
if !ok {
- grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
+ if grpclog.V(2) {
+ grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
+ }
return
}
b.scStates[sc] = s
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index 7bc6621..8df4095 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -183,7 +183,7 @@
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) {
if ccb.cc.curBalancerName != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
- s := ccs.ResolverState
+ s := &ccs.ResolverState
for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:])
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 84e31a2..a7643df 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -38,7 +38,6 @@
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
- "google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
@@ -1061,8 +1060,8 @@
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
- newTr.Close()
ac.mu.Unlock()
+ newTr.Close()
return
}
ac.curAddr = addr
@@ -1077,20 +1076,16 @@
// we restart from the top of the addr list.
<-reconnect.Done()
hcancel()
-
- // Need to reconnect after a READY, the addrConn enters
- // TRANSIENT_FAILURE.
+ // restart connecting - the top of the loop will set state to
+ // CONNECTING. This is against the current connectivity semantics doc,
+ // however it allows for graceful behavior for RPCs not yet dispatched
+ // - unfortunate timing would otherwise lead to the RPC failing even
+ // though the TRANSIENT_FAILURE state (called for by the doc) would be
+ // instantaneous.
//
- // This will set addrConn to TRANSIENT_FAILURE for a very short period
- // of time, and turns CONNECTING. It seems reasonable to skip this, but
- // READY-CONNECTING is not a valid transition.
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
- ac.updateConnectivityState(connectivity.TransientFailure)
- ac.mu.Unlock()
+ // Ideally we should transition to Idle here and block until there is
+ // RPC activity that leads to the balancer requesting a reconnect of
+ // the associated SubConn.
}
}
@@ -1147,14 +1142,35 @@
Authority: ac.cc.authority,
}
+ once := sync.Once{}
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
+ once.Do(func() {
+ if ac.state == connectivity.Ready {
+ // Prevent this SubConn from being used for new RPCs by setting its
+ // state to Connecting.
+ //
+ // TODO: this should be Idle when grpc-go properly supports it.
+ ac.updateConnectivityState(connectivity.Connecting)
+ }
+ })
ac.mu.Unlock()
reconnect.Fire()
}
onClose := func() {
+ ac.mu.Lock()
+ once.Do(func() {
+ if ac.state == connectivity.Ready {
+ // Prevent this SubConn from being used for new RPCs by setting its
+ // state to Connecting.
+ //
+ // TODO: this should be Idle when grpc-go properly supports it.
+ ac.updateConnectivityState(connectivity.Connecting)
+ }
+ })
+ ac.mu.Unlock()
close(onCloseCalled)
reconnect.Fire()
}
@@ -1176,20 +1192,18 @@
return nil, nil, err
}
- if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
- select {
- case <-time.After(connectDeadline.Sub(time.Now())):
- // We didn't get the preface in time.
- newTr.Close()
- grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
- return nil, nil, errors.New("timed out waiting for server handshake")
- case <-prefaceReceived:
- // We got the preface - huzzah! things are good.
- case <-onCloseCalled:
- // The transport has already closed - noop.
- return nil, nil, errors.New("connection closed")
- // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
- }
+ select {
+ case <-time.After(connectDeadline.Sub(time.Now())):
+ // We didn't get the preface in time.
+ newTr.Close()
+ grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
+ return nil, nil, errors.New("timed out waiting for server handshake")
+ case <-prefaceReceived:
+ // We got the preface - huzzah! things are good.
+ case <-onCloseCalled:
+ // The transport has already closed - noop.
+ return nil, nil, errors.New("connection closed")
+ // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
}
return newTr, reconnect, nil
}
diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go
index 69c0031..e8f34d0 100644
--- a/vendor/google.golang.org/grpc/dialoptions.go
+++ b/vendor/google.golang.org/grpc/dialoptions.go
@@ -60,7 +60,6 @@
balancerBuilder balancer.Builder
// This is to support grpclb.
resolverBuilder resolver.Builder
- reqHandshake envconfig.RequireHandshakeSetting
channelzParentID int64
disableServiceConfig bool
disableRetry bool
@@ -100,17 +99,6 @@
}
}
-// WithWaitForHandshake blocks until the initial settings frame is received from
-// the server before assigning RPCs to the connection.
-//
-// Deprecated: this is the default behavior, and this option will be removed
-// after the 1.18 release.
-func WithWaitForHandshake() DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.reqHandshake = envconfig.RequireHandshakeOn
- })
-}
-
// WithWriteBufferSize determines how much data can be batched before doing a
// write on the wire. The corresponding memory allocation for this buffer will
// be twice the size to keep syscalls low. The default value for this buffer is
@@ -156,7 +144,8 @@
// WithMaxMsgSize returns a DialOption which sets the maximum message size the
// client can receive.
//
-// Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
+// Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead. Will
+// be supported throughout 1.x.
func WithMaxMsgSize(s int) DialOption {
return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
}
@@ -172,7 +161,8 @@
// WithCodec returns a DialOption which sets a codec for message marshaling and
// unmarshaling.
//
-// Deprecated: use WithDefaultCallOptions(ForceCodec(_)) instead.
+// Deprecated: use WithDefaultCallOptions(ForceCodec(_)) instead. Will be
+// supported throughout 1.x.
func WithCodec(c Codec) DialOption {
return WithDefaultCallOptions(CallCustomCodec(c))
}
@@ -181,7 +171,7 @@
// message compression. It has lower priority than the compressor set by the
// UseCompressor CallOption.
//
-// Deprecated: use UseCompressor instead.
+// Deprecated: use UseCompressor instead. Will be supported throughout 1.x.
func WithCompressor(cp Compressor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.cp = cp
@@ -196,7 +186,8 @@
// message. If no compressor is registered for the encoding, an Unimplemented
// status error will be returned.
//
-// Deprecated: use encoding.RegisterCompressor instead.
+// Deprecated: use encoding.RegisterCompressor instead. Will be supported
+// throughout 1.x.
func WithDecompressor(dc Decompressor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.dc = dc
@@ -207,7 +198,7 @@
// Name resolver will be ignored if this DialOption is specified.
//
// Deprecated: use the new balancer APIs in balancer package and
-// WithBalancerName.
+// WithBalancerName. Will be removed in a future 1.x release.
func WithBalancer(b Balancer) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.balancerBuilder = &balancerWrapperBuilder{
@@ -223,7 +214,8 @@
// The balancer cannot be overridden by balancer option specified by service
// config.
//
-// This is an EXPERIMENTAL API.
+// Deprecated: use WithDefaultServiceConfig and WithDisableServiceConfig
+// instead. Will be removed in a future 1.x release.
func WithBalancerName(balancerName string) DialOption {
builder := balancer.Get(balancerName)
if builder == nil {
@@ -244,9 +236,10 @@
// WithServiceConfig returns a DialOption which has a channel to read the
// service configuration.
//
-// Deprecated: service config should be received through name resolver, as
-// specified here.
-// https://github.com/grpc/grpc/blob/master/doc/service_config.md
+// Deprecated: service config should be received through name resolver or via
+// WithDefaultServiceConfig, as specified at
+// https://github.com/grpc/grpc/blob/master/doc/service_config.md. Will be
+// removed in a future 1.x release.
func WithServiceConfig(c <-chan ServiceConfig) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.scChan = c
@@ -329,7 +322,8 @@
// WithTimeout returns a DialOption that configures a timeout for dialing a
// ClientConn initially. This is valid if and only if WithBlock() is present.
//
-// Deprecated: use DialContext and context.WithTimeout instead.
+// Deprecated: use DialContext and context.WithTimeout instead. Will be
+// supported throughout 1.x.
func WithTimeout(d time.Duration) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.timeout = d
@@ -356,7 +350,8 @@
// is returned by f, gRPC checks the error's Temporary() method to decide if it
// should try to reconnect to the network address.
//
-// Deprecated: use WithContextDialer instead
+// Deprecated: use WithContextDialer instead. Will be supported throughout
+// 1.x.
func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
return WithContextDialer(
func(ctx context.Context, addr string) (net.Conn, error) {
@@ -480,8 +475,10 @@
// WithDefaultServiceConfig returns a DialOption that configures the default
// service config, which will be used in cases where:
-// 1. WithDisableServiceConfig is called.
-// 2. Resolver does not return service config or if the resolver gets and invalid config.
+//
+// 1. WithDisableServiceConfig is also used.
+// 2. Resolver does not return a service config or if the resolver returns an
+// invalid service config.
//
// This API is EXPERIMENTAL.
func WithDefaultServiceConfig(s string) DialOption {
@@ -537,7 +534,6 @@
func defaultDialOptions() dialOptions {
return dialOptions{
disableRetry: !envconfig.Retry,
- reqHandshake: envconfig.RequireHandshake,
healthCheckFunc: internal.HealthCheckFunc,
copts: transport.ConnectOptions{
WriteBufferSize: defaultWriteBufSize,
diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go
deleted file mode 100644
index c2f2c77..0000000
--- a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go
+++ /dev/null
@@ -1,327 +0,0 @@
-// Code generated by protoc-gen-go. DO NOT EDIT.
-// source: grpc/health/v1/health.proto
-
-package grpc_health_v1 // import "google.golang.org/grpc/health/grpc_health_v1"
-
-import proto "github.com/golang/protobuf/proto"
-import fmt "fmt"
-import math "math"
-
-import (
- context "golang.org/x/net/context"
- grpc "google.golang.org/grpc"
-)
-
-// Reference imports to suppress errors if they are not otherwise used.
-var _ = proto.Marshal
-var _ = fmt.Errorf
-var _ = math.Inf
-
-// This is a compile-time assertion to ensure that this generated file
-// is compatible with the proto package it is being compiled against.
-// A compilation error at this line likely means your copy of the
-// proto package needs to be updated.
-const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
-
-type HealthCheckResponse_ServingStatus int32
-
-const (
- HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0
- HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1
- HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2
- HealthCheckResponse_SERVICE_UNKNOWN HealthCheckResponse_ServingStatus = 3
-)
-
-var HealthCheckResponse_ServingStatus_name = map[int32]string{
- 0: "UNKNOWN",
- 1: "SERVING",
- 2: "NOT_SERVING",
- 3: "SERVICE_UNKNOWN",
-}
-var HealthCheckResponse_ServingStatus_value = map[string]int32{
- "UNKNOWN": 0,
- "SERVING": 1,
- "NOT_SERVING": 2,
- "SERVICE_UNKNOWN": 3,
-}
-
-func (x HealthCheckResponse_ServingStatus) String() string {
- return proto.EnumName(HealthCheckResponse_ServingStatus_name, int32(x))
-}
-func (HealthCheckResponse_ServingStatus) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor_health_6b1a06aa67f91efd, []int{1, 0}
-}
-
-type HealthCheckRequest struct {
- Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
-}
-
-func (m *HealthCheckRequest) Reset() { *m = HealthCheckRequest{} }
-func (m *HealthCheckRequest) String() string { return proto.CompactTextString(m) }
-func (*HealthCheckRequest) ProtoMessage() {}
-func (*HealthCheckRequest) Descriptor() ([]byte, []int) {
- return fileDescriptor_health_6b1a06aa67f91efd, []int{0}
-}
-func (m *HealthCheckRequest) XXX_Unmarshal(b []byte) error {
- return xxx_messageInfo_HealthCheckRequest.Unmarshal(m, b)
-}
-func (m *HealthCheckRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
- return xxx_messageInfo_HealthCheckRequest.Marshal(b, m, deterministic)
-}
-func (dst *HealthCheckRequest) XXX_Merge(src proto.Message) {
- xxx_messageInfo_HealthCheckRequest.Merge(dst, src)
-}
-func (m *HealthCheckRequest) XXX_Size() int {
- return xxx_messageInfo_HealthCheckRequest.Size(m)
-}
-func (m *HealthCheckRequest) XXX_DiscardUnknown() {
- xxx_messageInfo_HealthCheckRequest.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_HealthCheckRequest proto.InternalMessageInfo
-
-func (m *HealthCheckRequest) GetService() string {
- if m != nil {
- return m.Service
- }
- return ""
-}
-
-type HealthCheckResponse struct {
- Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,proto3,enum=grpc.health.v1.HealthCheckResponse_ServingStatus" json:"status,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
-}
-
-func (m *HealthCheckResponse) Reset() { *m = HealthCheckResponse{} }
-func (m *HealthCheckResponse) String() string { return proto.CompactTextString(m) }
-func (*HealthCheckResponse) ProtoMessage() {}
-func (*HealthCheckResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_health_6b1a06aa67f91efd, []int{1}
-}
-func (m *HealthCheckResponse) XXX_Unmarshal(b []byte) error {
- return xxx_messageInfo_HealthCheckResponse.Unmarshal(m, b)
-}
-func (m *HealthCheckResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
- return xxx_messageInfo_HealthCheckResponse.Marshal(b, m, deterministic)
-}
-func (dst *HealthCheckResponse) XXX_Merge(src proto.Message) {
- xxx_messageInfo_HealthCheckResponse.Merge(dst, src)
-}
-func (m *HealthCheckResponse) XXX_Size() int {
- return xxx_messageInfo_HealthCheckResponse.Size(m)
-}
-func (m *HealthCheckResponse) XXX_DiscardUnknown() {
- xxx_messageInfo_HealthCheckResponse.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_HealthCheckResponse proto.InternalMessageInfo
-
-func (m *HealthCheckResponse) GetStatus() HealthCheckResponse_ServingStatus {
- if m != nil {
- return m.Status
- }
- return HealthCheckResponse_UNKNOWN
-}
-
-func init() {
- proto.RegisterType((*HealthCheckRequest)(nil), "grpc.health.v1.HealthCheckRequest")
- proto.RegisterType((*HealthCheckResponse)(nil), "grpc.health.v1.HealthCheckResponse")
- proto.RegisterEnum("grpc.health.v1.HealthCheckResponse_ServingStatus", HealthCheckResponse_ServingStatus_name, HealthCheckResponse_ServingStatus_value)
-}
-
-// Reference imports to suppress errors if they are not otherwise used.
-var _ context.Context
-var _ grpc.ClientConn
-
-// This is a compile-time assertion to ensure that this generated file
-// is compatible with the grpc package it is being compiled against.
-const _ = grpc.SupportPackageIsVersion4
-
-// HealthClient is the client API for Health service.
-//
-// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
-type HealthClient interface {
- // If the requested service is unknown, the call will fail with status
- // NOT_FOUND.
- Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
- // Performs a watch for the serving status of the requested service.
- // The server will immediately send back a message indicating the current
- // serving status. It will then subsequently send a new message whenever
- // the service's serving status changes.
- //
- // If the requested service is unknown when the call is received, the
- // server will send a message setting the serving status to
- // SERVICE_UNKNOWN but will *not* terminate the call. If at some
- // future point, the serving status of the service becomes known, the
- // server will send a new message with the service's serving status.
- //
- // If the call terminates with status UNIMPLEMENTED, then clients
- // should assume this method is not supported and should not retry the
- // call. If the call terminates with any other status (including OK),
- // clients should retry the call with appropriate exponential backoff.
- Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error)
-}
-
-type healthClient struct {
- cc *grpc.ClientConn
-}
-
-func NewHealthClient(cc *grpc.ClientConn) HealthClient {
- return &healthClient{cc}
-}
-
-func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
- out := new(HealthCheckResponse)
- err := c.cc.Invoke(ctx, "/grpc.health.v1.Health/Check", in, out, opts...)
- if err != nil {
- return nil, err
- }
- return out, nil
-}
-
-func (c *healthClient) Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error) {
- stream, err := c.cc.NewStream(ctx, &_Health_serviceDesc.Streams[0], "/grpc.health.v1.Health/Watch", opts...)
- if err != nil {
- return nil, err
- }
- x := &healthWatchClient{stream}
- if err := x.ClientStream.SendMsg(in); err != nil {
- return nil, err
- }
- if err := x.ClientStream.CloseSend(); err != nil {
- return nil, err
- }
- return x, nil
-}
-
-type Health_WatchClient interface {
- Recv() (*HealthCheckResponse, error)
- grpc.ClientStream
-}
-
-type healthWatchClient struct {
- grpc.ClientStream
-}
-
-func (x *healthWatchClient) Recv() (*HealthCheckResponse, error) {
- m := new(HealthCheckResponse)
- if err := x.ClientStream.RecvMsg(m); err != nil {
- return nil, err
- }
- return m, nil
-}
-
-// HealthServer is the server API for Health service.
-type HealthServer interface {
- // If the requested service is unknown, the call will fail with status
- // NOT_FOUND.
- Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
- // Performs a watch for the serving status of the requested service.
- // The server will immediately send back a message indicating the current
- // serving status. It will then subsequently send a new message whenever
- // the service's serving status changes.
- //
- // If the requested service is unknown when the call is received, the
- // server will send a message setting the serving status to
- // SERVICE_UNKNOWN but will *not* terminate the call. If at some
- // future point, the serving status of the service becomes known, the
- // server will send a new message with the service's serving status.
- //
- // If the call terminates with status UNIMPLEMENTED, then clients
- // should assume this method is not supported and should not retry the
- // call. If the call terminates with any other status (including OK),
- // clients should retry the call with appropriate exponential backoff.
- Watch(*HealthCheckRequest, Health_WatchServer) error
-}
-
-func RegisterHealthServer(s *grpc.Server, srv HealthServer) {
- s.RegisterService(&_Health_serviceDesc, srv)
-}
-
-func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
- in := new(HealthCheckRequest)
- if err := dec(in); err != nil {
- return nil, err
- }
- if interceptor == nil {
- return srv.(HealthServer).Check(ctx, in)
- }
- info := &grpc.UnaryServerInfo{
- Server: srv,
- FullMethod: "/grpc.health.v1.Health/Check",
- }
- handler := func(ctx context.Context, req interface{}) (interface{}, error) {
- return srv.(HealthServer).Check(ctx, req.(*HealthCheckRequest))
- }
- return interceptor(ctx, in, info, handler)
-}
-
-func _Health_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
- m := new(HealthCheckRequest)
- if err := stream.RecvMsg(m); err != nil {
- return err
- }
- return srv.(HealthServer).Watch(m, &healthWatchServer{stream})
-}
-
-type Health_WatchServer interface {
- Send(*HealthCheckResponse) error
- grpc.ServerStream
-}
-
-type healthWatchServer struct {
- grpc.ServerStream
-}
-
-func (x *healthWatchServer) Send(m *HealthCheckResponse) error {
- return x.ServerStream.SendMsg(m)
-}
-
-var _Health_serviceDesc = grpc.ServiceDesc{
- ServiceName: "grpc.health.v1.Health",
- HandlerType: (*HealthServer)(nil),
- Methods: []grpc.MethodDesc{
- {
- MethodName: "Check",
- Handler: _Health_Check_Handler,
- },
- },
- Streams: []grpc.StreamDesc{
- {
- StreamName: "Watch",
- Handler: _Health_Watch_Handler,
- ServerStreams: true,
- },
- },
- Metadata: "grpc/health/v1/health.proto",
-}
-
-func init() { proto.RegisterFile("grpc/health/v1/health.proto", fileDescriptor_health_6b1a06aa67f91efd) }
-
-var fileDescriptor_health_6b1a06aa67f91efd = []byte{
- // 297 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x4e, 0x2f, 0x2a, 0x48,
- 0xd6, 0xcf, 0x48, 0x4d, 0xcc, 0x29, 0xc9, 0xd0, 0x2f, 0x33, 0x84, 0xb2, 0xf4, 0x0a, 0x8a, 0xf2,
- 0x4b, 0xf2, 0x85, 0xf8, 0x40, 0x92, 0x7a, 0x50, 0xa1, 0x32, 0x43, 0x25, 0x3d, 0x2e, 0x21, 0x0f,
- 0x30, 0xc7, 0x39, 0x23, 0x35, 0x39, 0x3b, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, 0x82,
- 0x8b, 0xbd, 0x38, 0xb5, 0xa8, 0x2c, 0x33, 0x39, 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x33, 0x08,
- 0xc6, 0x55, 0xda, 0xc8, 0xc8, 0x25, 0x8c, 0xa2, 0xa1, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x55, 0xc8,
- 0x93, 0x8b, 0xad, 0xb8, 0x24, 0xb1, 0xa4, 0xb4, 0x18, 0xac, 0x81, 0xcf, 0xc8, 0x50, 0x0f, 0xd5,
- 0x22, 0x3d, 0x2c, 0x9a, 0xf4, 0x82, 0x41, 0x86, 0xe6, 0xa5, 0x07, 0x83, 0x35, 0x06, 0x41, 0x0d,
- 0x50, 0xf2, 0xe7, 0xe2, 0x45, 0x91, 0x10, 0xe2, 0xe6, 0x62, 0x0f, 0xf5, 0xf3, 0xf6, 0xf3, 0x0f,
- 0xf7, 0x13, 0x60, 0x00, 0x71, 0x82, 0x5d, 0x83, 0xc2, 0x3c, 0xfd, 0xdc, 0x05, 0x18, 0x85, 0xf8,
- 0xb9, 0xb8, 0xfd, 0xfc, 0x43, 0xe2, 0x61, 0x02, 0x4c, 0x42, 0xc2, 0x5c, 0xfc, 0x60, 0x8e, 0xb3,
- 0x6b, 0x3c, 0x4c, 0x0b, 0xb3, 0xd1, 0x3a, 0x46, 0x2e, 0x36, 0x88, 0xf5, 0x42, 0x01, 0x5c, 0xac,
- 0x60, 0x27, 0x08, 0x29, 0xe1, 0x75, 0x1f, 0x38, 0x14, 0xa4, 0x94, 0x89, 0xf0, 0x83, 0x50, 0x10,
- 0x17, 0x6b, 0x78, 0x62, 0x49, 0x72, 0x06, 0xd5, 0x4c, 0x34, 0x60, 0x74, 0x4a, 0xe4, 0x12, 0xcc,
- 0xcc, 0x47, 0x53, 0xea, 0xc4, 0x0d, 0x51, 0x1b, 0x00, 0x8a, 0xc6, 0x00, 0xc6, 0x28, 0x9d, 0xf4,
- 0xfc, 0xfc, 0xf4, 0x9c, 0x54, 0xbd, 0xf4, 0xfc, 0x9c, 0xc4, 0xbc, 0x74, 0xbd, 0xfc, 0xa2, 0x74,
- 0x7d, 0xe4, 0x78, 0x07, 0xb1, 0xe3, 0x21, 0xec, 0xf8, 0x32, 0xc3, 0x55, 0x4c, 0x7c, 0xee, 0x20,
- 0xd3, 0x20, 0x46, 0xe8, 0x85, 0x19, 0x26, 0xb1, 0x81, 0x93, 0x83, 0x31, 0x20, 0x00, 0x00, 0xff,
- 0xff, 0x12, 0x7d, 0x96, 0xcb, 0x2d, 0x02, 0x00, 0x00,
-}
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
index 11be7cd..3ee8740 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
@@ -25,40 +25,11 @@
)
const (
- prefix = "GRPC_GO_"
- retryStr = prefix + "RETRY"
- requireHandshakeStr = prefix + "REQUIRE_HANDSHAKE"
-)
-
-// RequireHandshakeSetting describes the settings for handshaking.
-type RequireHandshakeSetting int
-
-const (
- // RequireHandshakeOn indicates to wait for handshake before considering a
- // connection ready/successful.
- RequireHandshakeOn RequireHandshakeSetting = iota
- // RequireHandshakeOff indicates to not wait for handshake before
- // considering a connection ready/successful.
- RequireHandshakeOff
+ prefix = "GRPC_GO_"
+ retryStr = prefix + "RETRY"
)
var (
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
Retry = strings.EqualFold(os.Getenv(retryStr), "on")
- // RequireHandshake is set based upon the GRPC_GO_REQUIRE_HANDSHAKE
- // environment variable.
- //
- // Will be removed after the 1.18 release.
- RequireHandshake = RequireHandshakeOn
)
-
-func init() {
- switch strings.ToLower(os.Getenv(requireHandshakeStr)) {
- case "on":
- fallthrough
- default:
- RequireHandshake = RequireHandshakeOn
- case "off":
- RequireHandshake = RequireHandshakeOff
- }
-}
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index 204ba15..b8e0aa4 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -23,6 +23,7 @@
"fmt"
"runtime"
"sync"
+ "sync/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
@@ -84,12 +85,24 @@
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.
+// maxQueuedTransportResponseFrames is the most queued "transport response"
+// frames we will buffer before preventing new reads from occurring on the
+// transport. These are control frames sent in response to client requests,
+// such as RST_STREAM due to bad headers or settings acks.
+const maxQueuedTransportResponseFrames = 50
+
+type cbItem interface {
+ isTransportResponseFrame() bool
+}
+
// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
streamID uint32
wq *writeQuota
}
+func (*registerStream) isTransportResponseFrame() bool { return false }
+
// headerFrame is also used to register stream on the client-side.
type headerFrame struct {
streamID uint32
@@ -102,6 +115,10 @@
onOrphaned func(error) // Valid on client-side
}
+func (h *headerFrame) isTransportResponseFrame() bool {
+ return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
+}
+
type cleanupStream struct {
streamID uint32
rst bool
@@ -109,6 +126,8 @@
onWrite func()
}
+func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
+
type dataFrame struct {
streamID uint32
endStream bool
@@ -119,27 +138,41 @@
onEachWrite func()
}
+func (*dataFrame) isTransportResponseFrame() bool { return false }
+
type incomingWindowUpdate struct {
streamID uint32
increment uint32
}
+func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
+
type outgoingWindowUpdate struct {
streamID uint32
increment uint32
}
+func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
+ return false // window updates are throttled by thresholds
+}
+
type incomingSettings struct {
ss []http2.Setting
}
+func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
+
type outgoingSettings struct {
ss []http2.Setting
}
+func (*outgoingSettings) isTransportResponseFrame() bool { return false }
+
type incomingGoAway struct {
}
+func (*incomingGoAway) isTransportResponseFrame() bool { return false }
+
type goAway struct {
code http2.ErrCode
debugData []byte
@@ -147,15 +180,21 @@
closeConn bool
}
+func (*goAway) isTransportResponseFrame() bool { return false }
+
type ping struct {
ack bool
data [8]byte
}
+func (*ping) isTransportResponseFrame() bool { return true }
+
type outFlowControlSizeRequest struct {
resp chan uint32
}
+func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
+
type outStreamState int
const (
@@ -238,6 +277,14 @@
consumerWaiting bool
list *itemList
err error
+
+ // transportResponseFrames counts the number of queued items that represent
+ // the response of an action initiated by the peer. trfChan is created
+ // when transportResponseFrames >= maxQueuedTransportResponseFrames and is
+ // closed and nilled when transportResponseFrames drops below the
+ // threshold. Both fields are protected by mu.
+ transportResponseFrames int
+ trfChan atomic.Value // *chan struct{}
}
func newControlBuffer(done <-chan struct{}) *controlBuffer {
@@ -248,12 +295,24 @@
}
}
-func (c *controlBuffer) put(it interface{}) error {
+// throttle blocks if there are too many incomingSettings/cleanupStreams in the
+// controlbuf.
+func (c *controlBuffer) throttle() {
+ ch, _ := c.trfChan.Load().(*chan struct{})
+ if ch != nil {
+ select {
+ case <-*ch:
+ case <-c.done:
+ }
+ }
+}
+
+func (c *controlBuffer) put(it cbItem) error {
_, err := c.executeAndPut(nil, it)
return err
}
-func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
+func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
var wakeUp bool
c.mu.Lock()
if c.err != nil {
@@ -271,6 +330,15 @@
c.consumerWaiting = false
}
c.list.enqueue(it)
+ if it.isTransportResponseFrame() {
+ c.transportResponseFrames++
+ if c.transportResponseFrames == maxQueuedTransportResponseFrames {
+ // We are adding the frame that puts us over the threshold; create
+ // a throttling channel.
+ ch := make(chan struct{})
+ c.trfChan.Store(&ch)
+ }
+ }
c.mu.Unlock()
if wakeUp {
select {
@@ -304,7 +372,17 @@
return nil, c.err
}
if !c.list.isEmpty() {
- h := c.list.dequeue()
+ h := c.list.dequeue().(cbItem)
+ if h.isTransportResponseFrame() {
+ if c.transportResponseFrames == maxQueuedTransportResponseFrames {
+ // We are removing the frame that put us over the
+ // threshold; close and clear the throttling channel.
+ ch := c.trfChan.Load().(*chan struct{})
+ close(*ch)
+ c.trfChan.Store((*chan struct{})(nil))
+ }
+ c.transportResponseFrames--
+ }
c.mu.Unlock()
return h, nil
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
index 5ea997a..f262edd 100644
--- a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
+++ b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
@@ -149,6 +149,7 @@
n = uint32(math.MaxInt32)
}
f.mu.Lock()
+ defer f.mu.Unlock()
// estSenderQuota is the receiver's view of the maximum number of bytes the sender
// can send without a window update.
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
@@ -169,10 +170,8 @@
// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
f.delta = n
}
- f.mu.Unlock()
return f.delta
}
- f.mu.Unlock()
return 0
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index c96178d..41a79c5 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -493,6 +493,9 @@
}
func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
+ if len(t.perRPCCreds) == 0 {
+ return nil, nil
+ }
authData := map[string]string{}
for _, c := range t.perRPCCreds {
data, err := c.GetRequestMetadata(ctx, audience)
@@ -513,7 +516,7 @@
}
func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
- callAuthData := map[string]string{}
+ var callAuthData map[string]string
// Check if credentials.PerRPCCredentials were provided via call options.
// Note: if these credentials are provided both via dial options and call
// options, then both sets of credentials will be applied.
@@ -525,6 +528,7 @@
if err != nil {
return nil, status.Errorf(codes.Internal, "transport: %v", err)
}
+ callAuthData = make(map[string]string, len(data))
for k, v := range data {
// Capital header names are illegal in HTTP/2
k = strings.ToLower(k)
@@ -556,7 +560,6 @@
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
close(s.headerChan)
}
-
}
hdr := &headerFrame{
hf: headerFields,
@@ -769,6 +772,9 @@
t.mu.Unlock()
return nil
}
+ // Call t.onClose before setting the state to closing to prevent the client
+ // from attempting to create new streams ASAP.
+ t.onClose()
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
@@ -789,7 +795,6 @@
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
- t.onClose()
return err
}
@@ -978,9 +983,9 @@
statusCode = codes.Unknown
}
if statusCode == codes.Canceled {
- // Our deadline was already exceeded, and that was likely the cause of
- // this cancelation. Alter the status code accordingly.
- if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) {
+ if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
+ // Our deadline was already exceeded, and that was likely the cause
+ // of this cancelation. Alter the status code accordingly.
statusCode = codes.DeadlineExceeded
}
}
@@ -1085,11 +1090,12 @@
default:
t.setGoAwayReason(f)
close(t.goAway)
- t.state = draining
t.controlBuf.put(&incomingGoAway{})
-
- // This has to be a new goroutine because we're still using the current goroutine to read in the transport.
+ // Notify the clientconn about the GOAWAY before we set the state to
+ // draining, to allow the client to stop attempting to create streams
+ // before disallowing new streams on this connection.
t.onGoAway(t.goAwayReason)
+ t.state = draining
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
@@ -1239,6 +1245,7 @@
// loop to keep reading incoming messages on this transport.
for {
+ t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
@@ -1326,6 +1333,7 @@
timer.Reset(t.kp.Time)
continue
}
+ infof("transport: closing client transport due to idleness.")
t.Close()
return
case <-t.ctx.Done():
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 150b73e..4e26f6a 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -138,7 +138,10 @@
}
framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
// Send initial settings as connection preface to client.
- var isettings []http2.Setting
+ isettings := []http2.Setting{{
+ ID: http2.SettingMaxFrameSize,
+ Val: http2MaxFrameLen,
+ }}
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
maxStreams := config.MaxStreams
@@ -436,6 +439,7 @@
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
defer close(t.readerDone)
for {
+ t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
atomic.StoreUint32(&t.activity, 1)
if err != nil {
@@ -766,6 +770,10 @@
return nil
}
+func (t *http2Server) setResetPingStrikes() {
+ atomic.StoreUint32(&t.resetPingStrikes, 1)
+}
+
func (t *http2Server) writeHeaderLocked(s *Stream) error {
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
@@ -780,9 +788,7 @@
streamID: s.id,
hf: headerFields,
endStream: false,
- onWrite: func() {
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- },
+ onWrite: t.setResetPingStrikes,
})
if !success {
if err != nil {
@@ -842,9 +848,7 @@
streamID: s.id,
hf: headerFields,
endStream: true,
- onWrite: func() {
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- },
+ onWrite: t.setResetPingStrikes,
}
s.hdrMu.Unlock()
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
@@ -896,12 +900,10 @@
hdr = append(hdr, data[:emptyLen]...)
data = data[emptyLen:]
df := &dataFrame{
- streamID: s.id,
- h: hdr,
- d: data,
- onEachWrite: func() {
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- },
+ streamID: s.id,
+ h: hdr,
+ d: data,
+ onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
@@ -967,6 +969,7 @@
select {
case <-maxAge.C:
// Close the connection after grace period.
+ infof("transport: closing server transport due to maximum connection age.")
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
@@ -980,6 +983,7 @@
continue
}
if pingSent {
+ infof("transport: closing server transport due to idleness.")
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
keepalive.Reset(infinity)
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index 9d21286..8f5f334 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -667,6 +667,7 @@
writer: w,
fr: http2.NewFramer(w, r),
}
+ f.fr.SetMaxReadFrameSize(http2MaxFrameLen)
// Opt-in to Frame reuse API on framer to reduce garbage.
// Frames aren't safe to read from after a subsequent call to ReadFrame.
f.fr.SetReuseFrames()
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 0f33c9c..1c1d106 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -184,6 +184,19 @@
// r.readAdditional acts on that message and returns the necessary error.
select {
case <-r.ctxDone:
+ // Note that this adds the ctx error to the end of recv buffer, and
+ // reads from the head. This will delay the error until recv buffer is
+ // empty, thus will delay ctx cancellation in Recv().
+ //
+ // It's done this way to fix a race between ctx cancel and trailer. The
+ // race was, stream.Recv() may return ctx error if ctxDone wins the
+ // race, but stream.Trailer() may return a non-nil md because the stream
+ // was not marked as done when trailer is received. This closeStream
+ // call will mark stream as done, thus fix the race.
+ //
+ // TODO: delaying ctx error seems like a unnecessary side effect. What
+ // we really want is to mark the stream as done, and return ctx error
+ // faster.
r.closeStream(ContextErr(r.ctx.Err()))
m := <-r.recv.get()
return r.readAdditional(m, p)
@@ -298,6 +311,14 @@
}
select {
case <-s.ctx.Done():
+ // We prefer success over failure when reading messages because we delay
+ // context error in stream.Read(). To keep behavior consistent, we also
+ // prefer success here.
+ select {
+ case <-s.headerChan:
+ return nil
+ default:
+ }
return ContextErr(s.ctx.Err())
case <-s.headerChan:
return nil
diff --git a/vendor/google.golang.org/grpc/pickfirst.go b/vendor/google.golang.org/grpc/pickfirst.go
index d1e38aa..ed05b02 100644
--- a/vendor/google.golang.org/grpc/pickfirst.go
+++ b/vendor/google.golang.org/grpc/pickfirst.go
@@ -51,14 +51,18 @@
func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
- grpclog.Infof("pickfirstBalancer: HandleResolvedAddrs called with error %v", err)
+ if grpclog.V(2) {
+ grpclog.Infof("pickfirstBalancer: HandleResolvedAddrs called with error %v", err)
+ }
return
}
if b.sc == nil {
b.sc, err = b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
if err != nil {
//TODO(yuxuanli): why not change the cc state to Idle?
- grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
+ if grpclog.V(2) {
+ grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
+ }
return
}
b.cc.UpdateBalancerState(connectivity.Idle, &picker{sc: b.sc})
@@ -70,9 +74,13 @@
}
func (b *pickfirstBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
- grpclog.Infof("pickfirstBalancer: HandleSubConnStateChange: %p, %v", sc, s)
+ if grpclog.V(2) {
+ grpclog.Infof("pickfirstBalancer: HandleSubConnStateChange: %p, %v", sc, s)
+ }
if b.sc != sc {
- grpclog.Infof("pickfirstBalancer: ignored state change because sc is not recognized")
+ if grpclog.V(2) {
+ grpclog.Infof("pickfirstBalancer: ignored state change because sc is not recognized")
+ }
return
}
if s == connectivity.Shutdown {
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 1766136..f064b73 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -42,6 +42,7 @@
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
@@ -56,6 +57,8 @@
defaultServerMaxSendMessageSize = math.MaxInt32
)
+var statusOK = status.New(codes.OK, "")
+
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
// MethodDesc represents an RPC service's method specification.
@@ -97,10 +100,8 @@
m map[string]*service // service name -> service info
events trace.EventLog
- quit chan struct{}
- done chan struct{}
- quitOnce sync.Once
- doneOnce sync.Once
+ quit *grpcsync.Event
+ done *grpcsync.Event
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
@@ -388,8 +389,8 @@
opts: opts,
conns: make(map[transport.ServerTransport]bool),
m: make(map[string]*service),
- quit: make(chan struct{}),
- done: make(chan struct{}),
+ quit: grpcsync.NewEvent(),
+ done: grpcsync.NewEvent(),
czData: new(channelzData),
}
s.cv = sync.NewCond(&s.mu)
@@ -556,11 +557,9 @@
s.serveWG.Add(1)
defer func() {
s.serveWG.Done()
- select {
- // Stop or GracefulStop called; block until done and return nil.
- case <-s.quit:
- <-s.done
- default:
+ if s.quit.HasFired() {
+ // Stop or GracefulStop called; block until done and return nil.
+ <-s.done.Done()
}
}()
@@ -603,7 +602,7 @@
timer := time.NewTimer(tempDelay)
select {
case <-timer.C:
- case <-s.quit:
+ case <-s.quit.Done():
timer.Stop()
return nil
}
@@ -613,10 +612,8 @@
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
- select {
- case <-s.quit:
+ if s.quit.HasFired() {
return nil
- default:
}
return err
}
@@ -637,6 +634,10 @@
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
+ if s.quit.HasFired() {
+ rawConn.Close()
+ return
+ }
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
@@ -653,14 +654,6 @@
return
}
- s.mu.Lock()
- if s.conns == nil {
- s.mu.Unlock()
- conn.Close()
- return
- }
- s.mu.Unlock()
-
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
@@ -768,6 +761,9 @@
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
+ if !EnableTracing {
+ return nil
+ }
tr, ok := trace.FromContext(stream.Context())
if !ok {
return nil
@@ -978,10 +974,11 @@
}
if sh != nil {
sh.HandleRPC(stream.Context(), &stats.InPayload{
- RecvTime: time.Now(),
- Payload: v,
- Data: d,
- Length: len(d),
+ RecvTime: time.Now(),
+ Payload: v,
+ WireLength: payInfo.wireLength,
+ Data: d,
+ Length: len(d),
})
}
if binlog != nil {
@@ -1077,7 +1074,7 @@
// TODO: Should we be logging if writing status failed here, like above?
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
// error or allow the stats handler to see it?
- err = t.WriteStatus(stream, status.New(codes.OK, ""))
+ err = t.WriteStatus(stream, statusOK)
if binlog != nil {
binlog.Log(&binarylog.ServerTrailer{
Trailer: stream.Trailer(),
@@ -1235,7 +1232,7 @@
ss.trInfo.tr.LazyLog(stringer("OK"), false)
ss.mu.Unlock()
}
- err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
+ err = t.WriteStatus(ss.s, statusOK)
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ServerTrailer{
Trailer: ss.s.Trailer(),
@@ -1352,15 +1349,11 @@
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
- s.quitOnce.Do(func() {
- close(s.quit)
- })
+ s.quit.Fire()
defer func() {
s.serveWG.Wait()
- s.doneOnce.Do(func() {
- close(s.done)
- })
+ s.done.Fire()
}()
s.channelzRemoveOnce.Do(func() {
@@ -1397,15 +1390,8 @@
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
- s.quitOnce.Do(func() {
- close(s.quit)
- })
-
- defer func() {
- s.doneOnce.Do(func() {
- close(s.done)
- })
- }()
+ s.quit.Fire()
+ defer s.done.Fire()
s.channelzRemoveOnce.Do(func() {
if channelz.IsOn() {
diff --git a/vendor/google.golang.org/grpc/status/status.go b/vendor/google.golang.org/grpc/status/status.go
index 641c45c..a1348e9 100644
--- a/vendor/google.golang.org/grpc/status/status.go
+++ b/vendor/google.golang.org/grpc/status/status.go
@@ -58,6 +58,17 @@
return &Status{s: (*spb.Status)(se)}
}
+// Is implements future error.Is functionality.
+// A statusError is equivalent if the code and message are identical.
+func (se *statusError) Is(target error) bool {
+ tse, ok := target.(*statusError)
+ if !ok {
+ return false
+ }
+
+ return proto.Equal((*spb.Status)(se), (*spb.Status)(tse))
+}
+
// Status represents an RPC status code, message, and details. It is immutable
// and should be created with New, Newf, or FromProto.
type Status struct {
@@ -132,7 +143,7 @@
// Status is returned with codes.Unknown and the original error message.
func FromError(err error) (s *Status, ok bool) {
if err == nil {
- return &Status{s: &spb.Status{Code: int32(codes.OK)}}, true
+ return nil, true
}
if se, ok := err.(interface {
GRPCStatus() *Status
@@ -206,7 +217,7 @@
func FromContextError(err error) *Status {
switch err {
case nil:
- return New(codes.OK, "")
+ return nil
case context.DeadlineExceeded:
return New(codes.DeadlineExceeded, err.Error())
case context.Canceled:
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index db14c32..134a624 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -327,13 +327,23 @@
return cs, nil
}
-func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) error {
- cs.attempt = &csAttempt{
+// newAttemptLocked creates a new attempt with a transport.
+// If it succeeds, then it replaces clientStream's attempt with this new attempt.
+func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
+ newAttempt := &csAttempt{
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
trInfo: trInfo,
}
+ defer func() {
+ if retErr != nil {
+ // This attempt is not set in the clientStream, so it's finish won't
+ // be called. Call it here for stats and trace in case they are not
+ // nil.
+ newAttempt.finish(retErr)
+ }
+ }()
if err := cs.ctx.Err(); err != nil {
return toRPCErr(err)
@@ -345,8 +355,9 @@
if trInfo != nil {
trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
}
- cs.attempt.t = t
- cs.attempt.done = done
+ newAttempt.t = t
+ newAttempt.done = done
+ cs.attempt = newAttempt
return nil
}
@@ -395,11 +406,18 @@
serverHeaderBinlogged bool
mu sync.Mutex
- firstAttempt bool // if true, transparent retry is valid
- numRetries int // exclusive of transparent retry attempt(s)
- numRetriesSincePushback int // retries since pushback; to reset backoff
- finished bool // TODO: replace with atomic cmpxchg or sync.Once?
- attempt *csAttempt // the active client stream attempt
+ firstAttempt bool // if true, transparent retry is valid
+ numRetries int // exclusive of transparent retry attempt(s)
+ numRetriesSincePushback int // retries since pushback; to reset backoff
+ finished bool // TODO: replace with atomic cmpxchg or sync.Once?
+ // attempt is the active client stream attempt.
+ // The only place where it is written is the newAttemptLocked method and this method never writes nil.
+ // So, attempt can be nil only inside newClientStream function when clientStream is first created.
+ // One of the first things done after clientStream's creation, is to call newAttemptLocked which either
+ // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
+ // then newClientStream calls finish on the clientStream and returns. So, finish method is the only
+ // place where we need to check if the attempt is nil.
+ attempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
committed bool // active attempt committed for retry?
buffer []func(a *csAttempt) error // operations to replay on retry
@@ -457,8 +475,8 @@
if cs.attempt.s != nil {
<-cs.attempt.s.Done()
}
- if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
- // First attempt, wait-for-ready, stream unprocessed: transparently retry.
+ if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
+ // First attempt, stream unprocessed: transparently retry.
cs.firstAttempt = false
return nil
}
@@ -805,11 +823,11 @@
}
if cs.attempt != nil {
cs.attempt.finish(err)
- }
- // after functions all rely upon having a stream.
- if cs.attempt.s != nil {
- for _, o := range cs.opts {
- o.after(cs.callInfo)
+ // after functions all rely upon having a stream.
+ if cs.attempt.s != nil {
+ for _, o := range cs.opts {
+ o.after(cs.callInfo)
+ }
}
}
cs.cancel()
diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go
index 3ab7889..5888505 100644
--- a/vendor/google.golang.org/grpc/version.go
+++ b/vendor/google.golang.org/grpc/version.go
@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
-const Version = "1.22.0"
+const Version = "1.23.1"
diff --git a/vendor/google.golang.org/grpc/vet.sh b/vendor/google.golang.org/grpc/vet.sh
index 11037b9..661e1e1 100755
--- a/vendor/google.golang.org/grpc/vet.sh
+++ b/vendor/google.golang.org/grpc/vet.sh
@@ -105,12 +105,14 @@
fi
# - Collection of static analysis checks
-# TODO(menghanl): fix errors in transport_test.
+# TODO(dfawley): don't use deprecated functions in examples.
staticcheck -go 1.9 -checks 'inherit,-ST1015' -ignore '
google.golang.org/grpc/balancer.go:SA1019
+google.golang.org/grpc/balancer/grpclb/grpclb_remote_balancer.go:SA1019
google.golang.org/grpc/balancer/roundrobin/roundrobin_test.go:SA1019
-google.golang.org/grpc/balancer/xds/edsbalancer/balancergroup.go:SA1019
-google.golang.org/grpc/balancer/xds/xds.go:SA1019
+google.golang.org/grpc/xds/internal/balancer/edsbalancer/balancergroup.go:SA1019
+google.golang.org/grpc/xds/internal/balancer/xds.go:SA1019
+google.golang.org/grpc/xds/internal/balancer/xds_client.go:SA1019
google.golang.org/grpc/balancer_conn_wrappers.go:SA1019
google.golang.org/grpc/balancer_test.go:SA1019
google.golang.org/grpc/benchmark/benchmain/main.go:SA1019
@@ -118,10 +120,13 @@
google.golang.org/grpc/clientconn.go:S1024
google.golang.org/grpc/clientconn_state_transition_test.go:SA1019
google.golang.org/grpc/clientconn_test.go:SA1019
+google.golang.org/grpc/examples/features/debugging/client/main.go:SA1019
+google.golang.org/grpc/examples/features/load_balancing/client/main.go:SA1019
google.golang.org/grpc/internal/transport/handler_server.go:SA1019
google.golang.org/grpc/internal/transport/handler_server_test.go:SA1019
google.golang.org/grpc/resolver/dns/dns_resolver.go:SA1019
google.golang.org/grpc/stats/stats_test.go:SA1019
+google.golang.org/grpc/test/balancer_test.go:SA1019
google.golang.org/grpc/test/channelz_test.go:SA1019
google.golang.org/grpc/test/end2end_test.go:SA1019
google.golang.org/grpc/test/healthcheck_test.go:SA1019