blob: 9c168d78b0a4f34a64423a40169e3a13fb1ca736 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package v3rpc
16
17import (
18 "context"
19 "crypto/sha256"
20 "io"
21 "time"
22
23 "github.com/coreos/etcd/auth"
24 "github.com/coreos/etcd/etcdserver"
25 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
26 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
27 "github.com/coreos/etcd/mvcc"
28 "github.com/coreos/etcd/mvcc/backend"
29 "github.com/coreos/etcd/pkg/types"
30 "github.com/coreos/etcd/version"
31 "github.com/dustin/go-humanize"
32)
33
34type KVGetter interface {
35 KV() mvcc.ConsistentWatchableKV
36}
37
38type BackendGetter interface {
39 Backend() backend.Backend
40}
41
42type Alarmer interface {
43 Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
44}
45
46type LeaderTransferrer interface {
47 MoveLeader(ctx context.Context, lead, target uint64) error
48}
49
50type RaftStatusGetter interface {
51 etcdserver.RaftTimer
52 ID() types.ID
53 Leader() types.ID
54}
55
56type AuthGetter interface {
57 AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error)
58 AuthStore() auth.AuthStore
59}
60
61type maintenanceServer struct {
62 rg RaftStatusGetter
63 kg KVGetter
64 bg BackendGetter
65 a Alarmer
66 lt LeaderTransferrer
67 hdr header
68}
69
70func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
71 srv := &maintenanceServer{rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s)}
72 return &authMaintenanceServer{srv, s}
73}
74
75func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
76 plog.Noticef("starting to defragment the storage backend...")
77 err := ms.bg.Backend().Defrag()
78 if err != nil {
79 plog.Errorf("failed to defragment the storage backend (%v)", err)
80 return nil, err
81 }
82 plog.Noticef("finished defragmenting the storage backend")
83 return &pb.DefragmentResponse{}, nil
84}
85
// snapshotSendBufferSize is the maximum number of snapshot bytes Snapshot
// reads and sends per message. At 32 KiB it holds more than one OS page.
const snapshotSendBufferSize = 32 << 10
88
89func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
90 snap := ms.bg.Backend().Snapshot()
91 pr, pw := io.Pipe()
92
93 defer pr.Close()
94
95 go func() {
96 snap.WriteTo(pw)
97 if err := snap.Close(); err != nil {
98 plog.Errorf("error closing snapshot (%v)", err)
99 }
100 pw.Close()
101 }()
102
103 // record SHA digest of snapshot data
104 // used for integrity checks during snapshot restore operation
105 h := sha256.New()
106
107 // buffer just holds read bytes from stream
108 // response size is multiple of OS page size, fetched in boltdb
109 // e.g. 4*1024
110 buf := make([]byte, snapshotSendBufferSize)
111
112 sent := int64(0)
113 total := snap.Size()
114 size := humanize.Bytes(uint64(total))
115
116 start := time.Now()
117 plog.Infof("sending database snapshot to client %s [%d bytes]", size, total)
118 for total-sent > 0 {
119 n, err := io.ReadFull(pr, buf)
120 if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
121 return togRPCError(err)
122 }
123 sent += int64(n)
124
125 // if total is x * snapshotSendBufferSize. it is possible that
126 // resp.RemainingBytes == 0
127 // resp.Blob == zero byte but not nil
128 // does this make server response sent to client nil in proto
129 // and client stops receiving from snapshot stream before
130 // server sends snapshot SHA?
131 // No, the client will still receive non-nil response
132 // until server closes the stream with EOF
133
134 resp := &pb.SnapshotResponse{
135 RemainingBytes: uint64(total - sent),
136 Blob: buf[:n],
137 }
138 if err = srv.Send(resp); err != nil {
139 return togRPCError(err)
140 }
141 h.Write(buf[:n])
142 }
143
144 // send SHA digest for integrity checks
145 // during snapshot restore operation
146 sha := h.Sum(nil)
147
148 plog.Infof("sending database sha256 checksum to client [%d bytes]", len(sha))
149 hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha}
150 if err := srv.Send(hresp); err != nil {
151 return togRPCError(err)
152 }
153
154 plog.Infof("successfully sent database snapshot to client %s [%d bytes, took %s]", size, total, humanize.Time(start))
155 return nil
156}
157
158func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
159 h, rev, err := ms.kg.KV().Hash()
160 if err != nil {
161 return nil, togRPCError(err)
162 }
163 resp := &pb.HashResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h}
164 ms.hdr.fill(resp.Header)
165 return resp, nil
166}
167
168func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
169 h, rev, compactRev, err := ms.kg.KV().HashByRev(r.Revision)
170 if err != nil {
171 return nil, togRPCError(err)
172 }
173
174 resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h, CompactRevision: compactRev}
175 ms.hdr.fill(resp.Header)
176 return resp, nil
177}
178
179func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
180 return ms.a.Alarm(ctx, ar)
181}
182
183func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
184 resp := &pb.StatusResponse{
185 Header: &pb.ResponseHeader{Revision: ms.hdr.rev()},
186 Version: version.Version,
187 DbSize: ms.bg.Backend().Size(),
188 Leader: uint64(ms.rg.Leader()),
189 RaftIndex: ms.rg.Index(),
190 RaftTerm: ms.rg.Term(),
191 }
192 ms.hdr.fill(resp.Header)
193 return resp, nil
194}
195
196func (ms *maintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
197 if ms.rg.ID() != ms.rg.Leader() {
198 return nil, rpctypes.ErrGRPCNotLeader
199 }
200
201 if err := ms.lt.MoveLeader(ctx, uint64(ms.rg.Leader()), tr.TargetID); err != nil {
202 return nil, togRPCError(err)
203 }
204 return &pb.MoveLeaderResponse{}, nil
205}
206
207type authMaintenanceServer struct {
208 *maintenanceServer
209 ag AuthGetter
210}
211
212func (ams *authMaintenanceServer) isAuthenticated(ctx context.Context) error {
213 authInfo, err := ams.ag.AuthInfoFromCtx(ctx)
214 if err != nil {
215 return err
216 }
217
218 return ams.ag.AuthStore().IsAdminPermitted(authInfo)
219}
220
221func (ams *authMaintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
222 if err := ams.isAuthenticated(ctx); err != nil {
223 return nil, err
224 }
225
226 return ams.maintenanceServer.Defragment(ctx, sr)
227}
228
229func (ams *authMaintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
230 if err := ams.isAuthenticated(srv.Context()); err != nil {
231 return err
232 }
233
234 return ams.maintenanceServer.Snapshot(sr, srv)
235}
236
237func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
238 if err := ams.isAuthenticated(ctx); err != nil {
239 return nil, err
240 }
241
242 return ams.maintenanceServer.Hash(ctx, r)
243}
244
245func (ams *authMaintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
246 if err := ams.isAuthenticated(ctx); err != nil {
247 return nil, err
248 }
249 return ams.maintenanceServer.HashKV(ctx, r)
250}
251
252func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
253 return ams.maintenanceServer.Status(ctx, ar)
254}
255
256func (ams *authMaintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
257 return ams.maintenanceServer.MoveLeader(ctx, tr)
258}