blob: 5e87cf8f3ed3ed7aa717a5b03f1fd9b32d0fbac6 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
19 "fmt"
20 "io"
21
22 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
23
24 "google.golang.org/grpc"
25)
26
27type (
28 DefragmentResponse pb.DefragmentResponse
29 AlarmResponse pb.AlarmResponse
30 AlarmMember pb.AlarmMember
31 StatusResponse pb.StatusResponse
32 HashKVResponse pb.HashKVResponse
33 MoveLeaderResponse pb.MoveLeaderResponse
34)
35
36type Maintenance interface {
37 // AlarmList gets all active alarms.
38 AlarmList(ctx context.Context) (*AlarmResponse, error)
39
40 // AlarmDisarm disarms a given alarm.
41 AlarmDisarm(ctx context.Context, m *AlarmMember) (*AlarmResponse, error)
42
43 // Defragment releases wasted space from internal fragmentation on a given etcd member.
44 // Defragment is only needed when deleting a large number of keys and want to reclaim
45 // the resources.
46 // Defragment is an expensive operation. User should avoid defragmenting multiple members
47 // at the same time.
48 // To defragment multiple members in the cluster, user need to call defragment multiple
49 // times with different endpoints.
50 Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error)
51
52 // Status gets the status of the endpoint.
53 Status(ctx context.Context, endpoint string) (*StatusResponse, error)
54
55 // HashKV returns a hash of the KV state at the time of the RPC.
56 // If revision is zero, the hash is computed on all keys. If the revision
57 // is non-zero, the hash is computed on all keys at or below the given revision.
58 HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)
59
60 // Snapshot provides a reader for a point-in-time snapshot of etcd.
61 // If the context "ctx" is canceled or timed out, reading from returned
62 // "io.ReadCloser" would error out (e.g. context.Canceled, context.DeadlineExceeded).
63 Snapshot(ctx context.Context) (io.ReadCloser, error)
64
65 // MoveLeader requests current leader to transfer its leadership to the transferee.
66 // Request must be made to the leader.
67 MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error)
68}
69
70type maintenance struct {
71 dial func(endpoint string) (pb.MaintenanceClient, func(), error)
72 remote pb.MaintenanceClient
73 callOpts []grpc.CallOption
74}
75
76func NewMaintenance(c *Client) Maintenance {
77 api := &maintenance{
78 dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
79 conn, err := c.Dial(endpoint)
80 if err != nil {
81 return nil, nil, fmt.Errorf("failed to dial endpoint %s with maintenance client: %v", endpoint, err)
82 }
83 cancel := func() { conn.Close() }
84 return RetryMaintenanceClient(c, conn), cancel, nil
85 },
86 remote: RetryMaintenanceClient(c, c.conn),
87 }
88 if c != nil {
89 api.callOpts = c.callOpts
90 }
91 return api
92}
93
94func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
95 api := &maintenance{
96 dial: func(string) (pb.MaintenanceClient, func(), error) {
97 return remote, func() {}, nil
98 },
99 remote: remote,
100 }
101 if c != nil {
102 api.callOpts = c.callOpts
103 }
104 return api
105}
106
107func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
108 req := &pb.AlarmRequest{
109 Action: pb.AlarmRequest_GET,
110 MemberID: 0, // all
111 Alarm: pb.AlarmType_NONE, // all
112 }
113 resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
114 if err == nil {
115 return (*AlarmResponse)(resp), nil
116 }
117 return nil, toErr(ctx, err)
118}
119
120func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) {
121 req := &pb.AlarmRequest{
122 Action: pb.AlarmRequest_DEACTIVATE,
123 MemberID: am.MemberID,
124 Alarm: am.Alarm,
125 }
126
127 if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE {
128 ar, err := m.AlarmList(ctx)
129 if err != nil {
130 return nil, toErr(ctx, err)
131 }
132 ret := AlarmResponse{}
133 for _, am := range ar.Alarms {
134 dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am))
135 if derr != nil {
136 return nil, toErr(ctx, derr)
137 }
138 ret.Alarms = append(ret.Alarms, dresp.Alarms...)
139 }
140 return &ret, nil
141 }
142
143 resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
144 if err == nil {
145 return (*AlarmResponse)(resp), nil
146 }
147 return nil, toErr(ctx, err)
148}
149
150func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
151 remote, cancel, err := m.dial(endpoint)
152 if err != nil {
153 return nil, toErr(ctx, err)
154 }
155 defer cancel()
156 resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...)
157 if err != nil {
158 return nil, toErr(ctx, err)
159 }
160 return (*DefragmentResponse)(resp), nil
161}
162
163func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
164 remote, cancel, err := m.dial(endpoint)
165 if err != nil {
166 return nil, toErr(ctx, err)
167 }
168 defer cancel()
169 resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...)
170 if err != nil {
171 return nil, toErr(ctx, err)
172 }
173 return (*StatusResponse)(resp), nil
174}
175
176func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) {
177 remote, cancel, err := m.dial(endpoint)
178 if err != nil {
179
180 return nil, toErr(ctx, err)
181 }
182 defer cancel()
183 resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...)
184 if err != nil {
185 return nil, toErr(ctx, err)
186 }
187 return (*HashKVResponse)(resp), nil
188}
189
190func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
191 ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
192 if err != nil {
193 return nil, toErr(ctx, err)
194 }
195
196 plog.Info("opened snapshot stream; downloading")
197 pr, pw := io.Pipe()
198 go func() {
199 for {
200 resp, err := ss.Recv()
201 if err != nil {
202 switch err {
203 case io.EOF:
204 plog.Info("completed snapshot read; closing")
205 default:
206 plog.Warningf("failed to receive from snapshot stream; closing (%v)", err)
207 }
208 pw.CloseWithError(err)
209 return
210 }
211
212 // can "resp == nil && err == nil"
213 // before we receive snapshot SHA digest?
214 // No, server sends EOF with an empty response
215 // after it sends SHA digest at the end
216
217 if _, werr := pw.Write(resp.Blob); werr != nil {
218 pw.CloseWithError(werr)
219 return
220 }
221 }
222 }()
223 return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil
224}
225
226type snapshotReadCloser struct {
227 ctx context.Context
228 io.ReadCloser
229}
230
231func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) {
232 n, err = rc.ReadCloser.Read(p)
233 return n, toErr(rc.ctx, err)
234}
235
236func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
237 resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...)
238 return (*MoveLeaderResponse)(resp), toErr(ctx, err)
239}