blob: 7f89ba641a445510f4581cb309741f3b7e99ee15 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
19
20 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
21 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
22
23 "google.golang.org/grpc"
24 "google.golang.org/grpc/codes"
25 "google.golang.org/grpc/status"
26)
27
// retryPolicy selects how a failed RPC is judged by the retry loop.
type retryPolicy uint8

const (
	// repeatable is for RPCs that are safe to send more than once; failures
	// are judged by isRepeatableStopError.
	repeatable retryPolicy = iota
	// nonRepeatable is for RPCs that must not be duplicated; failures are
	// judged by isNonRepeatableStopError.
	nonRepeatable
)
34
35type rpcFunc func(ctx context.Context) error
36type retryRPCFunc func(context.Context, rpcFunc, retryPolicy) error
37type retryStopErrFunc func(error) bool
38
39// immutable requests (e.g. Get) should be retried unless it's
40// an obvious server-side error (e.g. rpctypes.ErrRequestTooLarge).
41//
42// "isRepeatableStopError" returns "true" when an immutable request
43// is interrupted by server-side or gRPC-side error and its status
44// code is not transient (!= codes.Unavailable).
45//
46// Returning "true" means retry should stop, since client cannot
47// handle itself even with retries.
48func isRepeatableStopError(err error) bool {
49 eErr := rpctypes.Error(err)
50 // always stop retry on etcd errors
51 if serverErr, ok := eErr.(rpctypes.EtcdError); ok && serverErr.Code() != codes.Unavailable {
52 return true
53 }
54 // only retry if unavailable
55 ev, _ := status.FromError(err)
56 return ev.Code() != codes.Unavailable
57}
58
59// mutable requests (e.g. Put, Delete, Txn) should only be retried
60// when the status code is codes.Unavailable when initial connection
61// has not been established (no pinned endpoint).
62//
63// "isNonRepeatableStopError" returns "true" when a mutable request
64// is interrupted by non-transient error that client cannot handle itself,
65// or transient error while the connection has already been established
66// (pinned endpoint exists).
67//
68// Returning "true" means retry should stop, otherwise it violates
69// write-at-most-once semantics.
70func isNonRepeatableStopError(err error) bool {
71 ev, _ := status.FromError(err)
72 if ev.Code() != codes.Unavailable {
73 return true
74 }
75 desc := rpctypes.ErrorDesc(err)
76 return desc != "there is no address available" && desc != "there is no connection available"
77}
78
79func (c *Client) newRetryWrapper() retryRPCFunc {
80 return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error {
81 var isStop retryStopErrFunc
82 switch rp {
83 case repeatable:
84 isStop = isRepeatableStopError
85 case nonRepeatable:
86 isStop = isNonRepeatableStopError
87 }
88 for {
89 if err := readyWait(rpcCtx, c.ctx, c.balancer.ConnectNotify()); err != nil {
90 return err
91 }
92 pinned := c.balancer.pinned()
93 err := f(rpcCtx)
94 if err == nil {
95 return nil
96 }
97 logger.Lvl(4).Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned)
98
99 if s, ok := status.FromError(err); ok && (s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded || s.Code() == codes.Internal) {
100 // mark this before endpoint switch is triggered
101 c.balancer.hostPortError(pinned, err)
102 c.balancer.next()
103 logger.Lvl(4).Infof("clientv3/retry: switching from %q due to error %q", pinned, err.Error())
104 }
105
106 if isStop(err) {
107 return err
108 }
109 }
110 }
111}
112
113func (c *Client) newAuthRetryWrapper(retryf retryRPCFunc) retryRPCFunc {
114 return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error {
115 for {
116 pinned := c.balancer.pinned()
117 err := retryf(rpcCtx, f, rp)
118 if err == nil {
119 return nil
120 }
121 logger.Lvl(4).Infof("clientv3/auth-retry: error %q on pinned endpoint %q", err.Error(), pinned)
122 // always stop retry on etcd errors other than invalid auth token
123 if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
124 gterr := c.getToken(rpcCtx)
125 if gterr != nil {
126 logger.Lvl(4).Infof("clientv3/auth-retry: cannot retry due to error %q(%q) on pinned endpoint %q", err.Error(), gterr.Error(), pinned)
127 return err // return the original error for simplicity
128 }
129 continue
130 }
131 return err
132 }
133 }
134}
135
136type retryKVClient struct {
137 kc pb.KVClient
138 retryf retryRPCFunc
139}
140
141// RetryKVClient implements a KVClient.
142func RetryKVClient(c *Client) pb.KVClient {
143 return &retryKVClient{
144 kc: pb.NewKVClient(c.conn),
145 retryf: c.newAuthRetryWrapper(c.newRetryWrapper()),
146 }
147}
148func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
149 err = rkv.retryf(ctx, func(rctx context.Context) error {
150 resp, err = rkv.kc.Range(rctx, in, opts...)
151 return err
152 }, repeatable)
153 return resp, err
154}
155
156func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
157 err = rkv.retryf(ctx, func(rctx context.Context) error {
158 resp, err = rkv.kc.Put(rctx, in, opts...)
159 return err
160 }, nonRepeatable)
161 return resp, err
162}
163
164func (rkv *retryKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) {
165 err = rkv.retryf(ctx, func(rctx context.Context) error {
166 resp, err = rkv.kc.DeleteRange(rctx, in, opts...)
167 return err
168 }, nonRepeatable)
169 return resp, err
170}
171
172func (rkv *retryKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) {
173 // TODO: "repeatable" for read-only txn
174 err = rkv.retryf(ctx, func(rctx context.Context) error {
175 resp, err = rkv.kc.Txn(rctx, in, opts...)
176 return err
177 }, nonRepeatable)
178 return resp, err
179}
180
181func (rkv *retryKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) {
182 err = rkv.retryf(ctx, func(rctx context.Context) error {
183 resp, err = rkv.kc.Compact(rctx, in, opts...)
184 return err
185 }, nonRepeatable)
186 return resp, err
187}
188
189type retryLeaseClient struct {
190 lc pb.LeaseClient
191 retryf retryRPCFunc
192}
193
194// RetryLeaseClient implements a LeaseClient.
195func RetryLeaseClient(c *Client) pb.LeaseClient {
196 return &retryLeaseClient{
197 lc: pb.NewLeaseClient(c.conn),
198 retryf: c.newAuthRetryWrapper(c.newRetryWrapper()),
199 }
200}
201
202func (rlc *retryLeaseClient) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (resp *pb.LeaseTimeToLiveResponse, err error) {
203 err = rlc.retryf(ctx, func(rctx context.Context) error {
204 resp, err = rlc.lc.LeaseTimeToLive(rctx, in, opts...)
205 return err
206 }, repeatable)
207 return resp, err
208}
209
210func (rlc *retryLeaseClient) LeaseLeases(ctx context.Context, in *pb.LeaseLeasesRequest, opts ...grpc.CallOption) (resp *pb.LeaseLeasesResponse, err error) {
211 err = rlc.retryf(ctx, func(rctx context.Context) error {
212 resp, err = rlc.lc.LeaseLeases(rctx, in, opts...)
213 return err
214 }, repeatable)
215 return resp, err
216}
217
218func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
219 err = rlc.retryf(ctx, func(rctx context.Context) error {
220 resp, err = rlc.lc.LeaseGrant(rctx, in, opts...)
221 return err
222 }, repeatable)
223 return resp, err
224
225}
226
227func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) {
228 err = rlc.retryf(ctx, func(rctx context.Context) error {
229 resp, err = rlc.lc.LeaseRevoke(rctx, in, opts...)
230 return err
231 }, repeatable)
232 return resp, err
233}
234
235func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (stream pb.Lease_LeaseKeepAliveClient, err error) {
236 err = rlc.retryf(ctx, func(rctx context.Context) error {
237 stream, err = rlc.lc.LeaseKeepAlive(rctx, opts...)
238 return err
239 }, repeatable)
240 return stream, err
241}
242
243type retryClusterClient struct {
244 cc pb.ClusterClient
245 retryf retryRPCFunc
246}
247
248// RetryClusterClient implements a ClusterClient.
249func RetryClusterClient(c *Client) pb.ClusterClient {
250 return &retryClusterClient{
251 cc: pb.NewClusterClient(c.conn),
252 retryf: c.newRetryWrapper(),
253 }
254}
255
256func (rcc *retryClusterClient) MemberList(ctx context.Context, in *pb.MemberListRequest, opts ...grpc.CallOption) (resp *pb.MemberListResponse, err error) {
257 err = rcc.retryf(ctx, func(rctx context.Context) error {
258 resp, err = rcc.cc.MemberList(rctx, in, opts...)
259 return err
260 }, repeatable)
261 return resp, err
262}
263
264func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
265 err = rcc.retryf(ctx, func(rctx context.Context) error {
266 resp, err = rcc.cc.MemberAdd(rctx, in, opts...)
267 return err
268 }, nonRepeatable)
269 return resp, err
270}
271
272func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) {
273 err = rcc.retryf(ctx, func(rctx context.Context) error {
274 resp, err = rcc.cc.MemberRemove(rctx, in, opts...)
275 return err
276 }, nonRepeatable)
277 return resp, err
278}
279
280func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) {
281 err = rcc.retryf(ctx, func(rctx context.Context) error {
282 resp, err = rcc.cc.MemberUpdate(rctx, in, opts...)
283 return err
284 }, nonRepeatable)
285 return resp, err
286}
287
288type retryMaintenanceClient struct {
289 mc pb.MaintenanceClient
290 retryf retryRPCFunc
291}
292
293// RetryMaintenanceClient implements a Maintenance.
294func RetryMaintenanceClient(c *Client, conn *grpc.ClientConn) pb.MaintenanceClient {
295 return &retryMaintenanceClient{
296 mc: pb.NewMaintenanceClient(conn),
297 retryf: c.newRetryWrapper(),
298 }
299}
300
301func (rmc *retryMaintenanceClient) Alarm(ctx context.Context, in *pb.AlarmRequest, opts ...grpc.CallOption) (resp *pb.AlarmResponse, err error) {
302 err = rmc.retryf(ctx, func(rctx context.Context) error {
303 resp, err = rmc.mc.Alarm(rctx, in, opts...)
304 return err
305 }, repeatable)
306 return resp, err
307}
308
309func (rmc *retryMaintenanceClient) Status(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (resp *pb.StatusResponse, err error) {
310 err = rmc.retryf(ctx, func(rctx context.Context) error {
311 resp, err = rmc.mc.Status(rctx, in, opts...)
312 return err
313 }, repeatable)
314 return resp, err
315}
316
317func (rmc *retryMaintenanceClient) Hash(ctx context.Context, in *pb.HashRequest, opts ...grpc.CallOption) (resp *pb.HashResponse, err error) {
318 err = rmc.retryf(ctx, func(rctx context.Context) error {
319 resp, err = rmc.mc.Hash(rctx, in, opts...)
320 return err
321 }, repeatable)
322 return resp, err
323}
324
325func (rmc *retryMaintenanceClient) HashKV(ctx context.Context, in *pb.HashKVRequest, opts ...grpc.CallOption) (resp *pb.HashKVResponse, err error) {
326 err = rmc.retryf(ctx, func(rctx context.Context) error {
327 resp, err = rmc.mc.HashKV(rctx, in, opts...)
328 return err
329 }, repeatable)
330 return resp, err
331}
332
333func (rmc *retryMaintenanceClient) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (stream pb.Maintenance_SnapshotClient, err error) {
334 err = rmc.retryf(ctx, func(rctx context.Context) error {
335 stream, err = rmc.mc.Snapshot(rctx, in, opts...)
336 return err
337 }, repeatable)
338 return stream, err
339}
340
341func (rmc *retryMaintenanceClient) MoveLeader(ctx context.Context, in *pb.MoveLeaderRequest, opts ...grpc.CallOption) (resp *pb.MoveLeaderResponse, err error) {
342 err = rmc.retryf(ctx, func(rctx context.Context) error {
343 resp, err = rmc.mc.MoveLeader(rctx, in, opts...)
344 return err
345 }, repeatable)
346 return resp, err
347}
348
349func (rmc *retryMaintenanceClient) Defragment(ctx context.Context, in *pb.DefragmentRequest, opts ...grpc.CallOption) (resp *pb.DefragmentResponse, err error) {
350 err = rmc.retryf(ctx, func(rctx context.Context) error {
351 resp, err = rmc.mc.Defragment(rctx, in, opts...)
352 return err
353 }, nonRepeatable)
354 return resp, err
355}
356
357type retryAuthClient struct {
358 ac pb.AuthClient
359 retryf retryRPCFunc
360}
361
362// RetryAuthClient implements a AuthClient.
363func RetryAuthClient(c *Client) pb.AuthClient {
364 return &retryAuthClient{
365 ac: pb.NewAuthClient(c.conn),
366 retryf: c.newRetryWrapper(),
367 }
368}
369
370func (rac *retryAuthClient) UserList(ctx context.Context, in *pb.AuthUserListRequest, opts ...grpc.CallOption) (resp *pb.AuthUserListResponse, err error) {
371 err = rac.retryf(ctx, func(rctx context.Context) error {
372 resp, err = rac.ac.UserList(rctx, in, opts...)
373 return err
374 }, repeatable)
375 return resp, err
376}
377
378func (rac *retryAuthClient) UserGet(ctx context.Context, in *pb.AuthUserGetRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGetResponse, err error) {
379 err = rac.retryf(ctx, func(rctx context.Context) error {
380 resp, err = rac.ac.UserGet(rctx, in, opts...)
381 return err
382 }, repeatable)
383 return resp, err
384}
385
386func (rac *retryAuthClient) RoleGet(ctx context.Context, in *pb.AuthRoleGetRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGetResponse, err error) {
387 err = rac.retryf(ctx, func(rctx context.Context) error {
388 resp, err = rac.ac.RoleGet(rctx, in, opts...)
389 return err
390 }, repeatable)
391 return resp, err
392}
393
394func (rac *retryAuthClient) RoleList(ctx context.Context, in *pb.AuthRoleListRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleListResponse, err error) {
395 err = rac.retryf(ctx, func(rctx context.Context) error {
396 resp, err = rac.ac.RoleList(rctx, in, opts...)
397 return err
398 }, repeatable)
399 return resp, err
400}
401
402func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) {
403 err = rac.retryf(ctx, func(rctx context.Context) error {
404 resp, err = rac.ac.AuthEnable(rctx, in, opts...)
405 return err
406 }, nonRepeatable)
407 return resp, err
408}
409
410func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) {
411 err = rac.retryf(ctx, func(rctx context.Context) error {
412 resp, err = rac.ac.AuthDisable(rctx, in, opts...)
413 return err
414 }, nonRepeatable)
415 return resp, err
416}
417
418func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) {
419 err = rac.retryf(ctx, func(rctx context.Context) error {
420 resp, err = rac.ac.UserAdd(rctx, in, opts...)
421 return err
422 }, nonRepeatable)
423 return resp, err
424}
425
426func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) {
427 err = rac.retryf(ctx, func(rctx context.Context) error {
428 resp, err = rac.ac.UserDelete(rctx, in, opts...)
429 return err
430 }, nonRepeatable)
431 return resp, err
432}
433
434func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) {
435 err = rac.retryf(ctx, func(rctx context.Context) error {
436 resp, err = rac.ac.UserChangePassword(rctx, in, opts...)
437 return err
438 }, nonRepeatable)
439 return resp, err
440}
441
442func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) {
443 err = rac.retryf(ctx, func(rctx context.Context) error {
444 resp, err = rac.ac.UserGrantRole(rctx, in, opts...)
445 return err
446 }, nonRepeatable)
447 return resp, err
448}
449
450func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) {
451 err = rac.retryf(ctx, func(rctx context.Context) error {
452 resp, err = rac.ac.UserRevokeRole(rctx, in, opts...)
453 return err
454 }, nonRepeatable)
455 return resp, err
456}
457
458func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) {
459 err = rac.retryf(ctx, func(rctx context.Context) error {
460 resp, err = rac.ac.RoleAdd(rctx, in, opts...)
461 return err
462 }, nonRepeatable)
463 return resp, err
464}
465
466func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) {
467 err = rac.retryf(ctx, func(rctx context.Context) error {
468 resp, err = rac.ac.RoleDelete(rctx, in, opts...)
469 return err
470 }, nonRepeatable)
471 return resp, err
472}
473
474func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) {
475 err = rac.retryf(ctx, func(rctx context.Context) error {
476 resp, err = rac.ac.RoleGrantPermission(rctx, in, opts...)
477 return err
478 }, nonRepeatable)
479 return resp, err
480}
481
482func (rac *retryAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) {
483 err = rac.retryf(ctx, func(rctx context.Context) error {
484 resp, err = rac.ac.RoleRevokePermission(rctx, in, opts...)
485 return err
486 }, nonRepeatable)
487 return resp, err
488}
489
490func (rac *retryAuthClient) Authenticate(ctx context.Context, in *pb.AuthenticateRequest, opts ...grpc.CallOption) (resp *pb.AuthenticateResponse, err error) {
491 err = rac.retryf(ctx, func(rctx context.Context) error {
492 resp, err = rac.ac.Authenticate(rctx, in, opts...)
493 return err
494 }, nonRepeatable)
495 return resp, err
496}