// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Based on github.com/grpc-ecosystem/go-grpc-middleware/retry, but modified to support the more
// fine grained error checking required by write-at-most-once retry semantics of etcd.
17
18package clientv3
19
20import (
21 "context"
22 "io"
23 "sync"
24 "time"
25
khenaidood948f772021-08-11 17:49:24 -040026 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
Stephane Barbarie260a5632019-02-26 16:12:49 -050027 "go.uber.org/zap"
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/metadata"
Scott Baker8461e152019-10-01 14:44:30 -070031 "google.golang.org/grpc/status"
Stephane Barbarie260a5632019-02-26 16:12:49 -050032)
33
34// unaryClientInterceptor returns a new retrying unary client interceptor.
35//
36// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
37// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
38func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor {
39 intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
40 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
khenaidood948f772021-08-11 17:49:24 -040041 ctx = withVersion(ctx)
Stephane Barbarie260a5632019-02-26 16:12:49 -050042 grpcOpts, retryOpts := filterCallOptions(opts)
43 callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
44 // short circuit for simplicity, and avoiding allocations.
45 if callOpts.max == 0 {
46 return invoker(ctx, method, req, reply, cc, grpcOpts...)
47 }
48 var lastErr error
49 for attempt := uint(0); attempt < callOpts.max; attempt++ {
50 if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
51 return err
52 }
William Kurkiandaa6bb22019-03-07 12:26:28 -050053 logger.Debug(
Stephane Barbarie260a5632019-02-26 16:12:49 -050054 "retrying of unary invoker",
55 zap.String("target", cc.Target()),
56 zap.Uint("attempt", attempt),
57 )
58 lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
59 if lastErr == nil {
60 return nil
61 }
62 logger.Warn(
63 "retrying of unary invoker failed",
64 zap.String("target", cc.Target()),
65 zap.Uint("attempt", attempt),
66 zap.Error(lastErr),
67 )
68 if isContextError(lastErr) {
69 if ctx.Err() != nil {
70 // its the context deadline or cancellation.
71 return lastErr
72 }
73 // its the callCtx deadline or cancellation, in which case try again.
74 continue
75 }
76 if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
77 gterr := c.getToken(ctx)
78 if gterr != nil {
79 logger.Warn(
80 "retrying of unary invoker failed to fetch new auth token",
81 zap.String("target", cc.Target()),
82 zap.Error(gterr),
83 )
Scott Baker8461e152019-10-01 14:44:30 -070084 return gterr // lastErr must be invalid auth token
Stephane Barbarie260a5632019-02-26 16:12:49 -050085 }
86 continue
87 }
88 if !isSafeRetry(c.lg, lastErr, callOpts) {
89 return lastErr
90 }
91 }
92 return lastErr
93 }
94}
95
96// streamClientInterceptor returns a new retrying stream client interceptor for server side streaming calls.
97//
98// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
99// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
100//
101// Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs
102// to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams,
103// BidiStreams), the retry interceptor will fail the call.
104func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor {
105 intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
106 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
khenaidood948f772021-08-11 17:49:24 -0400107 ctx = withVersion(ctx)
Stephane Barbarie260a5632019-02-26 16:12:49 -0500108 grpcOpts, retryOpts := filterCallOptions(opts)
109 callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
110 // short circuit for simplicity, and avoiding allocations.
111 if callOpts.max == 0 {
112 return streamer(ctx, desc, cc, method, grpcOpts...)
113 }
114 if desc.ClientStreams {
Scott Baker8461e152019-10-01 14:44:30 -0700115 return nil, status.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()")
Stephane Barbarie260a5632019-02-26 16:12:49 -0500116 }
117 newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
Stephane Barbarie260a5632019-02-26 16:12:49 -0500118 if err != nil {
khenaidood948f772021-08-11 17:49:24 -0400119 logger.Error("streamer failed to create ClientStream", zap.Error(err))
120 return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
Stephane Barbarie260a5632019-02-26 16:12:49 -0500121 }
122 retryingStreamer := &serverStreamingRetryingStream{
123 client: c,
124 ClientStream: newStreamer,
125 callOpts: callOpts,
126 ctx: ctx,
127 streamerCall: func(ctx context.Context) (grpc.ClientStream, error) {
128 return streamer(ctx, desc, cc, method, grpcOpts...)
129 },
130 }
131 return retryingStreamer, nil
132 }
133}
134
135// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
136// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
137// a new ClientStream according to the retry policy.
138type serverStreamingRetryingStream struct {
139 grpc.ClientStream
140 client *Client
141 bufferedSends []interface{} // single message that the client can sen
142 receivedGood bool // indicates whether any prior receives were successful
143 wasClosedSend bool // indicates that CloseSend was closed
144 ctx context.Context
145 callOpts *options
146 streamerCall func(ctx context.Context) (grpc.ClientStream, error)
147 mu sync.RWMutex
148}
149
150func (s *serverStreamingRetryingStream) setStream(clientStream grpc.ClientStream) {
151 s.mu.Lock()
152 s.ClientStream = clientStream
153 s.mu.Unlock()
154}
155
156func (s *serverStreamingRetryingStream) getStream() grpc.ClientStream {
157 s.mu.RLock()
158 defer s.mu.RUnlock()
159 return s.ClientStream
160}
161
162func (s *serverStreamingRetryingStream) SendMsg(m interface{}) error {
163 s.mu.Lock()
164 s.bufferedSends = append(s.bufferedSends, m)
165 s.mu.Unlock()
166 return s.getStream().SendMsg(m)
167}
168
169func (s *serverStreamingRetryingStream) CloseSend() error {
170 s.mu.Lock()
171 s.wasClosedSend = true
172 s.mu.Unlock()
173 return s.getStream().CloseSend()
174}
175
176func (s *serverStreamingRetryingStream) Header() (metadata.MD, error) {
177 return s.getStream().Header()
178}
179
180func (s *serverStreamingRetryingStream) Trailer() metadata.MD {
181 return s.getStream().Trailer()
182}
183
184func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
185 attemptRetry, lastErr := s.receiveMsgAndIndicateRetry(m)
186 if !attemptRetry {
187 return lastErr // success or hard failure
188 }
khenaidood948f772021-08-11 17:49:24 -0400189
Stephane Barbarie260a5632019-02-26 16:12:49 -0500190 // We start off from attempt 1, because zeroth was already made on normal SendMsg().
191 for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
192 if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil {
193 return err
194 }
195 newStream, err := s.reestablishStreamAndResendBuffer(s.ctx)
196 if err != nil {
khenaidood948f772021-08-11 17:49:24 -0400197 s.client.lg.Error("failed reestablishStreamAndResendBuffer", zap.Error(err))
198 return err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
Stephane Barbarie260a5632019-02-26 16:12:49 -0500199 }
200 s.setStream(newStream)
khenaidood948f772021-08-11 17:49:24 -0400201
202 s.client.lg.Warn("retrying RecvMsg", zap.Error(lastErr))
Stephane Barbarie260a5632019-02-26 16:12:49 -0500203 attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
Stephane Barbarie260a5632019-02-26 16:12:49 -0500204 if !attemptRetry {
205 return lastErr
206 }
207 }
208 return lastErr
209}
210
211func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}) (bool, error) {
212 s.mu.RLock()
213 wasGood := s.receivedGood
214 s.mu.RUnlock()
215 err := s.getStream().RecvMsg(m)
216 if err == nil || err == io.EOF {
217 s.mu.Lock()
218 s.receivedGood = true
219 s.mu.Unlock()
220 return false, err
221 } else if wasGood {
222 // previous RecvMsg in the stream succeeded, no retry logic should interfere
223 return false, err
224 }
225 if isContextError(err) {
226 if s.ctx.Err() != nil {
227 return false, err
228 }
229 // its the callCtx deadline or cancellation, in which case try again.
230 return true, err
231 }
232 if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
233 gterr := s.client.getToken(s.ctx)
234 if gterr != nil {
William Kurkiandaa6bb22019-03-07 12:26:28 -0500235 s.client.lg.Warn("retry failed to fetch new auth token", zap.Error(gterr))
Stephane Barbarie260a5632019-02-26 16:12:49 -0500236 return false, err // return the original error for simplicity
237 }
238 return true, err
239
240 }
241 return isSafeRetry(s.client.lg, err, s.callOpts), err
242}
243
244func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) {
245 s.mu.RLock()
246 bufferedSends := s.bufferedSends
247 s.mu.RUnlock()
248 newStream, err := s.streamerCall(callCtx)
249 if err != nil {
250 return nil, err
251 }
252 for _, msg := range bufferedSends {
253 if err := newStream.SendMsg(msg); err != nil {
254 return nil, err
255 }
256 }
257 if err := newStream.CloseSend(); err != nil {
258 return nil, err
259 }
260 return newStream, nil
261}
262
263func waitRetryBackoff(ctx context.Context, attempt uint, callOpts *options) error {
264 waitTime := time.Duration(0)
265 if attempt > 0 {
266 waitTime = callOpts.backoffFunc(attempt)
267 }
268 if waitTime > 0 {
269 timer := time.NewTimer(waitTime)
270 select {
271 case <-ctx.Done():
272 timer.Stop()
273 return contextErrToGrpcErr(ctx.Err())
274 case <-timer.C:
275 }
276 }
277 return nil
278}
279
280// isSafeRetry returns "true", if request is safe for retry with the given error.
281func isSafeRetry(lg *zap.Logger, err error, callOpts *options) bool {
282 if isContextError(err) {
283 return false
284 }
285 switch callOpts.retryPolicy {
286 case repeatable:
287 return isSafeRetryImmutableRPC(err)
288 case nonRepeatable:
289 return isSafeRetryMutableRPC(err)
290 default:
291 lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
292 return false
293 }
294}
295
296func isContextError(err error) bool {
297 return grpc.Code(err) == codes.DeadlineExceeded || grpc.Code(err) == codes.Canceled
298}
299
300func contextErrToGrpcErr(err error) error {
301 switch err {
302 case context.DeadlineExceeded:
Scott Baker8461e152019-10-01 14:44:30 -0700303 return status.Errorf(codes.DeadlineExceeded, err.Error())
Stephane Barbarie260a5632019-02-26 16:12:49 -0500304 case context.Canceled:
Scott Baker8461e152019-10-01 14:44:30 -0700305 return status.Errorf(codes.Canceled, err.Error())
Stephane Barbarie260a5632019-02-26 16:12:49 -0500306 default:
Scott Baker8461e152019-10-01 14:44:30 -0700307 return status.Errorf(codes.Unknown, err.Error())
Stephane Barbarie260a5632019-02-26 16:12:49 -0500308 }
309}
310
311var (
312 defaultOptions = &options{
313 retryPolicy: nonRepeatable,
314 max: 0, // disable
315 backoffFunc: backoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10),
316 retryAuth: true,
317 }
318)
319
// backoffFunc is the signature of functions that decide how long to pause between
// call retries.
//
// It receives the attempt number and returns the duration to hold off for. The wait
// is interruptible: if the request's context is done first, the wait ends early and
// the retry loop does not proceed to the next iteration.
type backoffFunc func(attempt uint) time.Duration
327
328// withRetryPolicy sets the retry policy of this call.
329func withRetryPolicy(rp retryPolicy) retryOption {
330 return retryOption{applyFunc: func(o *options) {
331 o.retryPolicy = rp
332 }}
333}
334
Stephane Barbarie260a5632019-02-26 16:12:49 -0500335// withMax sets the maximum number of retries on this call, or this interceptor.
336func withMax(maxRetries uint) retryOption {
337 return retryOption{applyFunc: func(o *options) {
338 o.max = maxRetries
339 }}
340}
341
342// WithBackoff sets the `BackoffFunc `used to control time between retries.
343func withBackoff(bf backoffFunc) retryOption {
344 return retryOption{applyFunc: func(o *options) {
345 o.backoffFunc = bf
346 }}
347}
348
349type options struct {
350 retryPolicy retryPolicy
351 max uint
352 backoffFunc backoffFunc
353 retryAuth bool
354}
355
356// retryOption is a grpc.CallOption that is local to clientv3's retry interceptor.
357type retryOption struct {
358 grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic.
359 applyFunc func(opt *options)
360}
361
362func reuseOrNewWithCallOptions(opt *options, retryOptions []retryOption) *options {
363 if len(retryOptions) == 0 {
364 return opt
365 }
366 optCopy := &options{}
367 *optCopy = *opt
368 for _, f := range retryOptions {
369 f.applyFunc(optCopy)
370 }
371 return optCopy
372}
373
374func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOption, retryOptions []retryOption) {
375 for _, opt := range callOptions {
376 if co, ok := opt.(retryOption); ok {
377 retryOptions = append(retryOptions, co)
378 } else {
379 grpcOptions = append(grpcOptions, opt)
380 }
381 }
382 return grpcOptions, retryOptions
383}
384
385// BackoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment).
386//
387// For example waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms.
388func backoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) backoffFunc {
389 return func(attempt uint) time.Duration {
390 return jitterUp(waitBetween, jitterFraction)
391 }
392}