blob: c51271ac0fe397c6553e59caf23cdc67a79939f2 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package v3rpc
16
17import (
18 "context"
19 "crypto/sha256"
20 "io"
21
22 "go.etcd.io/etcd/auth"
23 "go.etcd.io/etcd/etcdserver"
24 "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
25 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
26 "go.etcd.io/etcd/mvcc"
27 "go.etcd.io/etcd/mvcc/backend"
28 "go.etcd.io/etcd/raft"
29 "go.etcd.io/etcd/version"
30
31 "go.uber.org/zap"
32)
33
34type KVGetter interface {
35 KV() mvcc.ConsistentWatchableKV
36}
37
38type BackendGetter interface {
39 Backend() backend.Backend
40}
41
42type Alarmer interface {
43 // Alarms is implemented in Server interface located in etcdserver/server.go
44 // It returns a list of alarms present in the AlarmStore
45 Alarms() []*pb.AlarmMember
46 Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
47}
48
49type LeaderTransferrer interface {
50 MoveLeader(ctx context.Context, lead, target uint64) error
51}
52
53type AuthGetter interface {
54 AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error)
55 AuthStore() auth.AuthStore
56}
57
58type ClusterStatusGetter interface {
59 IsLearner() bool
60}
61
62type maintenanceServer struct {
63 lg *zap.Logger
64 rg etcdserver.RaftStatusGetter
65 kg KVGetter
66 bg BackendGetter
67 a Alarmer
68 lt LeaderTransferrer
69 hdr header
70 cs ClusterStatusGetter
71}
72
73func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
74 srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s}
75 return &authMaintenanceServer{srv, s}
76}
77
78func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
79 if ms.lg != nil {
80 ms.lg.Info("starting defragment")
81 } else {
82 plog.Noticef("starting to defragment the storage backend...")
83 }
84 err := ms.bg.Backend().Defrag()
85 if err != nil {
86 if ms.lg != nil {
87 ms.lg.Warn("failed to defragment", zap.Error(err))
88 } else {
89 plog.Errorf("failed to defragment the storage backend (%v)", err)
90 }
91 return nil, err
92 }
93 if ms.lg != nil {
94 ms.lg.Info("finished defragment")
95 } else {
96 plog.Noticef("finished defragmenting the storage backend")
97 }
98 return &pb.DefragmentResponse{}, nil
99}
100
101func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
102 snap := ms.bg.Backend().Snapshot()
103 pr, pw := io.Pipe()
104
105 defer pr.Close()
106
107 go func() {
108 snap.WriteTo(pw)
109 if err := snap.Close(); err != nil {
110 if ms.lg != nil {
111 ms.lg.Warn("failed to close snapshot", zap.Error(err))
112 } else {
113 plog.Errorf("error closing snapshot (%v)", err)
114 }
115 }
116 pw.Close()
117 }()
118
119 // send file data
120 h := sha256.New()
121 br := int64(0)
122 buf := make([]byte, 32*1024)
123 sz := snap.Size()
124 for br < sz {
125 n, err := io.ReadFull(pr, buf)
126 if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
127 return togRPCError(err)
128 }
129 br += int64(n)
130 resp := &pb.SnapshotResponse{
131 RemainingBytes: uint64(sz - br),
132 Blob: buf[:n],
133 }
134 if err = srv.Send(resp); err != nil {
135 return togRPCError(err)
136 }
137 h.Write(buf[:n])
138 }
139
140 // send sha
141 sha := h.Sum(nil)
142 hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha}
143 if err := srv.Send(hresp); err != nil {
144 return togRPCError(err)
145 }
146
147 return nil
148}
149
150func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
151 h, rev, err := ms.kg.KV().Hash()
152 if err != nil {
153 return nil, togRPCError(err)
154 }
155 resp := &pb.HashResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h}
156 ms.hdr.fill(resp.Header)
157 return resp, nil
158}
159
160func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
161 h, rev, compactRev, err := ms.kg.KV().HashByRev(r.Revision)
162 if err != nil {
163 return nil, togRPCError(err)
164 }
165
166 resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h, CompactRevision: compactRev}
167 ms.hdr.fill(resp.Header)
168 return resp, nil
169}
170
171func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
172 return ms.a.Alarm(ctx, ar)
173}
174
175func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
176 hdr := &pb.ResponseHeader{}
177 ms.hdr.fill(hdr)
178 resp := &pb.StatusResponse{
179 Header: hdr,
180 Version: version.Version,
181 Leader: uint64(ms.rg.Leader()),
182 RaftIndex: ms.rg.CommittedIndex(),
183 RaftAppliedIndex: ms.rg.AppliedIndex(),
184 RaftTerm: ms.rg.Term(),
185 DbSize: ms.bg.Backend().Size(),
186 DbSizeInUse: ms.bg.Backend().SizeInUse(),
187 IsLearner: ms.cs.IsLearner(),
188 }
189 if resp.Leader == raft.None {
190 resp.Errors = append(resp.Errors, etcdserver.ErrNoLeader.Error())
191 }
192 for _, a := range ms.a.Alarms() {
193 resp.Errors = append(resp.Errors, a.String())
194 }
195 return resp, nil
196}
197
198func (ms *maintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
199 if ms.rg.ID() != ms.rg.Leader() {
200 return nil, rpctypes.ErrGRPCNotLeader
201 }
202
203 if err := ms.lt.MoveLeader(ctx, uint64(ms.rg.Leader()), tr.TargetID); err != nil {
204 return nil, togRPCError(err)
205 }
206 return &pb.MoveLeaderResponse{}, nil
207}
208
209type authMaintenanceServer struct {
210 *maintenanceServer
211 ag AuthGetter
212}
213
214func (ams *authMaintenanceServer) isAuthenticated(ctx context.Context) error {
215 authInfo, err := ams.ag.AuthInfoFromCtx(ctx)
216 if err != nil {
217 return err
218 }
219
220 return ams.ag.AuthStore().IsAdminPermitted(authInfo)
221}
222
223func (ams *authMaintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
224 if err := ams.isAuthenticated(ctx); err != nil {
225 return nil, err
226 }
227
228 return ams.maintenanceServer.Defragment(ctx, sr)
229}
230
231func (ams *authMaintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
232 if err := ams.isAuthenticated(srv.Context()); err != nil {
233 return err
234 }
235
236 return ams.maintenanceServer.Snapshot(sr, srv)
237}
238
239func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
240 if err := ams.isAuthenticated(ctx); err != nil {
241 return nil, err
242 }
243
244 return ams.maintenanceServer.Hash(ctx, r)
245}
246
247func (ams *authMaintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
248 if err := ams.isAuthenticated(ctx); err != nil {
249 return nil, err
250 }
251 return ams.maintenanceServer.HashKV(ctx, r)
252}
253
254func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
255 return ams.maintenanceServer.Status(ctx, ar)
256}
257
258func (ams *authMaintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
259 return ams.maintenanceServer.MoveLeader(ctx, tr)
260}