Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 1 | // Copyright 2016 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | // Based on github.com/grpc-ecosystem/go-grpc-middleware/retry, but modified to support the more |
| 16 | // fine grained error checking required by write-at-most-once retry semantics of etcd. |
| 17 | |
| 18 | package clientv3 |
| 19 | |
| 20 | import ( |
| 21 | "context" |
| 22 | "io" |
| 23 | "sync" |
| 24 | "time" |
| 25 | |
| 26 | "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" |
| 27 | "go.uber.org/zap" |
| 28 | "google.golang.org/grpc" |
| 29 | "google.golang.org/grpc/codes" |
| 30 | "google.golang.org/grpc/metadata" |
| 31 | ) |
| 32 | |
| 33 | // unaryClientInterceptor returns a new retrying unary client interceptor. |
| 34 | // |
| 35 | // The default configuration of the interceptor is to not retry *at all*. This behaviour can be |
| 36 | // changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions). |
| 37 | func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor { |
| 38 | intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs) |
| 39 | return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { |
| 40 | grpcOpts, retryOpts := filterCallOptions(opts) |
| 41 | callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts) |
| 42 | // short circuit for simplicity, and avoiding allocations. |
| 43 | if callOpts.max == 0 { |
| 44 | return invoker(ctx, method, req, reply, cc, grpcOpts...) |
| 45 | } |
| 46 | var lastErr error |
| 47 | for attempt := uint(0); attempt < callOpts.max; attempt++ { |
| 48 | if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil { |
| 49 | return err |
| 50 | } |
| 51 | logger.Info( |
| 52 | "retrying of unary invoker", |
| 53 | zap.String("target", cc.Target()), |
| 54 | zap.Uint("attempt", attempt), |
| 55 | ) |
| 56 | lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...) |
| 57 | if lastErr == nil { |
| 58 | return nil |
| 59 | } |
| 60 | logger.Warn( |
| 61 | "retrying of unary invoker failed", |
| 62 | zap.String("target", cc.Target()), |
| 63 | zap.Uint("attempt", attempt), |
| 64 | zap.Error(lastErr), |
| 65 | ) |
| 66 | if isContextError(lastErr) { |
| 67 | if ctx.Err() != nil { |
| 68 | // its the context deadline or cancellation. |
| 69 | return lastErr |
| 70 | } |
| 71 | // its the callCtx deadline or cancellation, in which case try again. |
| 72 | continue |
| 73 | } |
| 74 | if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken { |
| 75 | gterr := c.getToken(ctx) |
| 76 | if gterr != nil { |
| 77 | logger.Warn( |
| 78 | "retrying of unary invoker failed to fetch new auth token", |
| 79 | zap.String("target", cc.Target()), |
| 80 | zap.Error(gterr), |
| 81 | ) |
| 82 | return lastErr // return the original error for simplicity |
| 83 | } |
| 84 | continue |
| 85 | } |
| 86 | if !isSafeRetry(c.lg, lastErr, callOpts) { |
| 87 | return lastErr |
| 88 | } |
| 89 | } |
| 90 | return lastErr |
| 91 | } |
| 92 | } |
| 93 | |
| 94 | // streamClientInterceptor returns a new retrying stream client interceptor for server side streaming calls. |
| 95 | // |
| 96 | // The default configuration of the interceptor is to not retry *at all*. This behaviour can be |
| 97 | // changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions). |
| 98 | // |
| 99 | // Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs |
| 100 | // to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams, |
| 101 | // BidiStreams), the retry interceptor will fail the call. |
| 102 | func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor { |
| 103 | intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs) |
| 104 | return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { |
| 105 | grpcOpts, retryOpts := filterCallOptions(opts) |
| 106 | callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts) |
| 107 | // short circuit for simplicity, and avoiding allocations. |
| 108 | if callOpts.max == 0 { |
| 109 | return streamer(ctx, desc, cc, method, grpcOpts...) |
| 110 | } |
| 111 | if desc.ClientStreams { |
| 112 | return nil, grpc.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()") |
| 113 | } |
| 114 | newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...) |
| 115 | logger.Info("retry stream intercept", zap.Error(err)) |
| 116 | if err != nil { |
| 117 | // TODO(mwitkow): Maybe dial and transport errors should be retriable? |
| 118 | return nil, err |
| 119 | } |
| 120 | retryingStreamer := &serverStreamingRetryingStream{ |
| 121 | client: c, |
| 122 | ClientStream: newStreamer, |
| 123 | callOpts: callOpts, |
| 124 | ctx: ctx, |
| 125 | streamerCall: func(ctx context.Context) (grpc.ClientStream, error) { |
| 126 | return streamer(ctx, desc, cc, method, grpcOpts...) |
| 127 | }, |
| 128 | } |
| 129 | return retryingStreamer, nil |
| 130 | } |
| 131 | } |
| 132 | |
| 133 | // type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a |
| 134 | // proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish |
| 135 | // a new ClientStream according to the retry policy. |
| 136 | type serverStreamingRetryingStream struct { |
| 137 | grpc.ClientStream |
| 138 | client *Client |
| 139 | bufferedSends []interface{} // single message that the client can sen |
| 140 | receivedGood bool // indicates whether any prior receives were successful |
| 141 | wasClosedSend bool // indicates that CloseSend was closed |
| 142 | ctx context.Context |
| 143 | callOpts *options |
| 144 | streamerCall func(ctx context.Context) (grpc.ClientStream, error) |
| 145 | mu sync.RWMutex |
| 146 | } |
| 147 | |
| 148 | func (s *serverStreamingRetryingStream) setStream(clientStream grpc.ClientStream) { |
| 149 | s.mu.Lock() |
| 150 | s.ClientStream = clientStream |
| 151 | s.mu.Unlock() |
| 152 | } |
| 153 | |
| 154 | func (s *serverStreamingRetryingStream) getStream() grpc.ClientStream { |
| 155 | s.mu.RLock() |
| 156 | defer s.mu.RUnlock() |
| 157 | return s.ClientStream |
| 158 | } |
| 159 | |
| 160 | func (s *serverStreamingRetryingStream) SendMsg(m interface{}) error { |
| 161 | s.mu.Lock() |
| 162 | s.bufferedSends = append(s.bufferedSends, m) |
| 163 | s.mu.Unlock() |
| 164 | return s.getStream().SendMsg(m) |
| 165 | } |
| 166 | |
| 167 | func (s *serverStreamingRetryingStream) CloseSend() error { |
| 168 | s.mu.Lock() |
| 169 | s.wasClosedSend = true |
| 170 | s.mu.Unlock() |
| 171 | return s.getStream().CloseSend() |
| 172 | } |
| 173 | |
| 174 | func (s *serverStreamingRetryingStream) Header() (metadata.MD, error) { |
| 175 | return s.getStream().Header() |
| 176 | } |
| 177 | |
| 178 | func (s *serverStreamingRetryingStream) Trailer() metadata.MD { |
| 179 | return s.getStream().Trailer() |
| 180 | } |
| 181 | |
| 182 | func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error { |
| 183 | attemptRetry, lastErr := s.receiveMsgAndIndicateRetry(m) |
| 184 | if !attemptRetry { |
| 185 | return lastErr // success or hard failure |
| 186 | } |
| 187 | // We start off from attempt 1, because zeroth was already made on normal SendMsg(). |
| 188 | for attempt := uint(1); attempt < s.callOpts.max; attempt++ { |
| 189 | if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil { |
| 190 | return err |
| 191 | } |
| 192 | newStream, err := s.reestablishStreamAndResendBuffer(s.ctx) |
| 193 | if err != nil { |
| 194 | // TODO(mwitkow): Maybe dial and transport errors should be retriable? |
| 195 | return err |
| 196 | } |
| 197 | s.setStream(newStream) |
| 198 | attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m) |
| 199 | //fmt.Printf("Received message and indicate: %v %v\n", attemptRetry, lastErr) |
| 200 | if !attemptRetry { |
| 201 | return lastErr |
| 202 | } |
| 203 | } |
| 204 | return lastErr |
| 205 | } |
| 206 | |
| 207 | func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}) (bool, error) { |
| 208 | s.mu.RLock() |
| 209 | wasGood := s.receivedGood |
| 210 | s.mu.RUnlock() |
| 211 | err := s.getStream().RecvMsg(m) |
| 212 | if err == nil || err == io.EOF { |
| 213 | s.mu.Lock() |
| 214 | s.receivedGood = true |
| 215 | s.mu.Unlock() |
| 216 | return false, err |
| 217 | } else if wasGood { |
| 218 | // previous RecvMsg in the stream succeeded, no retry logic should interfere |
| 219 | return false, err |
| 220 | } |
| 221 | if isContextError(err) { |
| 222 | if s.ctx.Err() != nil { |
| 223 | return false, err |
| 224 | } |
| 225 | // its the callCtx deadline or cancellation, in which case try again. |
| 226 | return true, err |
| 227 | } |
| 228 | if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken { |
| 229 | gterr := s.client.getToken(s.ctx) |
| 230 | if gterr != nil { |
| 231 | s.client.lg.Info("retry failed to fetch new auth token", zap.Error(gterr)) |
| 232 | return false, err // return the original error for simplicity |
| 233 | } |
| 234 | return true, err |
| 235 | |
| 236 | } |
| 237 | return isSafeRetry(s.client.lg, err, s.callOpts), err |
| 238 | } |
| 239 | |
| 240 | func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) { |
| 241 | s.mu.RLock() |
| 242 | bufferedSends := s.bufferedSends |
| 243 | s.mu.RUnlock() |
| 244 | newStream, err := s.streamerCall(callCtx) |
| 245 | if err != nil { |
| 246 | return nil, err |
| 247 | } |
| 248 | for _, msg := range bufferedSends { |
| 249 | if err := newStream.SendMsg(msg); err != nil { |
| 250 | return nil, err |
| 251 | } |
| 252 | } |
| 253 | if err := newStream.CloseSend(); err != nil { |
| 254 | return nil, err |
| 255 | } |
| 256 | return newStream, nil |
| 257 | } |
| 258 | |
| 259 | func waitRetryBackoff(ctx context.Context, attempt uint, callOpts *options) error { |
| 260 | waitTime := time.Duration(0) |
| 261 | if attempt > 0 { |
| 262 | waitTime = callOpts.backoffFunc(attempt) |
| 263 | } |
| 264 | if waitTime > 0 { |
| 265 | timer := time.NewTimer(waitTime) |
| 266 | select { |
| 267 | case <-ctx.Done(): |
| 268 | timer.Stop() |
| 269 | return contextErrToGrpcErr(ctx.Err()) |
| 270 | case <-timer.C: |
| 271 | } |
| 272 | } |
| 273 | return nil |
| 274 | } |
| 275 | |
| 276 | // isSafeRetry returns "true", if request is safe for retry with the given error. |
| 277 | func isSafeRetry(lg *zap.Logger, err error, callOpts *options) bool { |
| 278 | if isContextError(err) { |
| 279 | return false |
| 280 | } |
| 281 | switch callOpts.retryPolicy { |
| 282 | case repeatable: |
| 283 | return isSafeRetryImmutableRPC(err) |
| 284 | case nonRepeatable: |
| 285 | return isSafeRetryMutableRPC(err) |
| 286 | default: |
| 287 | lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String())) |
| 288 | return false |
| 289 | } |
| 290 | } |
| 291 | |
| 292 | func isContextError(err error) bool { |
| 293 | return grpc.Code(err) == codes.DeadlineExceeded || grpc.Code(err) == codes.Canceled |
| 294 | } |
| 295 | |
| 296 | func contextErrToGrpcErr(err error) error { |
| 297 | switch err { |
| 298 | case context.DeadlineExceeded: |
| 299 | return grpc.Errorf(codes.DeadlineExceeded, err.Error()) |
| 300 | case context.Canceled: |
| 301 | return grpc.Errorf(codes.Canceled, err.Error()) |
| 302 | default: |
| 303 | return grpc.Errorf(codes.Unknown, err.Error()) |
| 304 | } |
| 305 | } |
| 306 | |
| 307 | var ( |
| 308 | defaultOptions = &options{ |
| 309 | retryPolicy: nonRepeatable, |
| 310 | max: 0, // disable |
| 311 | backoffFunc: backoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10), |
| 312 | retryAuth: true, |
| 313 | } |
| 314 | ) |
| 315 | |
// backoffFunc computes how long to wait before a retry.
//
// It receives the attempt number and returns the duration the client should hold off for
// before that attempt. The wait is also bounded by the request's context: if the context is
// done before the returned duration elapses, the wait is interrupted and the call fails with
// the context error instead of proceeding to the next attempt.
type backoffFunc func(attempt uint) time.Duration
| 323 | |
| 324 | // withRetryPolicy sets the retry policy of this call. |
| 325 | func withRetryPolicy(rp retryPolicy) retryOption { |
| 326 | return retryOption{applyFunc: func(o *options) { |
| 327 | o.retryPolicy = rp |
| 328 | }} |
| 329 | } |
| 330 | |
| 331 | // withAuthRetry sets enables authentication retries. |
| 332 | func withAuthRetry(retryAuth bool) retryOption { |
| 333 | return retryOption{applyFunc: func(o *options) { |
| 334 | o.retryAuth = retryAuth |
| 335 | }} |
| 336 | } |
| 337 | |
| 338 | // withMax sets the maximum number of retries on this call, or this interceptor. |
| 339 | func withMax(maxRetries uint) retryOption { |
| 340 | return retryOption{applyFunc: func(o *options) { |
| 341 | o.max = maxRetries |
| 342 | }} |
| 343 | } |
| 344 | |
| 345 | // WithBackoff sets the `BackoffFunc `used to control time between retries. |
| 346 | func withBackoff(bf backoffFunc) retryOption { |
| 347 | return retryOption{applyFunc: func(o *options) { |
| 348 | o.backoffFunc = bf |
| 349 | }} |
| 350 | } |
| 351 | |
| 352 | type options struct { |
| 353 | retryPolicy retryPolicy |
| 354 | max uint |
| 355 | backoffFunc backoffFunc |
| 356 | retryAuth bool |
| 357 | } |
| 358 | |
| 359 | // retryOption is a grpc.CallOption that is local to clientv3's retry interceptor. |
| 360 | type retryOption struct { |
| 361 | grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic. |
| 362 | applyFunc func(opt *options) |
| 363 | } |
| 364 | |
| 365 | func reuseOrNewWithCallOptions(opt *options, retryOptions []retryOption) *options { |
| 366 | if len(retryOptions) == 0 { |
| 367 | return opt |
| 368 | } |
| 369 | optCopy := &options{} |
| 370 | *optCopy = *opt |
| 371 | for _, f := range retryOptions { |
| 372 | f.applyFunc(optCopy) |
| 373 | } |
| 374 | return optCopy |
| 375 | } |
| 376 | |
| 377 | func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOption, retryOptions []retryOption) { |
| 378 | for _, opt := range callOptions { |
| 379 | if co, ok := opt.(retryOption); ok { |
| 380 | retryOptions = append(retryOptions, co) |
| 381 | } else { |
| 382 | grpcOptions = append(grpcOptions, opt) |
| 383 | } |
| 384 | } |
| 385 | return grpcOptions, retryOptions |
| 386 | } |
| 387 | |
| 388 | // BackoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment). |
| 389 | // |
| 390 | // For example waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms. |
| 391 | func backoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) backoffFunc { |
| 392 | return func(attempt uint) time.Duration { |
| 393 | return jitterUp(waitBetween, jitterFraction) |
| 394 | } |
| 395 | } |