[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/auth.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/auth.go
deleted file mode 100644
index 62ce757..0000000
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/auth.go
+++ /dev/null
@@ -1,158 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package v3rpc
-
-import (
- "context"
-
- "go.etcd.io/etcd/etcdserver"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
-)
-
-type AuthServer struct {
- authenticator etcdserver.Authenticator
-}
-
-func NewAuthServer(s *etcdserver.EtcdServer) *AuthServer {
- return &AuthServer{authenticator: s}
-}
-
-func (as *AuthServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
- resp, err := as.authenticator.AuthEnable(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
- resp, err := as.authenticator.AuthDisable(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
- resp, err := as.authenticator.Authenticate(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
- resp, err := as.authenticator.RoleAdd(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
- resp, err := as.authenticator.RoleDelete(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
- resp, err := as.authenticator.RoleGet(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
- resp, err := as.authenticator.RoleList(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
- resp, err := as.authenticator.RoleRevokePermission(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
- resp, err := as.authenticator.RoleGrantPermission(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
- resp, err := as.authenticator.UserAdd(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
- resp, err := as.authenticator.UserDelete(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
- resp, err := as.authenticator.UserGet(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
- resp, err := as.authenticator.UserList(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
- resp, err := as.authenticator.UserGrantRole(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
- resp, err := as.authenticator.UserRevokeRole(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
-
-func (as *AuthServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
- resp, err := as.authenticator.UserChangePassword(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
- return resp, nil
-}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/codec.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/codec.go
deleted file mode 100644
index 17a2c87..0000000
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/codec.go
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package v3rpc
-
-import "github.com/gogo/protobuf/proto"
-
-type codec struct{}
-
-func (c *codec) Marshal(v interface{}) ([]byte, error) {
- b, err := proto.Marshal(v.(proto.Message))
- sentBytes.Add(float64(len(b)))
- return b, err
-}
-
-func (c *codec) Unmarshal(data []byte, v interface{}) error {
- receivedBytes.Add(float64(len(data)))
- return proto.Unmarshal(data, v.(proto.Message))
-}
-
-func (c *codec) String() string {
- return "proto"
-}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/grpc.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/grpc.go
deleted file mode 100644
index 3332016..0000000
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/grpc.go
+++ /dev/null
@@ -1,77 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package v3rpc
-
-import (
- "crypto/tls"
- "math"
-
- "go.etcd.io/etcd/etcdserver"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
-
- grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
- grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
- "go.etcd.io/etcd/clientv3/credentials"
- "google.golang.org/grpc"
- "google.golang.org/grpc/health"
- healthpb "google.golang.org/grpc/health/grpc_health_v1"
-)
-
-const (
- grpcOverheadBytes = 512 * 1024
- maxStreams = math.MaxUint32
- maxSendBytes = math.MaxInt32
-)
-
-func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
- var opts []grpc.ServerOption
- opts = append(opts, grpc.CustomCodec(&codec{}))
- if tls != nil {
- bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls})
- opts = append(opts, grpc.Creds(bundle.TransportCredentials()))
- }
- opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
- newLogUnaryInterceptor(s),
- newUnaryInterceptor(s),
- grpc_prometheus.UnaryServerInterceptor,
- )))
- opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
- newStreamInterceptor(s),
- grpc_prometheus.StreamServerInterceptor,
- )))
- opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
- opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
- opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
- grpcServer := grpc.NewServer(append(opts, gopts...)...)
-
- pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
- pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
- pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
- pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
- pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
- pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
-
- // server should register all the services manually
- // use empty service name for all etcd services' health status,
- // see https://github.com/grpc/grpc/blob/master/doc/health-checking.md for more
- hsrv := health.NewServer()
- hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
- healthpb.RegisterHealthServer(grpcServer, hsrv)
-
- // set zero values for metrics registered for this grpc server
- grpc_prometheus.Register(grpcServer)
-
- return grpcServer
-}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/header.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/header.go
deleted file mode 100644
index f23b6a7..0000000
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/header.go
+++ /dev/null
@@ -1,49 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package v3rpc
-
-import (
- "go.etcd.io/etcd/etcdserver"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
-)
-
-type header struct {
- clusterID int64
- memberID int64
- sg etcdserver.RaftStatusGetter
- rev func() int64
-}
-
-func newHeader(s *etcdserver.EtcdServer) header {
- return header{
- clusterID: int64(s.Cluster().ID()),
- memberID: int64(s.ID()),
- sg: s,
- rev: func() int64 { return s.KV().Rev() },
- }
-}
-
-// fill populates pb.ResponseHeader using etcdserver information
-func (h *header) fill(rh *pb.ResponseHeader) {
- if rh == nil {
- plog.Panic("unexpected nil resp.Header")
- }
- rh.ClusterId = uint64(h.clusterID)
- rh.MemberId = uint64(h.memberID)
- rh.RaftTerm = h.sg.Term()
- if rh.Revision == 0 {
- rh.Revision = h.rev()
- }
-}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/interceptor.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/interceptor.go
deleted file mode 100644
index ce9047e..0000000
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/interceptor.go
+++ /dev/null
@@ -1,276 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package v3rpc
-
-import (
- "context"
- "sync"
- "time"
-
- "go.etcd.io/etcd/etcdserver"
- "go.etcd.io/etcd/etcdserver/api"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- "go.etcd.io/etcd/pkg/types"
- "go.etcd.io/etcd/raft"
-
- "github.com/coreos/pkg/capnslog"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
- "go.uber.org/zap"
- "google.golang.org/grpc"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/peer"
-)
-
-const (
- maxNoLeaderCnt = 3
-)
-
-type streamsMap struct {
- mu sync.Mutex
- streams map[grpc.ServerStream]struct{}
-}
-
-func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
- return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
- if !api.IsCapabilityEnabled(api.V3rpcCapability) {
- return nil, rpctypes.ErrGRPCNotCapable
- }
-
- if s.IsMemberExist(s.ID()) && s.IsLearner() && !isRPCSupportedForLearner(req) {
- return nil, rpctypes.ErrGPRCNotSupportedForLearner
- }
-
- md, ok := metadata.FromIncomingContext(ctx)
- if ok {
- if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
- if s.Leader() == types.ID(raft.None) {
- return nil, rpctypes.ErrGRPCNoLeader
- }
- }
- }
-
- return handler(ctx, req)
- }
-}
-
-func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
- return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
- startTime := time.Now()
- resp, err := handler(ctx, req)
- lg := s.Logger()
- if (lg != nil && lg.Core().Enabled(zap.DebugLevel)) || // using zap logger and debug level is enabled
- (lg == nil && plog.LevelAt(capnslog.DEBUG)) { // or, using capnslog and debug level is enabled
- defer logUnaryRequestStats(ctx, lg, info, startTime, req, resp)
- }
- return resp, err
- }
-}
-
-func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
- duration := time.Since(startTime)
- remote := "No remote client info."
- peerInfo, ok := peer.FromContext(ctx)
- if ok {
- remote = peerInfo.Addr.String()
- }
- responseType := info.FullMethod
- var reqCount, respCount int64
- var reqSize, respSize int
- var reqContent string
- switch _resp := resp.(type) {
- case *pb.RangeResponse:
- _req, ok := req.(*pb.RangeRequest)
- if ok {
- reqCount = 0
- reqSize = _req.Size()
- reqContent = _req.String()
- }
- if _resp != nil {
- respCount = _resp.GetCount()
- respSize = _resp.Size()
- }
- case *pb.PutResponse:
- _req, ok := req.(*pb.PutRequest)
- if ok {
- reqCount = 1
- reqSize = _req.Size()
- reqContent = pb.NewLoggablePutRequest(_req).String()
- // redact value field from request content, see PR #9821
- }
- if _resp != nil {
- respCount = 0
- respSize = _resp.Size()
- }
- case *pb.DeleteRangeResponse:
- _req, ok := req.(*pb.DeleteRangeRequest)
- if ok {
- reqCount = 0
- reqSize = _req.Size()
- reqContent = _req.String()
- }
- if _resp != nil {
- respCount = _resp.GetDeleted()
- respSize = _resp.Size()
- }
- case *pb.TxnResponse:
- _req, ok := req.(*pb.TxnRequest)
- if ok && _resp != nil {
- if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
- reqCount = int64(len(_req.GetSuccess()))
- reqSize = 0
- for _, r := range _req.GetSuccess() {
- reqSize += r.Size()
- }
- } else {
- reqCount = int64(len(_req.GetFailure()))
- reqSize = 0
- for _, r := range _req.GetFailure() {
- reqSize += r.Size()
- }
- }
- reqContent = pb.NewLoggableTxnRequest(_req).String()
- // redact value field from request content, see PR #9821
- }
- if _resp != nil {
- respCount = 0
- respSize = _resp.Size()
- }
- default:
- reqCount = -1
- reqSize = -1
- respCount = -1
- respSize = -1
- }
-
- logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
-}
-
-func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
- reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
- if lg == nil {
- plog.Debugf("start time = %v, "+
- "time spent = %v, "+
- "remote = %s, "+
- "response type = %s, "+
- "request count = %d, "+
- "request size = %d, "+
- "response count = %d, "+
- "response size = %d, "+
- "request content = %s",
- startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
- )
- } else {
- lg.Debug("request stats",
- zap.Time("start time", startTime),
- zap.Duration("time spent", duration),
- zap.String("remote", remote),
- zap.String("response type", responseType),
- zap.Int64("request count", reqCount),
- zap.Int("request size", reqSize),
- zap.Int64("response count", respCount),
- zap.Int("response size", respSize),
- zap.String("request content", reqContent),
- )
- }
-}
-
-func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
- smap := monitorLeader(s)
-
- return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- if !api.IsCapabilityEnabled(api.V3rpcCapability) {
- return rpctypes.ErrGRPCNotCapable
- }
-
- if s.IsMemberExist(s.ID()) && s.IsLearner() { // learner does not support stream RPC
- return rpctypes.ErrGPRCNotSupportedForLearner
- }
-
- md, ok := metadata.FromIncomingContext(ss.Context())
- if ok {
- if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
- if s.Leader() == types.ID(raft.None) {
- return rpctypes.ErrGRPCNoLeader
- }
-
- cctx, cancel := context.WithCancel(ss.Context())
- ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
-
- smap.mu.Lock()
- smap.streams[ss] = struct{}{}
- smap.mu.Unlock()
-
- defer func() {
- smap.mu.Lock()
- delete(smap.streams, ss)
- smap.mu.Unlock()
- cancel()
- }()
-
- }
- }
-
- return handler(srv, ss)
- }
-}
-
-type serverStreamWithCtx struct {
- grpc.ServerStream
- ctx context.Context
- cancel *context.CancelFunc
-}
-
-func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
-
-func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
- smap := &streamsMap{
- streams: make(map[grpc.ServerStream]struct{}),
- }
-
- go func() {
- election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
- noLeaderCnt := 0
-
- for {
- select {
- case <-s.StopNotify():
- return
- case <-time.After(election):
- if s.Leader() == types.ID(raft.None) {
- noLeaderCnt++
- } else {
- noLeaderCnt = 0
- }
-
- // We are more conservative on canceling existing streams. Reconnecting streams
- // cost much more than just rejecting new requests. So we wait until the member
- // cannot find a leader for maxNoLeaderCnt election timeouts to cancel existing streams.
- if noLeaderCnt >= maxNoLeaderCnt {
- smap.mu.Lock()
- for ss := range smap.streams {
- if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
- (*ssWithCtx.cancel)()
- <-ss.Context().Done()
- }
- }
- smap.streams = make(map[grpc.ServerStream]struct{})
- smap.mu.Unlock()
- }
- }
- }
- }()
-
- return smap
-}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/key.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/key.go
deleted file mode 100644
index ff59bac..0000000
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/key.go
+++ /dev/null
@@ -1,277 +0,0 @@
-// Copyright 2015 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package v3rpc implements etcd v3 RPC system based on gRPC.
-package v3rpc
-
-import (
- "context"
-
- "go.etcd.io/etcd/etcdserver"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
- "go.etcd.io/etcd/pkg/adt"
-
- "github.com/coreos/pkg/capnslog"
-)
-
-var (
- plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "etcdserver/api/v3rpc")
-)
-
-type kvServer struct {
- hdr header
- kv etcdserver.RaftKV
- // maxTxnOps is the max operations per txn.
- // e.g suppose maxTxnOps = 128.
- // Txn.Success can have at most 128 operations,
- // and Txn.Failure can have at most 128 operations.
- maxTxnOps uint
-}
-
-func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
- return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
-}
-
-func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
- if err := checkRangeRequest(r); err != nil {
- return nil, err
- }
-
- resp, err := s.kv.Range(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
-
- s.hdr.fill(resp.Header)
- return resp, nil
-}
-
-func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
- if err := checkPutRequest(r); err != nil {
- return nil, err
- }
-
- resp, err := s.kv.Put(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
-
- s.hdr.fill(resp.Header)
- return resp, nil
-}
-
-func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
- if err := checkDeleteRequest(r); err != nil {
- return nil, err
- }
-
- resp, err := s.kv.DeleteRange(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
-
- s.hdr.fill(resp.Header)
- return resp, nil
-}
-
-func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
- if err := checkTxnRequest(r, int(s.maxTxnOps)); err != nil {
- return nil, err
- }
- // check for forbidden put/del overlaps after checking request to avoid quadratic blowup
- if _, _, err := checkIntervals(r.Success); err != nil {
- return nil, err
- }
- if _, _, err := checkIntervals(r.Failure); err != nil {
- return nil, err
- }
-
- resp, err := s.kv.Txn(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
-
- s.hdr.fill(resp.Header)
- return resp, nil
-}
-
-func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
- resp, err := s.kv.Compact(ctx, r)
- if err != nil {
- return nil, togRPCError(err)
- }
-
- s.hdr.fill(resp.Header)
- return resp, nil
-}
-
-func checkRangeRequest(r *pb.RangeRequest) error {
- if len(r.Key) == 0 {
- return rpctypes.ErrGRPCEmptyKey
- }
- return nil
-}
-
-func checkPutRequest(r *pb.PutRequest) error {
- if len(r.Key) == 0 {
- return rpctypes.ErrGRPCEmptyKey
- }
- if r.IgnoreValue && len(r.Value) != 0 {
- return rpctypes.ErrGRPCValueProvided
- }
- if r.IgnoreLease && r.Lease != 0 {
- return rpctypes.ErrGRPCLeaseProvided
- }
- return nil
-}
-
-func checkDeleteRequest(r *pb.DeleteRangeRequest) error {
- if len(r.Key) == 0 {
- return rpctypes.ErrGRPCEmptyKey
- }
- return nil
-}
-
-func checkTxnRequest(r *pb.TxnRequest, maxTxnOps int) error {
- opc := len(r.Compare)
- if opc < len(r.Success) {
- opc = len(r.Success)
- }
- if opc < len(r.Failure) {
- opc = len(r.Failure)
- }
- if opc > maxTxnOps {
- return rpctypes.ErrGRPCTooManyOps
- }
-
- for _, c := range r.Compare {
- if len(c.Key) == 0 {
- return rpctypes.ErrGRPCEmptyKey
- }
- }
- for _, u := range r.Success {
- if err := checkRequestOp(u, maxTxnOps-opc); err != nil {
- return err
- }
- }
- for _, u := range r.Failure {
- if err := checkRequestOp(u, maxTxnOps-opc); err != nil {
- return err
- }
- }
-
- return nil
-}
-
-// checkIntervals tests whether puts and deletes overlap for a list of ops. If
-// there is an overlap, returns an error. If no overlap, return put and delete
-// sets for recursive evaluation.
-func checkIntervals(reqs []*pb.RequestOp) (map[string]struct{}, adt.IntervalTree, error) {
- dels := adt.NewIntervalTree()
-
- // collect deletes from this level; build first to check lower level overlapped puts
- for _, req := range reqs {
- tv, ok := req.Request.(*pb.RequestOp_RequestDeleteRange)
- if !ok {
- continue
- }
- dreq := tv.RequestDeleteRange
- if dreq == nil {
- continue
- }
- var iv adt.Interval
- if len(dreq.RangeEnd) != 0 {
- iv = adt.NewStringAffineInterval(string(dreq.Key), string(dreq.RangeEnd))
- } else {
- iv = adt.NewStringAffinePoint(string(dreq.Key))
- }
- dels.Insert(iv, struct{}{})
- }
-
- // collect children puts/deletes
- puts := make(map[string]struct{})
- for _, req := range reqs {
- tv, ok := req.Request.(*pb.RequestOp_RequestTxn)
- if !ok {
- continue
- }
- putsThen, delsThen, err := checkIntervals(tv.RequestTxn.Success)
- if err != nil {
- return nil, dels, err
- }
- putsElse, delsElse, err := checkIntervals(tv.RequestTxn.Failure)
- if err != nil {
- return nil, dels, err
- }
- for k := range putsThen {
- if _, ok := puts[k]; ok {
- return nil, dels, rpctypes.ErrGRPCDuplicateKey
- }
- if dels.Intersects(adt.NewStringAffinePoint(k)) {
- return nil, dels, rpctypes.ErrGRPCDuplicateKey
- }
- puts[k] = struct{}{}
- }
- for k := range putsElse {
- if _, ok := puts[k]; ok {
- // if key is from putsThen, overlap is OK since
- // either then/else are mutually exclusive
- if _, isSafe := putsThen[k]; !isSafe {
- return nil, dels, rpctypes.ErrGRPCDuplicateKey
- }
- }
- if dels.Intersects(adt.NewStringAffinePoint(k)) {
- return nil, dels, rpctypes.ErrGRPCDuplicateKey
- }
- puts[k] = struct{}{}
- }
- dels.Union(delsThen, adt.NewStringAffineInterval("\x00", ""))
- dels.Union(delsElse, adt.NewStringAffineInterval("\x00", ""))
- }
-
- // collect and check this level's puts
- for _, req := range reqs {
- tv, ok := req.Request.(*pb.RequestOp_RequestPut)
- if !ok || tv.RequestPut == nil {
- continue
- }
- k := string(tv.RequestPut.Key)
- if _, ok := puts[k]; ok {
- return nil, dels, rpctypes.ErrGRPCDuplicateKey
- }
- if dels.Intersects(adt.NewStringAffinePoint(k)) {
- return nil, dels, rpctypes.ErrGRPCDuplicateKey
- }
- puts[k] = struct{}{}
- }
- return puts, dels, nil
-}
-
-func checkRequestOp(u *pb.RequestOp, maxTxnOps int) error {
- // TODO: ensure only one of the field is set.
- switch uv := u.Request.(type) {
- case *pb.RequestOp_RequestRange:
- return checkRangeRequest(uv.RequestRange)
- case *pb.RequestOp_RequestPut:
- return checkPutRequest(uv.RequestPut)
- case *pb.RequestOp_RequestDeleteRange:
- return checkDeleteRequest(uv.RequestDeleteRange)
- case *pb.RequestOp_RequestTxn:
- return checkTxnRequest(uv.RequestTxn, maxTxnOps)
- default:
- // empty op / nil entry
- return rpctypes.ErrGRPCKeyNotFound
- }
-}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/lease.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/lease.go
deleted file mode 100644
index 7441bee..0000000
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/lease.go
+++ /dev/null
@@ -1,169 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package v3rpc
-
-import (
- "context"
- "io"
-
- "go.etcd.io/etcd/etcdserver"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
- "go.etcd.io/etcd/lease"
-
- "go.uber.org/zap"
-)
-
-type LeaseServer struct {
- lg *zap.Logger
- hdr header
- le etcdserver.Lessor
-}
-
-func NewLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
- return &LeaseServer{lg: s.Cfg.Logger, le: s, hdr: newHeader(s)}
-}
-
-func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
- resp, err := ls.le.LeaseGrant(ctx, cr)
-
- if err != nil {
- return nil, togRPCError(err)
- }
- ls.hdr.fill(resp.Header)
- return resp, nil
-}
-
-func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
- resp, err := ls.le.LeaseRevoke(ctx, rr)
- if err != nil {
- return nil, togRPCError(err)
- }
- ls.hdr.fill(resp.Header)
- return resp, nil
-}
-
-func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
- resp, err := ls.le.LeaseTimeToLive(ctx, rr)
- if err != nil && err != lease.ErrLeaseNotFound {
- return nil, togRPCError(err)
- }
- if err == lease.ErrLeaseNotFound {
- resp = &pb.LeaseTimeToLiveResponse{
- Header: &pb.ResponseHeader{},
- ID: rr.ID,
- TTL: -1,
- }
- }
- ls.hdr.fill(resp.Header)
- return resp, nil
-}
-
-func (ls *LeaseServer) LeaseLeases(ctx context.Context, rr *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
- resp, err := ls.le.LeaseLeases(ctx, rr)
- if err != nil && err != lease.ErrLeaseNotFound {
- return nil, togRPCError(err)
- }
- if err == lease.ErrLeaseNotFound {
- resp = &pb.LeaseLeasesResponse{
- Header: &pb.ResponseHeader{},
- Leases: []*pb.LeaseStatus{},
- }
- }
- ls.hdr.fill(resp.Header)
- return resp, nil
-}
-
-func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) (err error) {
- errc := make(chan error, 1)
- go func() {
- errc <- ls.leaseKeepAlive(stream)
- }()
- select {
- case err = <-errc:
- case <-stream.Context().Done():
- // the only server-side cancellation is noleader for now.
- err = stream.Context().Err()
- if err == context.Canceled {
- err = rpctypes.ErrGRPCNoLeader
- }
- }
- return err
-}
-
-func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
- for {
- req, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- if isClientCtxErr(stream.Context().Err(), err) {
- if ls.lg != nil {
- ls.lg.Debug("failed to receive lease keepalive request from gRPC stream", zap.Error(err))
- } else {
- plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
- }
- } else {
- if ls.lg != nil {
- ls.lg.Warn("failed to receive lease keepalive request from gRPC stream", zap.Error(err))
- } else {
- plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
- }
- streamFailures.WithLabelValues("receive", "lease-keepalive").Inc()
- }
- return err
- }
-
- // Create header before we sent out the renew request.
- // This can make sure that the revision is strictly smaller or equal to
- // when the keepalive happened at the local server (when the local server is the leader)
- // or remote leader.
- // Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded
- // at rev 4.
- resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
- ls.hdr.fill(resp.Header)
-
- ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
- if err == lease.ErrLeaseNotFound {
- err = nil
- ttl = 0
- }
-
- if err != nil {
- return togRPCError(err)
- }
-
- resp.TTL = ttl
- err = stream.Send(resp)
- if err != nil {
- if isClientCtxErr(stream.Context().Err(), err) {
- if ls.lg != nil {
- ls.lg.Debug("failed to send lease keepalive response to gRPC stream", zap.Error(err))
- } else {
- plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
- }
- } else {
- if ls.lg != nil {
- ls.lg.Warn("failed to send lease keepalive response to gRPC stream", zap.Error(err))
- } else {
- plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
- }
- streamFailures.WithLabelValues("send", "lease-keepalive").Inc()
- }
- return err
- }
- }
-}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/maintenance.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/maintenance.go
deleted file mode 100644
index c51271a..0000000
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/maintenance.go
+++ /dev/null
@@ -1,260 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package v3rpc
-
-import (
- "context"
- "crypto/sha256"
- "io"
-
- "go.etcd.io/etcd/auth"
- "go.etcd.io/etcd/etcdserver"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
- "go.etcd.io/etcd/mvcc"
- "go.etcd.io/etcd/mvcc/backend"
- "go.etcd.io/etcd/raft"
- "go.etcd.io/etcd/version"
-
- "go.uber.org/zap"
-)
-
-type KVGetter interface {
- KV() mvcc.ConsistentWatchableKV
-}
-
-type BackendGetter interface {
- Backend() backend.Backend
-}
-
-type Alarmer interface {
- // Alarms is implemented in Server interface located in etcdserver/server.go
- // It returns a list of alarms present in the AlarmStore
- Alarms() []*pb.AlarmMember
- Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
-}
-
-type LeaderTransferrer interface {
- MoveLeader(ctx context.Context, lead, target uint64) error
-}
-
-type AuthGetter interface {
- AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error)
- AuthStore() auth.AuthStore
-}
-
-type ClusterStatusGetter interface {
- IsLearner() bool
-}
-
-type maintenanceServer struct {
- lg *zap.Logger
- rg etcdserver.RaftStatusGetter
- kg KVGetter
- bg BackendGetter
- a Alarmer
- lt LeaderTransferrer
- hdr header
- cs ClusterStatusGetter
-}
-
-func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
- srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s}
- return &authMaintenanceServer{srv, s}
-}
-
-func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
- if ms.lg != nil {
- ms.lg.Info("starting defragment")
- } else {
- plog.Noticef("starting to defragment the storage backend...")
- }
- err := ms.bg.Backend().Defrag()
- if err != nil {
- if ms.lg != nil {
- ms.lg.Warn("failed to defragment", zap.Error(err))
- } else {
- plog.Errorf("failed to defragment the storage backend (%v)", err)
- }
- return nil, err
- }
- if ms.lg != nil {
- ms.lg.Info("finished defragment")
- } else {
- plog.Noticef("finished defragmenting the storage backend")
- }
- return &pb.DefragmentResponse{}, nil
-}
-
-func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
- snap := ms.bg.Backend().Snapshot()
- pr, pw := io.Pipe()
-
- defer pr.Close()
-
- go func() {
- snap.WriteTo(pw)
- if err := snap.Close(); err != nil {
- if ms.lg != nil {
- ms.lg.Warn("failed to close snapshot", zap.Error(err))
- } else {
- plog.Errorf("error closing snapshot (%v)", err)
- }
- }
- pw.Close()
- }()
-
- // send file data
- h := sha256.New()
- br := int64(0)
- buf := make([]byte, 32*1024)
- sz := snap.Size()
- for br < sz {
- n, err := io.ReadFull(pr, buf)
- if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
- return togRPCError(err)
- }
- br += int64(n)
- resp := &pb.SnapshotResponse{
- RemainingBytes: uint64(sz - br),
- Blob: buf[:n],
- }
- if err = srv.Send(resp); err != nil {
- return togRPCError(err)
- }
- h.Write(buf[:n])
- }
-
- // send sha
- sha := h.Sum(nil)
- hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha}
- if err := srv.Send(hresp); err != nil {
- return togRPCError(err)
- }
-
- return nil
-}
-
-func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
- h, rev, err := ms.kg.KV().Hash()
- if err != nil {
- return nil, togRPCError(err)
- }
- resp := &pb.HashResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h}
- ms.hdr.fill(resp.Header)
- return resp, nil
-}
-
-func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
- h, rev, compactRev, err := ms.kg.KV().HashByRev(r.Revision)
- if err != nil {
- return nil, togRPCError(err)
- }
-
- resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h, CompactRevision: compactRev}
- ms.hdr.fill(resp.Header)
- return resp, nil
-}
-
-func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
- return ms.a.Alarm(ctx, ar)
-}
-
-func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
- hdr := &pb.ResponseHeader{}
- ms.hdr.fill(hdr)
- resp := &pb.StatusResponse{
- Header: hdr,
- Version: version.Version,
- Leader: uint64(ms.rg.Leader()),
- RaftIndex: ms.rg.CommittedIndex(),
- RaftAppliedIndex: ms.rg.AppliedIndex(),
- RaftTerm: ms.rg.Term(),
- DbSize: ms.bg.Backend().Size(),
- DbSizeInUse: ms.bg.Backend().SizeInUse(),
- IsLearner: ms.cs.IsLearner(),
- }
- if resp.Leader == raft.None {
- resp.Errors = append(resp.Errors, etcdserver.ErrNoLeader.Error())
- }
- for _, a := range ms.a.Alarms() {
- resp.Errors = append(resp.Errors, a.String())
- }
- return resp, nil
-}
-
-func (ms *maintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
- if ms.rg.ID() != ms.rg.Leader() {
- return nil, rpctypes.ErrGRPCNotLeader
- }
-
- if err := ms.lt.MoveLeader(ctx, uint64(ms.rg.Leader()), tr.TargetID); err != nil {
- return nil, togRPCError(err)
- }
- return &pb.MoveLeaderResponse{}, nil
-}
-
-type authMaintenanceServer struct {
- *maintenanceServer
- ag AuthGetter
-}
-
-func (ams *authMaintenanceServer) isAuthenticated(ctx context.Context) error {
- authInfo, err := ams.ag.AuthInfoFromCtx(ctx)
- if err != nil {
- return err
- }
-
- return ams.ag.AuthStore().IsAdminPermitted(authInfo)
-}
-
-func (ams *authMaintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
- if err := ams.isAuthenticated(ctx); err != nil {
- return nil, err
- }
-
- return ams.maintenanceServer.Defragment(ctx, sr)
-}
-
-func (ams *authMaintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
- if err := ams.isAuthenticated(srv.Context()); err != nil {
- return err
- }
-
- return ams.maintenanceServer.Snapshot(sr, srv)
-}
-
-func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
- if err := ams.isAuthenticated(ctx); err != nil {
- return nil, err
- }
-
- return ams.maintenanceServer.Hash(ctx, r)
-}
-
-func (ams *authMaintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
- if err := ams.isAuthenticated(ctx); err != nil {
- return nil, err
- }
- return ams.maintenanceServer.HashKV(ctx, r)
-}
-
-func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
- return ams.maintenanceServer.Status(ctx, ar)
-}
-
-func (ams *authMaintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
- return ams.maintenanceServer.MoveLeader(ctx, tr)
-}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/member.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/member.go
deleted file mode 100644
index b2ebc98..0000000
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/member.go
+++ /dev/null
@@ -1,119 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package v3rpc
-
-import (
- "context"
- "time"
-
- "go.etcd.io/etcd/etcdserver"
- "go.etcd.io/etcd/etcdserver/api"
- "go.etcd.io/etcd/etcdserver/api/membership"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
- "go.etcd.io/etcd/pkg/types"
-)
-
-type ClusterServer struct {
- cluster api.Cluster
- server etcdserver.ServerV3
-}
-
-func NewClusterServer(s etcdserver.ServerV3) *ClusterServer {
- return &ClusterServer{
- cluster: s.Cluster(),
- server: s,
- }
-}
-
-func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) (*pb.MemberAddResponse, error) {
- urls, err := types.NewURLs(r.PeerURLs)
- if err != nil {
- return nil, rpctypes.ErrGRPCMemberBadURLs
- }
-
- now := time.Now()
- var m *membership.Member
- if r.IsLearner {
- m = membership.NewMemberAsLearner("", urls, "", &now)
- } else {
- m = membership.NewMember("", urls, "", &now)
- }
- membs, merr := cs.server.AddMember(ctx, *m)
- if merr != nil {
- return nil, togRPCError(merr)
- }
-
- return &pb.MemberAddResponse{
- Header: cs.header(),
- Member: &pb.Member{
- ID: uint64(m.ID),
- PeerURLs: m.PeerURLs,
- IsLearner: m.IsLearner,
- },
- Members: membersToProtoMembers(membs),
- }, nil
-}
-
-func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
- membs, err := cs.server.RemoveMember(ctx, r.ID)
- if err != nil {
- return nil, togRPCError(err)
- }
- return &pb.MemberRemoveResponse{Header: cs.header(), Members: membersToProtoMembers(membs)}, nil
-}
-
-func (cs *ClusterServer) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) {
- m := membership.Member{
- ID: types.ID(r.ID),
- RaftAttributes: membership.RaftAttributes{PeerURLs: r.PeerURLs},
- }
- membs, err := cs.server.UpdateMember(ctx, m)
- if err != nil {
- return nil, togRPCError(err)
- }
- return &pb.MemberUpdateResponse{Header: cs.header(), Members: membersToProtoMembers(membs)}, nil
-}
-
-func (cs *ClusterServer) MemberList(ctx context.Context, r *pb.MemberListRequest) (*pb.MemberListResponse, error) {
- membs := membersToProtoMembers(cs.cluster.Members())
- return &pb.MemberListResponse{Header: cs.header(), Members: membs}, nil
-}
-
-func (cs *ClusterServer) MemberPromote(ctx context.Context, r *pb.MemberPromoteRequest) (*pb.MemberPromoteResponse, error) {
- membs, err := cs.server.PromoteMember(ctx, r.ID)
- if err != nil {
- return nil, togRPCError(err)
- }
- return &pb.MemberPromoteResponse{Header: cs.header(), Members: membersToProtoMembers(membs)}, nil
-}
-
-func (cs *ClusterServer) header() *pb.ResponseHeader {
- return &pb.ResponseHeader{ClusterId: uint64(cs.cluster.ID()), MemberId: uint64(cs.server.ID()), RaftTerm: cs.server.Term()}
-}
-
-func membersToProtoMembers(membs []*membership.Member) []*pb.Member {
- protoMembs := make([]*pb.Member, len(membs))
- for i := range membs {
- protoMembs[i] = &pb.Member{
- Name: membs[i].Name,
- ID: uint64(membs[i].ID),
- PeerURLs: membs[i].PeerURLs,
- ClientURLs: membs[i].ClientURLs,
- IsLearner: membs[i].IsLearner,
- }
- }
- return protoMembs
-}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/metrics.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/metrics.go
deleted file mode 100644
index d633d27..0000000
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/metrics.go
+++ /dev/null
@@ -1,48 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package v3rpc
-
-import "github.com/prometheus/client_golang/prometheus"
-
-var (
- sentBytes = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "etcd",
- Subsystem: "network",
- Name: "client_grpc_sent_bytes_total",
- Help: "The total number of bytes sent to grpc clients.",
- })
-
- receivedBytes = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "etcd",
- Subsystem: "network",
- Name: "client_grpc_received_bytes_total",
- Help: "The total number of bytes received from grpc clients.",
- })
-
- streamFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
- Namespace: "etcd",
- Subsystem: "network",
- Name: "server_stream_failures_total",
- Help: "The total number of stream failures from the local server.",
- },
- []string{"Type", "API"},
- )
-)
-
-func init() {
- prometheus.MustRegister(sentBytes)
- prometheus.MustRegister(receivedBytes)
- prometheus.MustRegister(streamFailures)
-}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/quota.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/quota.go
deleted file mode 100644
index a145b8b..0000000
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/quota.go
+++ /dev/null
@@ -1,90 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package v3rpc
-
-import (
- "context"
-
- "go.etcd.io/etcd/etcdserver"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
- "go.etcd.io/etcd/pkg/types"
-)
-
-type quotaKVServer struct {
- pb.KVServer
- qa quotaAlarmer
-}
-
-type quotaAlarmer struct {
- q etcdserver.Quota
- a Alarmer
- id types.ID
-}
-
-// check whether request satisfies the quota. If there is not enough space,
-// ignore request and raise the free space alarm.
-func (qa *quotaAlarmer) check(ctx context.Context, r interface{}) error {
- if qa.q.Available(r) {
- return nil
- }
- req := &pb.AlarmRequest{
- MemberID: uint64(qa.id),
- Action: pb.AlarmRequest_ACTIVATE,
- Alarm: pb.AlarmType_NOSPACE,
- }
- qa.a.Alarm(ctx, req)
- return rpctypes.ErrGRPCNoSpace
-}
-
-func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
- return &quotaKVServer{
- NewKVServer(s),
- quotaAlarmer{etcdserver.NewBackendQuota(s, "kv"), s, s.ID()},
- }
-}
-
-func (s *quotaKVServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
- if err := s.qa.check(ctx, r); err != nil {
- return nil, err
- }
- return s.KVServer.Put(ctx, r)
-}
-
-func (s *quotaKVServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
- if err := s.qa.check(ctx, r); err != nil {
- return nil, err
- }
- return s.KVServer.Txn(ctx, r)
-}
-
-type quotaLeaseServer struct {
- pb.LeaseServer
- qa quotaAlarmer
-}
-
-func (s *quotaLeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
- if err := s.qa.check(ctx, cr); err != nil {
- return nil, err
- }
- return s.LeaseServer.LeaseGrant(ctx, cr)
-}
-
-func NewQuotaLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
- return &quotaLeaseServer{
- NewLeaseServer(s),
- quotaAlarmer{etcdserver.NewBackendQuota(s, "lease"), s, s.ID()},
- }
-}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go
index e6a2814..bc1ad7b 100644
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go
@@ -40,9 +40,6 @@
ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
ErrGRPCMemberBadURLs = status.New(codes.InvalidArgument, "etcdserver: given member URLs are invalid").Err()
ErrGRPCMemberNotFound = status.New(codes.NotFound, "etcdserver: member not found").Err()
- ErrGRPCMemberNotLearner = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member").Err()
- ErrGRPCLearnerNotReady = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member which is in sync with leader").Err()
- ErrGRPCTooManyLearners = status.New(codes.FailedPrecondition, "etcdserver: too many learner members in cluster").Err()
ErrGRPCRequestTooLarge = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err()
ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err()
@@ -54,7 +51,6 @@
ErrGRPCUserNotFound = status.New(codes.FailedPrecondition, "etcdserver: user name not found").Err()
ErrGRPCRoleAlreadyExist = status.New(codes.FailedPrecondition, "etcdserver: role name already exists").Err()
ErrGRPCRoleNotFound = status.New(codes.FailedPrecondition, "etcdserver: role name not found").Err()
- ErrGRPCRoleEmpty = status.New(codes.InvalidArgument, "etcdserver: role name is empty").Err()
ErrGRPCAuthFailed = status.New(codes.InvalidArgument, "etcdserver: authentication failed, invalid user ID or password").Err()
ErrGRPCPermissionDenied = status.New(codes.PermissionDenied, "etcdserver: permission denied").Err()
ErrGRPCRoleNotGranted = status.New(codes.FailedPrecondition, "etcdserver: role is not granted to the user").Err()
@@ -73,8 +69,6 @@
ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
- ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err()
- ErrGRPCBadLeaderTransferee = status.New(codes.FailedPrecondition, "etcdserver: bad leader transferee").Err()
errStringToError = map[string]error{
ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
@@ -97,9 +91,6 @@
ErrorDesc(ErrGRPCMemberNotEnoughStarted): ErrGRPCMemberNotEnoughStarted,
ErrorDesc(ErrGRPCMemberBadURLs): ErrGRPCMemberBadURLs,
ErrorDesc(ErrGRPCMemberNotFound): ErrGRPCMemberNotFound,
- ErrorDesc(ErrGRPCMemberNotLearner): ErrGRPCMemberNotLearner,
- ErrorDesc(ErrGRPCLearnerNotReady): ErrGRPCLearnerNotReady,
- ErrorDesc(ErrGRPCTooManyLearners): ErrGRPCTooManyLearners,
ErrorDesc(ErrGRPCRequestTooLarge): ErrGRPCRequestTooLarge,
ErrorDesc(ErrGRPCRequestTooManyRequests): ErrGRPCRequestTooManyRequests,
@@ -111,7 +102,6 @@
ErrorDesc(ErrGRPCUserNotFound): ErrGRPCUserNotFound,
ErrorDesc(ErrGRPCRoleAlreadyExist): ErrGRPCRoleAlreadyExist,
ErrorDesc(ErrGRPCRoleNotFound): ErrGRPCRoleNotFound,
- ErrorDesc(ErrGRPCRoleEmpty): ErrGRPCRoleEmpty,
ErrorDesc(ErrGRPCAuthFailed): ErrGRPCAuthFailed,
ErrorDesc(ErrGRPCPermissionDenied): ErrGRPCPermissionDenied,
ErrorDesc(ErrGRPCRoleNotGranted): ErrGRPCRoleNotGranted,
@@ -122,7 +112,6 @@
ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader,
- ErrorDesc(ErrGRPCLeaderChanged): ErrGRPCLeaderChanged,
ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
ErrorDesc(ErrGRPCStopped): ErrGRPCStopped,
ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout,
@@ -130,8 +119,6 @@
ErrorDesc(ErrGRPCTimeoutDueToConnectionLost): ErrGRPCTimeoutDueToConnectionLost,
ErrorDesc(ErrGRPCUnhealthy): ErrGRPCUnhealthy,
ErrorDesc(ErrGRPCCorrupt): ErrGRPCCorrupt,
- ErrorDesc(ErrGPRCNotSupportedForLearner): ErrGPRCNotSupportedForLearner,
- ErrorDesc(ErrGRPCBadLeaderTransferee): ErrGRPCBadLeaderTransferee,
}
)
@@ -156,9 +143,6 @@
ErrMemberNotEnoughStarted = Error(ErrGRPCMemberNotEnoughStarted)
ErrMemberBadURLs = Error(ErrGRPCMemberBadURLs)
ErrMemberNotFound = Error(ErrGRPCMemberNotFound)
- ErrMemberNotLearner = Error(ErrGRPCMemberNotLearner)
- ErrMemberLearnerNotReady = Error(ErrGRPCLearnerNotReady)
- ErrTooManyLearners = Error(ErrGRPCTooManyLearners)
ErrRequestTooLarge = Error(ErrGRPCRequestTooLarge)
ErrTooManyRequests = Error(ErrGRPCRequestTooManyRequests)
@@ -170,7 +154,6 @@
ErrUserNotFound = Error(ErrGRPCUserNotFound)
ErrRoleAlreadyExist = Error(ErrGRPCRoleAlreadyExist)
ErrRoleNotFound = Error(ErrGRPCRoleNotFound)
- ErrRoleEmpty = Error(ErrGRPCRoleEmpty)
ErrAuthFailed = Error(ErrGRPCAuthFailed)
ErrPermissionDenied = Error(ErrGRPCPermissionDenied)
ErrRoleNotGranted = Error(ErrGRPCRoleNotGranted)
@@ -189,7 +172,6 @@
ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost)
ErrUnhealthy = Error(ErrGRPCUnhealthy)
ErrCorrupt = Error(ErrGRPCCorrupt)
- ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee)
)
// EtcdError defines gRPC server errors.
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go
index 5c590e1..90b8b83 100644
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go
@@ -17,4 +17,6 @@
var (
MetadataRequireLeaderKey = "hasleader"
MetadataHasLeader = "true"
+
+ MetadataClientAPIVersionKey = "client-api-version"
)
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/util.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/util.go
deleted file mode 100644
index 281ddc7..0000000
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/util.go
+++ /dev/null
@@ -1,136 +0,0 @@
-// Copyright 2016 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package v3rpc
-
-import (
- "context"
- "strings"
-
- "go.etcd.io/etcd/auth"
- "go.etcd.io/etcd/etcdserver"
- "go.etcd.io/etcd/etcdserver/api/membership"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
- "go.etcd.io/etcd/lease"
- "go.etcd.io/etcd/mvcc"
-
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-)
-
-var toGRPCErrorMap = map[error]error{
- membership.ErrIDRemoved: rpctypes.ErrGRPCMemberNotFound,
- membership.ErrIDNotFound: rpctypes.ErrGRPCMemberNotFound,
- membership.ErrIDExists: rpctypes.ErrGRPCMemberExist,
- membership.ErrPeerURLexists: rpctypes.ErrGRPCPeerURLExist,
- membership.ErrMemberNotLearner: rpctypes.ErrGRPCMemberNotLearner,
- membership.ErrTooManyLearners: rpctypes.ErrGRPCTooManyLearners,
- etcdserver.ErrNotEnoughStartedMembers: rpctypes.ErrMemberNotEnoughStarted,
- etcdserver.ErrLearnerNotReady: rpctypes.ErrGRPCLearnerNotReady,
-
- mvcc.ErrCompacted: rpctypes.ErrGRPCCompacted,
- mvcc.ErrFutureRev: rpctypes.ErrGRPCFutureRev,
- etcdserver.ErrRequestTooLarge: rpctypes.ErrGRPCRequestTooLarge,
- etcdserver.ErrNoSpace: rpctypes.ErrGRPCNoSpace,
- etcdserver.ErrTooManyRequests: rpctypes.ErrTooManyRequests,
-
- etcdserver.ErrNoLeader: rpctypes.ErrGRPCNoLeader,
- etcdserver.ErrNotLeader: rpctypes.ErrGRPCNotLeader,
- etcdserver.ErrLeaderChanged: rpctypes.ErrGRPCLeaderChanged,
- etcdserver.ErrStopped: rpctypes.ErrGRPCStopped,
- etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout,
- etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
- etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost,
- etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy,
- etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound,
- etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
- etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,
-
- lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
- lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist,
- lease.ErrLeaseTTLTooLarge: rpctypes.ErrGRPCLeaseTTLTooLarge,
-
- auth.ErrRootUserNotExist: rpctypes.ErrGRPCRootUserNotExist,
- auth.ErrRootRoleNotExist: rpctypes.ErrGRPCRootRoleNotExist,
- auth.ErrUserAlreadyExist: rpctypes.ErrGRPCUserAlreadyExist,
- auth.ErrUserEmpty: rpctypes.ErrGRPCUserEmpty,
- auth.ErrUserNotFound: rpctypes.ErrGRPCUserNotFound,
- auth.ErrRoleAlreadyExist: rpctypes.ErrGRPCRoleAlreadyExist,
- auth.ErrRoleNotFound: rpctypes.ErrGRPCRoleNotFound,
- auth.ErrRoleEmpty: rpctypes.ErrGRPCRoleEmpty,
- auth.ErrAuthFailed: rpctypes.ErrGRPCAuthFailed,
- auth.ErrPermissionDenied: rpctypes.ErrGRPCPermissionDenied,
- auth.ErrRoleNotGranted: rpctypes.ErrGRPCRoleNotGranted,
- auth.ErrPermissionNotGranted: rpctypes.ErrGRPCPermissionNotGranted,
- auth.ErrAuthNotEnabled: rpctypes.ErrGRPCAuthNotEnabled,
- auth.ErrInvalidAuthToken: rpctypes.ErrGRPCInvalidAuthToken,
- auth.ErrInvalidAuthMgmt: rpctypes.ErrGRPCInvalidAuthMgmt,
-}
-
-func togRPCError(err error) error {
- // let gRPC server convert to codes.Canceled, codes.DeadlineExceeded
- if err == context.Canceled || err == context.DeadlineExceeded {
- return err
- }
- grpcErr, ok := toGRPCErrorMap[err]
- if !ok {
- return status.Error(codes.Unknown, err.Error())
- }
- return grpcErr
-}
-
-func isClientCtxErr(ctxErr error, err error) bool {
- if ctxErr != nil {
- return true
- }
-
- ev, ok := status.FromError(err)
- if !ok {
- return false
- }
-
- switch ev.Code() {
- case codes.Canceled, codes.DeadlineExceeded:
- // client-side context cancel or deadline exceeded
- // "rpc error: code = Canceled desc = context canceled"
- // "rpc error: code = DeadlineExceeded desc = context deadline exceeded"
- return true
- case codes.Unavailable:
- msg := ev.Message()
- // client-side context cancel or deadline exceeded with TLS ("http2.errClientDisconnected")
- // "rpc error: code = Unavailable desc = client disconnected"
- if msg == "client disconnected" {
- return true
- }
- // "grpc/transport.ClientTransport.CloseStream" on canceled streams
- // "rpc error: code = Unavailable desc = stream error: stream ID 21; CANCEL")
- if strings.HasPrefix(msg, "stream error: ") && strings.HasSuffix(msg, "; CANCEL") {
- return true
- }
- }
- return false
-}
-
-// in v3.4, learner is allowed to serve serializable read and endpoint status
-func isRPCSupportedForLearner(req interface{}) bool {
- switch r := req.(type) {
- case *pb.StatusRequest:
- return true
- case *pb.RangeRequest:
- return r.Serializable
- default:
- return false
- }
-}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/watch.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/watch.go
deleted file mode 100644
index f41cb6c..0000000
--- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/watch.go
+++ /dev/null
@@ -1,584 +0,0 @@
-// Copyright 2015 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package v3rpc
-
-import (
- "context"
- "io"
- "math/rand"
- "sync"
- "time"
-
- "go.etcd.io/etcd/auth"
- "go.etcd.io/etcd/etcdserver"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
- "go.etcd.io/etcd/mvcc"
- "go.etcd.io/etcd/mvcc/mvccpb"
-
- "go.uber.org/zap"
-)
-
-type watchServer struct {
- lg *zap.Logger
-
- clusterID int64
- memberID int64
-
- maxRequestBytes int
-
- sg etcdserver.RaftStatusGetter
- watchable mvcc.WatchableKV
- ag AuthGetter
-}
-
-// NewWatchServer returns a new watch server.
-func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
- return &watchServer{
- lg: s.Cfg.Logger,
-
- clusterID: int64(s.Cluster().ID()),
- memberID: int64(s.ID()),
-
- maxRequestBytes: int(s.Cfg.MaxRequestBytes + grpcOverheadBytes),
-
- sg: s,
- watchable: s.Watchable(),
- ag: s,
- }
-}
-
-var (
- // External test can read this with GetProgressReportInterval()
- // and change this to a small value to finish fast with
- // SetProgressReportInterval().
- progressReportInterval = 10 * time.Minute
- progressReportIntervalMu sync.RWMutex
-)
-
-// GetProgressReportInterval returns the current progress report interval (for testing).
-func GetProgressReportInterval() time.Duration {
- progressReportIntervalMu.RLock()
- interval := progressReportInterval
- progressReportIntervalMu.RUnlock()
-
- // add rand(1/10*progressReportInterval) as jitter so that etcdserver will not
- // send progress notifications to watchers around the same time even when watchers
- // are created around the same time (which is common when a client restarts itself).
- jitter := time.Duration(rand.Int63n(int64(interval) / 10))
-
- return interval + jitter
-}
-
-// SetProgressReportInterval updates the current progress report interval (for testing).
-func SetProgressReportInterval(newTimeout time.Duration) {
- progressReportIntervalMu.Lock()
- progressReportInterval = newTimeout
- progressReportIntervalMu.Unlock()
-}
-
-// We send ctrl response inside the read loop. We do not want
-// send to block read, but we still want ctrl response we sent to
-// be serialized. Thus we use a buffered chan to solve the problem.
-// A small buffer should be OK for most cases, since we expect the
-// ctrl requests are infrequent.
-const ctrlStreamBufLen = 16
-
-// serverWatchStream is an etcd server side stream. It receives requests
-// from client side gRPC stream. It receives watch events from mvcc.WatchStream,
-// and creates responses that forwarded to gRPC stream.
-// It also forwards control message like watch created and canceled.
-type serverWatchStream struct {
- lg *zap.Logger
-
- clusterID int64
- memberID int64
-
- maxRequestBytes int
-
- sg etcdserver.RaftStatusGetter
- watchable mvcc.WatchableKV
- ag AuthGetter
-
- gRPCStream pb.Watch_WatchServer
- watchStream mvcc.WatchStream
- ctrlStream chan *pb.WatchResponse
-
- // mu protects progress, prevKV, fragment
- mu sync.RWMutex
- // tracks the watchID that stream might need to send progress to
- // TODO: combine progress and prevKV into a single struct?
- progress map[mvcc.WatchID]bool
- // record watch IDs that need return previous key-value pair
- prevKV map[mvcc.WatchID]bool
- // records fragmented watch IDs
- fragment map[mvcc.WatchID]bool
-
- // closec indicates the stream is closed.
- closec chan struct{}
-
- // wg waits for the send loop to complete
- wg sync.WaitGroup
-}
-
-func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
- sws := serverWatchStream{
- lg: ws.lg,
-
- clusterID: ws.clusterID,
- memberID: ws.memberID,
-
- maxRequestBytes: ws.maxRequestBytes,
-
- sg: ws.sg,
- watchable: ws.watchable,
- ag: ws.ag,
-
- gRPCStream: stream,
- watchStream: ws.watchable.NewWatchStream(),
- // chan for sending control response like watcher created and canceled.
- ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
-
- progress: make(map[mvcc.WatchID]bool),
- prevKV: make(map[mvcc.WatchID]bool),
- fragment: make(map[mvcc.WatchID]bool),
-
- closec: make(chan struct{}),
- }
-
- sws.wg.Add(1)
- go func() {
- sws.sendLoop()
- sws.wg.Done()
- }()
-
- errc := make(chan error, 1)
- // Ideally recvLoop would also use sws.wg to signal its completion
- // but when stream.Context().Done() is closed, the stream's recv
- // may continue to block since it uses a different context, leading to
- // deadlock when calling sws.close().
- go func() {
- if rerr := sws.recvLoop(); rerr != nil {
- if isClientCtxErr(stream.Context().Err(), rerr) {
- if sws.lg != nil {
- sws.lg.Debug("failed to receive watch request from gRPC stream", zap.Error(rerr))
- } else {
- plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
- }
- } else {
- if sws.lg != nil {
- sws.lg.Warn("failed to receive watch request from gRPC stream", zap.Error(rerr))
- } else {
- plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
- }
- streamFailures.WithLabelValues("receive", "watch").Inc()
- }
- errc <- rerr
- }
- }()
-
- select {
- case err = <-errc:
- close(sws.ctrlStream)
-
- case <-stream.Context().Done():
- err = stream.Context().Err()
- // the only server-side cancellation is noleader for now.
- if err == context.Canceled {
- err = rpctypes.ErrGRPCNoLeader
- }
- }
-
- sws.close()
- return err
-}
-
-func (sws *serverWatchStream) isWatchPermitted(wcr *pb.WatchCreateRequest) bool {
- authInfo, err := sws.ag.AuthInfoFromCtx(sws.gRPCStream.Context())
- if err != nil {
- return false
- }
- if authInfo == nil {
- // if auth is enabled, IsRangePermitted() can cause an error
- authInfo = &auth.AuthInfo{}
- }
- return sws.ag.AuthStore().IsRangePermitted(authInfo, wcr.Key, wcr.RangeEnd) == nil
-}
-
-func (sws *serverWatchStream) recvLoop() error {
- for {
- req, err := sws.gRPCStream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
-
- switch uv := req.RequestUnion.(type) {
- case *pb.WatchRequest_CreateRequest:
- if uv.CreateRequest == nil {
- break
- }
-
- creq := uv.CreateRequest
- if len(creq.Key) == 0 {
- // \x00 is the smallest key
- creq.Key = []byte{0}
- }
- if len(creq.RangeEnd) == 0 {
- // force nil since watchstream.Watch distinguishes
- // between nil and []byte{} for single key / >=
- creq.RangeEnd = nil
- }
- if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 {
- // support >= key queries
- creq.RangeEnd = []byte{}
- }
-
- if !sws.isWatchPermitted(creq) {
- wr := &pb.WatchResponse{
- Header: sws.newResponseHeader(sws.watchStream.Rev()),
- WatchId: creq.WatchId,
- Canceled: true,
- Created: true,
- CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
- }
-
- select {
- case sws.ctrlStream <- wr:
- case <-sws.closec:
- }
- return nil
- }
-
- filters := FiltersFromRequest(creq)
-
- wsrev := sws.watchStream.Rev()
- rev := creq.StartRevision
- if rev == 0 {
- rev = wsrev + 1
- }
- id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
- if err == nil {
- sws.mu.Lock()
- if creq.ProgressNotify {
- sws.progress[id] = true
- }
- if creq.PrevKv {
- sws.prevKV[id] = true
- }
- if creq.Fragment {
- sws.fragment[id] = true
- }
- sws.mu.Unlock()
- }
- wr := &pb.WatchResponse{
- Header: sws.newResponseHeader(wsrev),
- WatchId: int64(id),
- Created: true,
- Canceled: err != nil,
- }
- if err != nil {
- wr.CancelReason = err.Error()
- }
- select {
- case sws.ctrlStream <- wr:
- case <-sws.closec:
- return nil
- }
-
- case *pb.WatchRequest_CancelRequest:
- if uv.CancelRequest != nil {
- id := uv.CancelRequest.WatchId
- err := sws.watchStream.Cancel(mvcc.WatchID(id))
- if err == nil {
- sws.ctrlStream <- &pb.WatchResponse{
- Header: sws.newResponseHeader(sws.watchStream.Rev()),
- WatchId: id,
- Canceled: true,
- }
- sws.mu.Lock()
- delete(sws.progress, mvcc.WatchID(id))
- delete(sws.prevKV, mvcc.WatchID(id))
- delete(sws.fragment, mvcc.WatchID(id))
- sws.mu.Unlock()
- }
- }
- case *pb.WatchRequest_ProgressRequest:
- if uv.ProgressRequest != nil {
- sws.ctrlStream <- &pb.WatchResponse{
- Header: sws.newResponseHeader(sws.watchStream.Rev()),
- WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels
- }
- }
- default:
- // we probably should not shutdown the entire stream when
- // receive an valid command.
- // so just do nothing instead.
- continue
- }
- }
-}
-
-func (sws *serverWatchStream) sendLoop() {
- // watch ids that are currently active
- ids := make(map[mvcc.WatchID]struct{})
- // watch responses pending on a watch id creation message
- pending := make(map[mvcc.WatchID][]*pb.WatchResponse)
-
- interval := GetProgressReportInterval()
- progressTicker := time.NewTicker(interval)
-
- defer func() {
- progressTicker.Stop()
- // drain the chan to clean up pending events
- for ws := range sws.watchStream.Chan() {
- mvcc.ReportEventReceived(len(ws.Events))
- }
- for _, wrs := range pending {
- for _, ws := range wrs {
- mvcc.ReportEventReceived(len(ws.Events))
- }
- }
- }()
-
- for {
- select {
- case wresp, ok := <-sws.watchStream.Chan():
- if !ok {
- return
- }
-
- // TODO: evs is []mvccpb.Event type
- // either return []*mvccpb.Event from the mvcc package
- // or define protocol buffer with []mvccpb.Event.
- evs := wresp.Events
- events := make([]*mvccpb.Event, len(evs))
- sws.mu.RLock()
- needPrevKV := sws.prevKV[wresp.WatchID]
- sws.mu.RUnlock()
- for i := range evs {
- events[i] = &evs[i]
- if needPrevKV {
- opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
- r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
- if err == nil && len(r.KVs) != 0 {
- events[i].PrevKv = &(r.KVs[0])
- }
- }
- }
-
- canceled := wresp.CompactRevision != 0
- wr := &pb.WatchResponse{
- Header: sws.newResponseHeader(wresp.Revision),
- WatchId: int64(wresp.WatchID),
- Events: events,
- CompactRevision: wresp.CompactRevision,
- Canceled: canceled,
- }
-
- if _, okID := ids[wresp.WatchID]; !okID {
- // buffer if id not yet announced
- wrs := append(pending[wresp.WatchID], wr)
- pending[wresp.WatchID] = wrs
- continue
- }
-
- mvcc.ReportEventReceived(len(evs))
-
- sws.mu.RLock()
- fragmented, ok := sws.fragment[wresp.WatchID]
- sws.mu.RUnlock()
-
- var serr error
- if !fragmented && !ok {
- serr = sws.gRPCStream.Send(wr)
- } else {
- serr = sendFragments(wr, sws.maxRequestBytes, sws.gRPCStream.Send)
- }
-
- if serr != nil {
- if isClientCtxErr(sws.gRPCStream.Context().Err(), serr) {
- if sws.lg != nil {
- sws.lg.Debug("failed to send watch response to gRPC stream", zap.Error(serr))
- } else {
- plog.Debugf("failed to send watch response to gRPC stream (%q)", serr.Error())
- }
- } else {
- if sws.lg != nil {
- sws.lg.Warn("failed to send watch response to gRPC stream", zap.Error(serr))
- } else {
- plog.Warningf("failed to send watch response to gRPC stream (%q)", serr.Error())
- }
- streamFailures.WithLabelValues("send", "watch").Inc()
- }
- return
- }
-
- sws.mu.Lock()
- if len(evs) > 0 && sws.progress[wresp.WatchID] {
- // elide next progress update if sent a key update
- sws.progress[wresp.WatchID] = false
- }
- sws.mu.Unlock()
-
- case c, ok := <-sws.ctrlStream:
- if !ok {
- return
- }
-
- if err := sws.gRPCStream.Send(c); err != nil {
- if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
- if sws.lg != nil {
- sws.lg.Debug("failed to send watch control response to gRPC stream", zap.Error(err))
- } else {
- plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
- }
- } else {
- if sws.lg != nil {
- sws.lg.Warn("failed to send watch control response to gRPC stream", zap.Error(err))
- } else {
- plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
- }
- streamFailures.WithLabelValues("send", "watch").Inc()
- }
- return
- }
-
- // track id creation
- wid := mvcc.WatchID(c.WatchId)
- if c.Canceled {
- delete(ids, wid)
- continue
- }
- if c.Created {
- // flush buffered events
- ids[wid] = struct{}{}
- for _, v := range pending[wid] {
- mvcc.ReportEventReceived(len(v.Events))
- if err := sws.gRPCStream.Send(v); err != nil {
- if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
- if sws.lg != nil {
- sws.lg.Debug("failed to send pending watch response to gRPC stream", zap.Error(err))
- } else {
- plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
- }
- } else {
- if sws.lg != nil {
- sws.lg.Warn("failed to send pending watch response to gRPC stream", zap.Error(err))
- } else {
- plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
- }
- streamFailures.WithLabelValues("send", "watch").Inc()
- }
- return
- }
- }
- delete(pending, wid)
- }
-
- case <-progressTicker.C:
- sws.mu.Lock()
- for id, ok := range sws.progress {
- if ok {
- sws.watchStream.RequestProgress(id)
- }
- sws.progress[id] = true
- }
- sws.mu.Unlock()
-
- case <-sws.closec:
- return
- }
- }
-}
-
-func sendFragments(
- wr *pb.WatchResponse,
- maxRequestBytes int,
- sendFunc func(*pb.WatchResponse) error) error {
- // no need to fragment if total request size is smaller
- // than max request limit or response contains only one event
- if wr.Size() < maxRequestBytes || len(wr.Events) < 2 {
- return sendFunc(wr)
- }
-
- ow := *wr
- ow.Events = make([]*mvccpb.Event, 0)
- ow.Fragment = true
-
- var idx int
- for {
- cur := ow
- for _, ev := range wr.Events[idx:] {
- cur.Events = append(cur.Events, ev)
- if len(cur.Events) > 1 && cur.Size() >= maxRequestBytes {
- cur.Events = cur.Events[:len(cur.Events)-1]
- break
- }
- idx++
- }
- if idx == len(wr.Events) {
- // last response has no more fragment
- cur.Fragment = false
- }
- if err := sendFunc(&cur); err != nil {
- return err
- }
- if !cur.Fragment {
- break
- }
- }
- return nil
-}
-
-func (sws *serverWatchStream) close() {
- sws.watchStream.Close()
- close(sws.closec)
- sws.wg.Wait()
-}
-
-func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader {
- return &pb.ResponseHeader{
- ClusterId: uint64(sws.clusterID),
- MemberId: uint64(sws.memberID),
- Revision: rev,
- RaftTerm: sws.sg.Term(),
- }
-}
-
-func filterNoDelete(e mvccpb.Event) bool {
- return e.Type == mvccpb.DELETE
-}
-
-func filterNoPut(e mvccpb.Event) bool {
- return e.Type == mvccpb.PUT
-}
-
-// FiltersFromRequest returns "mvcc.FilterFunc" from a given watch create request.
-func FiltersFromRequest(creq *pb.WatchCreateRequest) []mvcc.FilterFunc {
- filters := make([]mvcc.FilterFunc, 0, len(creq.Filters))
- for _, ft := range creq.Filters {
- switch ft {
- case pb.WatchCreateRequest_NOPUT:
- filters = append(filters, filterNoPut)
- case pb.WatchCreateRequest_NODELETE:
- filters = append(filters, filterNoDelete)
- default:
- }
- }
- return filters
-}