// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Based on github.com/grpc-ecosystem/go-grpc-middleware/retry, but modified to support the more
// fine-grained error checking required by the write-at-most-once retry semantics of etcd.

package clientv3

import (
	"context"
	"io"
	"sync"
	"time"

	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

// unaryClientInterceptor returns a new retrying unary client interceptor.
//
// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
// changed through options (e.g. withMax) on creation of the interceptor or on call (through grpc.CallOptions).
func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor {
	intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		ctx = withVersion(ctx)
		grpcOpts, retryOpts := filterCallOptions(opts)
		callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
		// Short-circuit for simplicity and to avoid allocations.
		if callOpts.max == 0 {
			return invoker(ctx, method, req, reply, cc, grpcOpts...)
		}
		var lastErr error
		for attempt := uint(0); attempt < callOpts.max; attempt++ {
			if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
				return err
			}
			logger.Debug(
				"retrying of unary invoker",
				zap.String("target", cc.Target()),
				zap.Uint("attempt", attempt),
			)
			lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
			if lastErr == nil {
				return nil
			}
			logger.Warn(
				"retrying of unary invoker failed",
				zap.String("target", cc.Target()),
				zap.Uint("attempt", attempt),
				zap.Error(lastErr),
			)
			if isContextError(lastErr) {
				if ctx.Err() != nil {
					// It's the context deadline or cancellation.
					return lastErr
				}
				// It's the callCtx deadline or cancellation, in which case try again.
				continue
			}
			if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
				gterr := c.getToken(ctx)
				if gterr != nil {
					logger.Warn(
						"retrying of unary invoker failed to fetch new auth token",
						zap.String("target", cc.Target()),
						zap.Error(gterr),
					)
					return gterr // lastErr must be invalid auth token
				}
				continue
			}
			if !isSafeRetry(c.lg, lastErr, callOpts) {
				return lastErr
			}
		}
		return lastErr
	}
}
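
// The sketch below is not part of the original file: it shows how the retrying
// unary interceptor might be wired into a connection using the unexported
// option helpers defined later in this file. The endpoint and dial options are
// illustrative only; clientv3 normally installs this interceptor itself when
// the client is constructed.
func exampleUnaryRetrySetup(c *Client, logger *zap.Logger) (*grpc.ClientConn, error) {
	itcp := c.unaryClientInterceptor(logger,
		withMax(3), // allow up to 3 attempts in total
		withBackoff(backoffLinearWithJitter(100*time.Millisecond, 0.10)),
	)
	// grpc.WithInsecure keeps the sketch short; real clients configure credentials.
	return grpc.Dial("127.0.0.1:2379", grpc.WithInsecure(), grpc.WithUnaryInterceptor(itcp))
}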

// streamClientInterceptor returns a new retrying stream client interceptor for server-side streaming calls.
//
// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
// changed through options (e.g. withMax) on creation of the interceptor or on call (through grpc.CallOptions).
//
// Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs
// to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams,
// BidiStreams), the retry interceptor will fail the call.
func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor {
	intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		ctx = withVersion(ctx)
		grpcOpts, retryOpts := filterCallOptions(opts)
		callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
		// Short-circuit for simplicity and to avoid allocations.
		if callOpts.max == 0 {
			return streamer(ctx, desc, cc, method, grpcOpts...)
		}
		if desc.ClientStreams {
			return nil, status.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()")
		}
		newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
		if err != nil {
			logger.Error("streamer failed to create ClientStream", zap.Error(err))
			return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
		}
		retryingStreamer := &serverStreamingRetryingStream{
			client:       c,
			ClientStream: newStreamer,
			callOpts:     callOpts,
			ctx:          ctx,
			streamerCall: func(ctx context.Context) (grpc.ClientStream, error) {
				return streamer(ctx, desc, cc, method, grpcOpts...)
			},
		}
		return retryingStreamer, nil
	}
}
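
// Sketch, not part of the original file: the stream interceptor is installed the
// same way. Per the ClientStreams check above, retries apply only to server-side
// streams such as Watch; a retry-enabled client or bidi stream fails with
// codes.Unimplemented.
func exampleStreamRetrySetup(c *Client, logger *zap.Logger) grpc.DialOption {
	return grpc.WithStreamInterceptor(c.streamClientInterceptor(logger, withMax(2)))
}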

// serverStreamingRetryingStream is an implementation of grpc.ClientStream that acts as a
// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
// a new ClientStream according to the retry policy.
type serverStreamingRetryingStream struct {
	grpc.ClientStream
	client        *Client
	bufferedSends []interface{} // single message that the client can send
	receivedGood  bool          // indicates whether any prior receives were successful
	wasClosedSend bool          // indicates that CloseSend was called
	ctx           context.Context
	callOpts      *options
	streamerCall  func(ctx context.Context) (grpc.ClientStream, error)
	mu            sync.RWMutex
}

func (s *serverStreamingRetryingStream) setStream(clientStream grpc.ClientStream) {
	s.mu.Lock()
	s.ClientStream = clientStream
	s.mu.Unlock()
}

func (s *serverStreamingRetryingStream) getStream() grpc.ClientStream {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.ClientStream
}

func (s *serverStreamingRetryingStream) SendMsg(m interface{}) error {
	s.mu.Lock()
	s.bufferedSends = append(s.bufferedSends, m)
	s.mu.Unlock()
	return s.getStream().SendMsg(m)
}

func (s *serverStreamingRetryingStream) CloseSend() error {
	s.mu.Lock()
	s.wasClosedSend = true
	s.mu.Unlock()
	return s.getStream().CloseSend()
}

func (s *serverStreamingRetryingStream) Header() (metadata.MD, error) {
	return s.getStream().Header()
}

func (s *serverStreamingRetryingStream) Trailer() metadata.MD {
	return s.getStream().Trailer()
}

func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
	attemptRetry, lastErr := s.receiveMsgAndIndicateRetry(m)
	if !attemptRetry {
		return lastErr // success or hard failure
	}

	// We start from attempt 1, because the zeroth receive was already made by the call above.
	for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
		if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil {
			return err
		}
		newStream, err := s.reestablishStreamAndResendBuffer(s.ctx)
		if err != nil {
			s.client.lg.Error("failed reestablishStreamAndResendBuffer", zap.Error(err))
			return err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
		}
		s.setStream(newStream)

		s.client.lg.Warn("retrying RecvMsg", zap.Error(lastErr))
		attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
		if !attemptRetry {
			return lastErr
		}
	}
	return lastErr
}

func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}) (bool, error) {
	s.mu.RLock()
	wasGood := s.receivedGood
	s.mu.RUnlock()
	err := s.getStream().RecvMsg(m)
	if err == nil || err == io.EOF {
		s.mu.Lock()
		s.receivedGood = true
		s.mu.Unlock()
		return false, err
	} else if wasGood {
		// A previous RecvMsg in the stream succeeded, so no retry logic should interfere.
		return false, err
	}
	if isContextError(err) {
		if s.ctx.Err() != nil {
			return false, err
		}
		// It's the callCtx deadline or cancellation, in which case try again.
		return true, err
	}
	if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
		gterr := s.client.getToken(s.ctx)
		if gterr != nil {
			s.client.lg.Warn("retry failed to fetch new auth token", zap.Error(gterr))
			return false, err // return the original error for simplicity
		}
		return true, err
	}
	return isSafeRetry(s.client.lg, err, s.callOpts), err
}

func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) {
	s.mu.RLock()
	bufferedSends := s.bufferedSends
	s.mu.RUnlock()
	newStream, err := s.streamerCall(callCtx)
	if err != nil {
		return nil, err
	}
	for _, msg := range bufferedSends {
		if err := newStream.SendMsg(msg); err != nil {
			return nil, err
		}
	}
	if err := newStream.CloseSend(); err != nil {
		return nil, err
	}
	return newStream, nil
}

func waitRetryBackoff(ctx context.Context, attempt uint, callOpts *options) error {
	waitTime := time.Duration(0)
	if attempt > 0 {
		waitTime = callOpts.backoffFunc(attempt)
	}
	if waitTime > 0 {
		timer := time.NewTimer(waitTime)
		select {
		case <-ctx.Done():
			timer.Stop()
			return contextErrToGrpcErr(ctx.Err())
		case <-timer.C:
		}
	}
	return nil
}

// isSafeRetry returns true if the request is safe to retry with the given error.
func isSafeRetry(lg *zap.Logger, err error, callOpts *options) bool {
	if isContextError(err) {
		return false
	}
	switch callOpts.retryPolicy {
	case repeatable:
		return isSafeRetryImmutableRPC(err)
	case nonRepeatable:
		return isSafeRetryMutableRPC(err)
	default:
		lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
		return false
	}
}

func isContextError(err error) bool {
	return status.Code(err) == codes.DeadlineExceeded || status.Code(err) == codes.Canceled
}

func contextErrToGrpcErr(err error) error {
	switch err {
	case context.DeadlineExceeded:
		return status.Error(codes.DeadlineExceeded, err.Error())
	case context.Canceled:
		return status.Error(codes.Canceled, err.Error())
	default:
		return status.Error(codes.Unknown, err.Error())
	}
}

var (
	defaultOptions = &options{
		retryPolicy: nonRepeatable,
		max:         0, // disable
		backoffFunc: backoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10),
		retryAuth:   true,
	}
)
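
// Sketch, not part of the original file: defaultOptions disables retries
// (max == 0), so both interceptors are inert until a caller opts in.
// reuseOrNewWithCallOptions (below) copies the struct before applying
// overrides, which keeps the shared default immutable.
func exampleEnableRetries() *options {
	return reuseOrNewWithCallOptions(defaultOptions, []retryOption{withMax(3)})
}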

// backoffFunc denotes a family of functions that control the backoff duration between call retries.
//
// They are called with an identifier of the attempt, and should return the duration the client
// should hold off for. If the returned duration is longer than the `context.Context.Deadline` of
// the request, the deadline of the request takes precedence and the wait will be interrupted
// before proceeding with the next iteration.
type backoffFunc func(attempt uint) time.Duration

// withRetryPolicy sets the retry policy of this call.
func withRetryPolicy(rp retryPolicy) retryOption {
	return retryOption{applyFunc: func(o *options) {
		o.retryPolicy = rp
	}}
}

// withMax sets the maximum number of retries on this call, or this interceptor.
func withMax(maxRetries uint) retryOption {
	return retryOption{applyFunc: func(o *options) {
		o.max = maxRetries
	}}
}

// withBackoff sets the backoffFunc used to control the time between retries.
func withBackoff(bf backoffFunc) retryOption {
	return retryOption{applyFunc: func(o *options) {
		o.backoffFunc = bf
	}}
}

type options struct {
	retryPolicy retryPolicy
	max         uint
	backoffFunc backoffFunc
	retryAuth   bool
}

// retryOption is a grpc.CallOption that is local to clientv3's retry interceptor.
type retryOption struct {
	grpc.EmptyCallOption // make sure we implement the private after() and before() methods so we don't panic.
	applyFunc            func(opt *options)
}

func reuseOrNewWithCallOptions(opt *options, retryOptions []retryOption) *options {
	if len(retryOptions) == 0 {
		return opt
	}
	optCopy := &options{}
	*optCopy = *opt
	for _, f := range retryOptions {
		f.applyFunc(optCopy)
	}
	return optCopy
}

func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOption, retryOptions []retryOption) {
	for _, opt := range callOptions {
		if co, ok := opt.(retryOption); ok {
			retryOptions = append(retryOptions, co)
		} else {
			grpcOptions = append(grpcOptions, opt)
		}
	}
	return grpcOptions, retryOptions
}
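
// Sketch, not part of the original file: because retryOption embeds
// grpc.EmptyCallOption, retry options can travel in an ordinary
// grpc.CallOption slice, and filterCallOptions splits them back out so that
// only genuine gRPC options reach the transport.
func exampleFilterCallOptions() {
	opts := []grpc.CallOption{grpc.WaitForReady(true), withMax(5)}
	grpcOpts, retryOpts := filterCallOptions(opts)
	_ = grpcOpts  // []grpc.CallOption{grpc.WaitForReady(true)}
	_ = retryOpts // []retryOption{withMax(5)}
}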

// backoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment).
//
// For example, waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms.
func backoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) backoffFunc {
	return func(attempt uint) time.Duration {
		return jitterUp(waitBetween, jitterFraction)
	}
}
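
// Sketch, not part of the original file: a custom exponential backoffFunc with
// the same attempt-indexed contract as backoffLinearWithJitter above. It could
// be passed to withBackoff just like the linear variant.
func backoffExponential(base, maxWait time.Duration) backoffFunc {
	return func(attempt uint) time.Duration {
		if attempt > 16 {
			attempt = 16 // cap the shift so a huge attempt count cannot overflow
		}
		d := base << attempt
		if d > maxWait {
			d = maxWait
		}
		return d
	}
}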