blob: f60cfbe47193f959e45c50b1bcc01f5bf5d2bfc3 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
19 "io"
20
21 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
22
23 "google.golang.org/grpc"
24)
25
26type (
27 DefragmentResponse pb.DefragmentResponse
28 AlarmResponse pb.AlarmResponse
29 AlarmMember pb.AlarmMember
30 StatusResponse pb.StatusResponse
31 HashKVResponse pb.HashKVResponse
32 MoveLeaderResponse pb.MoveLeaderResponse
33)
34
35type Maintenance interface {
36 // AlarmList gets all active alarms.
37 AlarmList(ctx context.Context) (*AlarmResponse, error)
38
39 // AlarmDisarm disarms a given alarm.
40 AlarmDisarm(ctx context.Context, m *AlarmMember) (*AlarmResponse, error)
41
42 // Defragment releases wasted space from internal fragmentation on a given etcd member.
43 // Defragment is only needed when deleting a large number of keys and want to reclaim
44 // the resources.
45 // Defragment is an expensive operation. User should avoid defragmenting multiple members
46 // at the same time.
47 // To defragment multiple members in the cluster, user need to call defragment multiple
48 // times with different endpoints.
49 Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error)
50
51 // Status gets the status of the endpoint.
52 Status(ctx context.Context, endpoint string) (*StatusResponse, error)
53
54 // HashKV returns a hash of the KV state at the time of the RPC.
55 // If revision is zero, the hash is computed on all keys. If the revision
56 // is non-zero, the hash is computed on all keys at or below the given revision.
57 HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)
58
59 // Snapshot provides a reader for a point-in-time snapshot of etcd.
60 Snapshot(ctx context.Context) (io.ReadCloser, error)
61
62 // MoveLeader requests current leader to transfer its leadership to the transferee.
63 // Request must be made to the leader.
64 MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error)
65}
66
67type maintenance struct {
68 dial func(endpoint string) (pb.MaintenanceClient, func(), error)
69 remote pb.MaintenanceClient
70 callOpts []grpc.CallOption
71}
72
73func NewMaintenance(c *Client) Maintenance {
74 api := &maintenance{
75 dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
76 conn, err := c.dial(endpoint)
77 if err != nil {
78 return nil, nil, err
79 }
80 cancel := func() { conn.Close() }
81 return RetryMaintenanceClient(c, conn), cancel, nil
82 },
83 remote: RetryMaintenanceClient(c, c.conn),
84 }
85 if c != nil {
86 api.callOpts = c.callOpts
87 }
88 return api
89}
90
91func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
92 api := &maintenance{
93 dial: func(string) (pb.MaintenanceClient, func(), error) {
94 return remote, func() {}, nil
95 },
96 remote: remote,
97 }
98 if c != nil {
99 api.callOpts = c.callOpts
100 }
101 return api
102}
103
104func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
105 req := &pb.AlarmRequest{
106 Action: pb.AlarmRequest_GET,
107 MemberID: 0, // all
108 Alarm: pb.AlarmType_NONE, // all
109 }
110 resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
111 if err == nil {
112 return (*AlarmResponse)(resp), nil
113 }
114 return nil, toErr(ctx, err)
115}
116
117func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) {
118 req := &pb.AlarmRequest{
119 Action: pb.AlarmRequest_DEACTIVATE,
120 MemberID: am.MemberID,
121 Alarm: am.Alarm,
122 }
123
124 if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE {
125 ar, err := m.AlarmList(ctx)
126 if err != nil {
127 return nil, toErr(ctx, err)
128 }
129 ret := AlarmResponse{}
130 for _, am := range ar.Alarms {
131 dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am))
132 if derr != nil {
133 return nil, toErr(ctx, derr)
134 }
135 ret.Alarms = append(ret.Alarms, dresp.Alarms...)
136 }
137 return &ret, nil
138 }
139
140 resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
141 if err == nil {
142 return (*AlarmResponse)(resp), nil
143 }
144 return nil, toErr(ctx, err)
145}
146
147func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
148 remote, cancel, err := m.dial(endpoint)
149 if err != nil {
150 return nil, toErr(ctx, err)
151 }
152 defer cancel()
153 resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...)
154 if err != nil {
155 return nil, toErr(ctx, err)
156 }
157 return (*DefragmentResponse)(resp), nil
158}
159
160func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
161 remote, cancel, err := m.dial(endpoint)
162 if err != nil {
163 return nil, toErr(ctx, err)
164 }
165 defer cancel()
166 resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...)
167 if err != nil {
168 return nil, toErr(ctx, err)
169 }
170 return (*StatusResponse)(resp), nil
171}
172
173func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) {
174 remote, cancel, err := m.dial(endpoint)
175 if err != nil {
176 return nil, toErr(ctx, err)
177 }
178 defer cancel()
179 resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...)
180 if err != nil {
181 return nil, toErr(ctx, err)
182 }
183 return (*HashKVResponse)(resp), nil
184}
185
186func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
187 ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, m.callOpts...)
188 if err != nil {
189 return nil, toErr(ctx, err)
190 }
191
192 pr, pw := io.Pipe()
193 go func() {
194 for {
195 resp, err := ss.Recv()
196 if err != nil {
197 pw.CloseWithError(err)
198 return
199 }
200 if resp == nil && err == nil {
201 break
202 }
203 if _, werr := pw.Write(resp.Blob); werr != nil {
204 pw.CloseWithError(werr)
205 return
206 }
207 }
208 pw.Close()
209 }()
210 return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil
211}
212
213type snapshotReadCloser struct {
214 ctx context.Context
215 io.ReadCloser
216}
217
218func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) {
219 n, err = rc.ReadCloser.Read(p)
220 return n, toErr(rc.ctx, err)
221}
222
223func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
224 resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...)
225 return (*MoveLeaderResponse)(resp), toErr(ctx, err)
226}