[VOL-2235] Mocks and interfaces for rw-core
This update consists of mocks that are used by the rw-core
during unit testing. It also includes interfaces used for unit
tests.
Change-Id: I20ca1455c358113c3aa897acc6355e0ddbc614b7
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/maintenance.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/maintenance.go
new file mode 100644
index 0000000..c51271a
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/maintenance.go
@@ -0,0 +1,260 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3rpc
+
+import (
+ "context"
+ "crypto/sha256"
+ "io"
+
+ "go.etcd.io/etcd/auth"
+ "go.etcd.io/etcd/etcdserver"
+ "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "go.etcd.io/etcd/mvcc"
+ "go.etcd.io/etcd/mvcc/backend"
+ "go.etcd.io/etcd/raft"
+ "go.etcd.io/etcd/version"
+
+ "go.uber.org/zap"
+)
+
+type KVGetter interface {
+ KV() mvcc.ConsistentWatchableKV
+}
+
+type BackendGetter interface {
+ Backend() backend.Backend
+}
+
+type Alarmer interface {
+ // Alarms is implemented in Server interface located in etcdserver/server.go
+ // It returns a list of alarms present in the AlarmStore
+ Alarms() []*pb.AlarmMember
+ Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
+}
+
+type LeaderTransferrer interface {
+ MoveLeader(ctx context.Context, lead, target uint64) error
+}
+
+type AuthGetter interface {
+ AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error)
+ AuthStore() auth.AuthStore
+}
+
+type ClusterStatusGetter interface {
+ IsLearner() bool
+}
+
+type maintenanceServer struct {
+ lg *zap.Logger
+ rg etcdserver.RaftStatusGetter
+ kg KVGetter
+ bg BackendGetter
+ a Alarmer
+ lt LeaderTransferrer
+ hdr header
+ cs ClusterStatusGetter
+}
+
+func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
+ srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s}
+ return &authMaintenanceServer{srv, s}
+}
+
+func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
+ if ms.lg != nil {
+ ms.lg.Info("starting defragment")
+ } else {
+ plog.Noticef("starting to defragment the storage backend...")
+ }
+ err := ms.bg.Backend().Defrag()
+ if err != nil {
+ if ms.lg != nil {
+ ms.lg.Warn("failed to defragment", zap.Error(err))
+ } else {
+ plog.Errorf("failed to defragment the storage backend (%v)", err)
+ }
+ return nil, err
+ }
+ if ms.lg != nil {
+ ms.lg.Info("finished defragment")
+ } else {
+ plog.Noticef("finished defragmenting the storage backend")
+ }
+ return &pb.DefragmentResponse{}, nil
+}
+
+func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
+ snap := ms.bg.Backend().Snapshot()
+ pr, pw := io.Pipe()
+
+ defer pr.Close()
+
+ go func() {
+ snap.WriteTo(pw)
+ if err := snap.Close(); err != nil {
+ if ms.lg != nil {
+ ms.lg.Warn("failed to close snapshot", zap.Error(err))
+ } else {
+ plog.Errorf("error closing snapshot (%v)", err)
+ }
+ }
+ pw.Close()
+ }()
+
+ // send file data
+ h := sha256.New()
+ br := int64(0)
+ buf := make([]byte, 32*1024)
+ sz := snap.Size()
+ for br < sz {
+ n, err := io.ReadFull(pr, buf)
+ if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
+ return togRPCError(err)
+ }
+ br += int64(n)
+ resp := &pb.SnapshotResponse{
+ RemainingBytes: uint64(sz - br),
+ Blob: buf[:n],
+ }
+ if err = srv.Send(resp); err != nil {
+ return togRPCError(err)
+ }
+ h.Write(buf[:n])
+ }
+
+ // send sha
+ sha := h.Sum(nil)
+ hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha}
+ if err := srv.Send(hresp); err != nil {
+ return togRPCError(err)
+ }
+
+ return nil
+}
+
+func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
+ h, rev, err := ms.kg.KV().Hash()
+ if err != nil {
+ return nil, togRPCError(err)
+ }
+ resp := &pb.HashResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h}
+ ms.hdr.fill(resp.Header)
+ return resp, nil
+}
+
+func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
+ h, rev, compactRev, err := ms.kg.KV().HashByRev(r.Revision)
+ if err != nil {
+ return nil, togRPCError(err)
+ }
+
+ resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h, CompactRevision: compactRev}
+ ms.hdr.fill(resp.Header)
+ return resp, nil
+}
+
+func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
+ return ms.a.Alarm(ctx, ar)
+}
+
+func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
+ hdr := &pb.ResponseHeader{}
+ ms.hdr.fill(hdr)
+ resp := &pb.StatusResponse{
+ Header: hdr,
+ Version: version.Version,
+ Leader: uint64(ms.rg.Leader()),
+ RaftIndex: ms.rg.CommittedIndex(),
+ RaftAppliedIndex: ms.rg.AppliedIndex(),
+ RaftTerm: ms.rg.Term(),
+ DbSize: ms.bg.Backend().Size(),
+ DbSizeInUse: ms.bg.Backend().SizeInUse(),
+ IsLearner: ms.cs.IsLearner(),
+ }
+ if resp.Leader == raft.None {
+ resp.Errors = append(resp.Errors, etcdserver.ErrNoLeader.Error())
+ }
+ for _, a := range ms.a.Alarms() {
+ resp.Errors = append(resp.Errors, a.String())
+ }
+ return resp, nil
+}
+
+func (ms *maintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
+ if ms.rg.ID() != ms.rg.Leader() {
+ return nil, rpctypes.ErrGRPCNotLeader
+ }
+
+ if err := ms.lt.MoveLeader(ctx, uint64(ms.rg.Leader()), tr.TargetID); err != nil {
+ return nil, togRPCError(err)
+ }
+ return &pb.MoveLeaderResponse{}, nil
+}
+
+type authMaintenanceServer struct {
+ *maintenanceServer
+ ag AuthGetter
+}
+
+func (ams *authMaintenanceServer) isAuthenticated(ctx context.Context) error {
+ authInfo, err := ams.ag.AuthInfoFromCtx(ctx)
+ if err != nil {
+ return err
+ }
+
+ return ams.ag.AuthStore().IsAdminPermitted(authInfo)
+}
+
+func (ams *authMaintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
+ if err := ams.isAuthenticated(ctx); err != nil {
+ return nil, err
+ }
+
+ return ams.maintenanceServer.Defragment(ctx, sr)
+}
+
+func (ams *authMaintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
+ if err := ams.isAuthenticated(srv.Context()); err != nil {
+ return err
+ }
+
+ return ams.maintenanceServer.Snapshot(sr, srv)
+}
+
+func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
+ if err := ams.isAuthenticated(ctx); err != nil {
+ return nil, err
+ }
+
+ return ams.maintenanceServer.Hash(ctx, r)
+}
+
+func (ams *authMaintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
+ if err := ams.isAuthenticated(ctx); err != nil {
+ return nil, err
+ }
+ return ams.maintenanceServer.HashKV(ctx, r)
+}
+
+func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
+ return ams.maintenanceServer.Status(ctx, ar)
+}
+
+func (ams *authMaintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
+ return ams.maintenanceServer.MoveLeader(ctx, tr)
+}