blob: c9df1800db28a19454b9ac2645e2488380fb8a79 [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package v3rpc
16
17import (
18 "context"
19 "crypto/sha256"
20 "io"
21
22 "github.com/coreos/etcd/auth"
23 "github.com/coreos/etcd/etcdserver"
24 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
25 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
26 "github.com/coreos/etcd/mvcc"
27 "github.com/coreos/etcd/mvcc/backend"
28 "github.com/coreos/etcd/pkg/types"
29 "github.com/coreos/etcd/version"
30)
31
32type KVGetter interface {
33 KV() mvcc.ConsistentWatchableKV
34}
35
36type BackendGetter interface {
37 Backend() backend.Backend
38}
39
40type Alarmer interface {
41 Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
42}
43
// LeaderTransferrer is implemented by anything that can move leadership
// from one member to another.
type LeaderTransferrer interface {
	// MoveLeader asks for leadership to be moved from member lead to member
	// target, both given as raw member IDs.
	MoveLeader(ctx context.Context, lead, target uint64) error
}
47
48type RaftStatusGetter interface {
49 etcdserver.RaftTimer
50 ID() types.ID
51 Leader() types.ID
52}
53
54type AuthGetter interface {
55 AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error)
56 AuthStore() auth.AuthStore
57}
58
59type maintenanceServer struct {
60 rg RaftStatusGetter
61 kg KVGetter
62 bg BackendGetter
63 a Alarmer
64 lt LeaderTransferrer
65 hdr header
66}
67
68func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
69 srv := &maintenanceServer{rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s)}
70 return &authMaintenanceServer{srv, s}
71}
72
73func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
74 plog.Noticef("starting to defragment the storage backend...")
75 err := ms.bg.Backend().Defrag()
76 if err != nil {
77 plog.Errorf("failed to defragment the storage backend (%v)", err)
78 return nil, err
79 }
80 plog.Noticef("finished defragmenting the storage backend")
81 return &pb.DefragmentResponse{}, nil
82}
83
84func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
85 snap := ms.bg.Backend().Snapshot()
86 pr, pw := io.Pipe()
87
88 defer pr.Close()
89
90 go func() {
91 snap.WriteTo(pw)
92 if err := snap.Close(); err != nil {
93 plog.Errorf("error closing snapshot (%v)", err)
94 }
95 pw.Close()
96 }()
97
98 // send file data
99 h := sha256.New()
100 br := int64(0)
101 buf := make([]byte, 32*1024)
102 sz := snap.Size()
103 for br < sz {
104 n, err := io.ReadFull(pr, buf)
105 if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
106 return togRPCError(err)
107 }
108 br += int64(n)
109 resp := &pb.SnapshotResponse{
110 RemainingBytes: uint64(sz - br),
111 Blob: buf[:n],
112 }
113 if err = srv.Send(resp); err != nil {
114 return togRPCError(err)
115 }
116 h.Write(buf[:n])
117 }
118
119 // send sha
120 sha := h.Sum(nil)
121 hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha}
122 if err := srv.Send(hresp); err != nil {
123 return togRPCError(err)
124 }
125
126 return nil
127}
128
129func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
130 h, rev, err := ms.kg.KV().Hash()
131 if err != nil {
132 return nil, togRPCError(err)
133 }
134 resp := &pb.HashResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h}
135 ms.hdr.fill(resp.Header)
136 return resp, nil
137}
138
139func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
140 h, rev, compactRev, err := ms.kg.KV().HashByRev(r.Revision)
141 if err != nil {
142 return nil, togRPCError(err)
143 }
144
145 resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h, CompactRevision: compactRev}
146 ms.hdr.fill(resp.Header)
147 return resp, nil
148}
149
150func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
151 return ms.a.Alarm(ctx, ar)
152}
153
154func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
155 resp := &pb.StatusResponse{
156 Header: &pb.ResponseHeader{Revision: ms.hdr.rev()},
157 Version: version.Version,
158 DbSize: ms.bg.Backend().Size(),
159 Leader: uint64(ms.rg.Leader()),
160 RaftIndex: ms.rg.Index(),
161 RaftTerm: ms.rg.Term(),
162 }
163 ms.hdr.fill(resp.Header)
164 return resp, nil
165}
166
167func (ms *maintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
168 if ms.rg.ID() != ms.rg.Leader() {
169 return nil, rpctypes.ErrGRPCNotLeader
170 }
171
172 if err := ms.lt.MoveLeader(ctx, uint64(ms.rg.Leader()), tr.TargetID); err != nil {
173 return nil, togRPCError(err)
174 }
175 return &pb.MoveLeaderResponse{}, nil
176}
177
178type authMaintenanceServer struct {
179 *maintenanceServer
180 ag AuthGetter
181}
182
183func (ams *authMaintenanceServer) isAuthenticated(ctx context.Context) error {
184 authInfo, err := ams.ag.AuthInfoFromCtx(ctx)
185 if err != nil {
186 return err
187 }
188
189 return ams.ag.AuthStore().IsAdminPermitted(authInfo)
190}
191
192func (ams *authMaintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
193 if err := ams.isAuthenticated(ctx); err != nil {
194 return nil, err
195 }
196
197 return ams.maintenanceServer.Defragment(ctx, sr)
198}
199
200func (ams *authMaintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
201 if err := ams.isAuthenticated(srv.Context()); err != nil {
202 return err
203 }
204
205 return ams.maintenanceServer.Snapshot(sr, srv)
206}
207
208func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
209 if err := ams.isAuthenticated(ctx); err != nil {
210 return nil, err
211 }
212
213 return ams.maintenanceServer.Hash(ctx, r)
214}
215
216func (ams *authMaintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
217 if err := ams.isAuthenticated(ctx); err != nil {
218 return nil, err
219 }
220 return ams.maintenanceServer.HashKV(ctx, r)
221}
222
223func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
224 return ams.maintenanceServer.Status(ctx, ar)
225}
226
227func (ams *authMaintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
228 return ams.maintenanceServer.MoveLeader(ctx, tr)
229}