enable gRPC retry on the core client

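Configure the go-grpc-middleware retry interceptor on the core gRPC
client so that unary calls towards the core are retried on Unavailable
and DeadlineExceeded, with a linear backoff of PerRPCRetryTimeout
(+/-20% jitter) between attempts. Two new flags control the behaviour,
--max_grpc_client_retry and --per_rpc_retry_timeout; both default to 0,
which leaves retries disabled.

For reference, a minimal self-contained sketch of how the same retry
options behave when wired straight into a grpc.Dial call. The endpoint
and the example values (5 retries, 2s per-retry timeout) are
illustrative assumptions only; the adapter itself passes the option to
coreClient.Start rather than dialing directly:

    package main

    import (
    	"time"

    	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/codes"
    )

    func main() {
    	// Retry Unavailable/DeadlineExceeded up to 5 times, give each attempt
    	// its own 2s deadline and wait ~2s (+/-20% jitter) between attempts.
    	retryOpt := grpc_retry.UnaryClientInterceptor(
    		grpc_retry.WithMax(5),                         // --max_grpc_client_retry=5
    		grpc_retry.WithPerRetryTimeout(2*time.Second), // --per_rpc_retry_timeout=2s
    		grpc_retry.WithCodes(codes.Unavailable, codes.DeadlineExceeded),
    		grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(2*time.Second, 0.2)),
    	)
    	conn, err := grpc.Dial("voltha-core:55555",
    		grpc.WithInsecure(),
    		grpc.WithUnaryInterceptor(retryOpt))
    	if err != nil {
    		panic(err)
    	}
    	defer conn.Close()
    }
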
Change-Id: Id9f1cd18324e2788b2e6c4a9838ffff80ee49f0f
diff --git a/VERSION b/VERSION
index 54fe126..b52282a 100755
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.10.3.1-dev
+2.10.4
diff --git a/cmd/openonu-adapter/main.go b/cmd/openonu-adapter/main.go
index 3dc484b..8aa6c1c 100644
--- a/cmd/openonu-adapter/main.go
+++ b/cmd/openonu-adapter/main.go
@@ -28,6 +28,7 @@
 	"syscall"
 	"time"
 
+	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
 	conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
 	"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
 	"github.com/opencord/voltha-lib-go/v7/pkg/events"
@@ -42,6 +43,7 @@
 	"github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
 
 	"github.com/opencord/voltha-protos/v5/go/core_adapter"
 
@@ -136,8 +138,17 @@
 		a.coreRestarted); err != nil {
 		logger.Fatal(ctx, "grpc-client-not-created")
 	}
+	// The backoff function sets the wait time between gRPC retries; if unset it defaults to 50ms, which is too low. The jitter keeps each retry wait within [0.8*PerRPCRetryTimeout, 1.2*PerRPCRetryTimeout].
+	backoffCtxOption := grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(a.config.PerRPCRetryTimeout, 0.2))
+
+	retryCodes := []codes.Code{
+		codes.Unavailable,      // server is currently unavailable
+		codes.DeadlineExceeded, // deadline for the operation was exceeded
+	}
+	grpcRetryOptions := grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(a.config.MaxRetries), grpc_retry.WithPerRetryTimeout(a.config.PerRPCRetryTimeout), grpc_retry.WithCodes(retryCodes...), backoffCtxOption)
+	logger.Debug(ctx, "Configuration values", log.Fields{"RETRY": a.config.MaxRetries, "TIMEOUT": a.config.PerRPCRetryTimeout})
 	// Start the core grpc client
-	go a.coreClient.Start(ctx, getCoreServiceClientHandler)
+	go a.coreClient.Start(ctx, getCoreServiceClientHandler, grpcRetryOptions)
 
 	// Create the open ONU interface adapter
 	if a.onuAdapter, err = a.startONUAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
diff --git a/go.mod b/go.mod
index 1bdea8c..0995af6 100644
--- a/go.mod
+++ b/go.mod
@@ -14,6 +14,7 @@
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang/protobuf v1.5.2
 	github.com/google/gopacket v1.1.17
+	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
 	github.com/looplab/fsm v0.2.0
 	github.com/opencord/omci-lib-go/v2 v2.2.3
 	github.com/opencord/voltha-lib-go/v7 v7.4.3
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index 14342a4..7f1e047 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -73,6 +73,8 @@
 	CoreEndpoint                string
 	RPCTimeout                  time.Duration
 	MaxConcurrentFlowsPerUni    int
+	PerRPCRetryTimeout          time.Duration
+	MaxRetries                  uint
 }
 
 // ParseCommandArguments parses the arguments when running read-write adaptercore service
@@ -281,7 +283,14 @@
 		"max_concurrent_flows_per_uni",
 		16,
 		"The max number of concurrent flows (add/remove) that can be queued per UNI")
-
+	fs.DurationVar(&(so.PerRPCRetryTimeout),
+		"per_rpc_retry_timeout",
+		0*time.Second,
+	"The timeout for each gRPC retry attempt")
+	fs.UintVar(&(so.MaxRetries),
+		"max_grpc_client_retry",
+		0,
+	"The maximum number of times the ONU adapter will retry a gRPC request that times out or fails as unavailable")
 	_ = fs.Parse(args)
 	containerName := getContainerInfo()
 	if len(containerName) > 0 {
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/backoff.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/backoff.go
new file mode 100644
index 0000000..ad35f09
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/backoff.go
@@ -0,0 +1,44 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_retry
+
+import (
+	"time"
+
+	"github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils"
+)
+
+// BackoffLinear is very simple: it waits for a fixed period of time between calls.
+func BackoffLinear(waitBetween time.Duration) BackoffFunc {
+	return func(attempt uint) time.Duration {
+		return waitBetween
+	}
+}
+
+// BackoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment).
+//
+// For example waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms.
+func BackoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) BackoffFunc {
+	return func(attempt uint) time.Duration {
+		return backoffutils.JitterUp(waitBetween, jitterFraction)
+	}
+}
+
+// BackoffExponential produces increasing intervals for each attempt.
+//
+// The scalar is multiplied times 2 raised to the current attempt. So the first
+// retry with a scalar of 100ms is 100ms, while the 5th attempt would be 1.6s.
+func BackoffExponential(scalar time.Duration) BackoffFunc {
+	return func(attempt uint) time.Duration {
+		return scalar * time.Duration(backoffutils.ExponentBase2(attempt))
+	}
+}
+
+// BackoffExponentialWithJitter creates an exponential backoff like
+// BackoffExponential does, but adds jitter.
+func BackoffExponentialWithJitter(scalar time.Duration, jitterFraction float64) BackoffFunc {
+	return func(attempt uint) time.Duration {
+		return backoffutils.JitterUp(scalar*time.Duration(backoffutils.ExponentBase2(attempt)), jitterFraction)
+	}
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/doc.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/doc.go
new file mode 100644
index 0000000..afd924a
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/doc.go
@@ -0,0 +1,25 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+/*
+`grpc_retry` provides client-side request retry logic for gRPC.
+
+Client-Side Request Retry Interceptor
+
+It allows for automatic retry, inside the generated gRPC code of requests based on the gRPC status
+of the reply. It supports unary (1:1), and server stream (1:n) requests.
+
+By default the interceptors *are disabled*, preventing accidental use of retries. You can easily
+override the number of retries (setting them to more than 0) with a `grpc.ClientOption`, e.g.:
+
+ myclient.Ping(ctx, goodPing, grpc_retry.WithMax(5))
+
+Other default options are: retry on `ResourceExhausted` and `Unavailable` gRPC codes, use a 50ms
+linear backoff with 10% jitter.
+
+For chained interceptors, the retry interceptor will call every interceptor that follows it
+whenever when a retry happens.
+
+Please see examples for more advanced use.
+*/
+package grpc_retry
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/options.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/options.go
new file mode 100644
index 0000000..7a633e2
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/options.go
@@ -0,0 +1,142 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_retry
+
+import (
+	"context"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+)
+
+var (
+	// DefaultRetriableCodes is a set of well known types gRPC codes that should be retri-able.
+	//
+	// `ResourceExhausted` means that the user quota, e.g. per-RPC limits, have been reached.
+	// `Unavailable` means that system is currently unavailable and the client should retry again.
+	DefaultRetriableCodes = []codes.Code{codes.ResourceExhausted, codes.Unavailable}
+
+	defaultOptions = &options{
+		max:            0, // disabled
+		perCallTimeout: 0, // disabled
+		includeHeader:  true,
+		codes:          DefaultRetriableCodes,
+		backoffFunc: BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
+			return BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10)(attempt)
+		}),
+	}
+)
+
+// BackoffFunc denotes a family of functions that control the backoff duration between call retries.
+//
+// They are called with an identifier of the attempt, and should return a time the system client should
+// hold off for. If the time returned is longer than the `context.Context.Deadline` of the request
+// the deadline of the request takes precedence and the wait will be interrupted before proceeding
+// with the next iteration.
+type BackoffFunc func(attempt uint) time.Duration
+
+// BackoffFuncContext denotes a family of functions that control the backoff duration between call retries.
+//
+// They are called with an identifier of the attempt, and should return a time the system client should
+// hold off for. If the time returned is longer than the `context.Context.Deadline` of the request
+// the deadline of the request takes precedence and the wait will be interrupted before proceeding
+// with the next iteration. The context can be used to extract request scoped metadata and context values.
+type BackoffFuncContext func(ctx context.Context, attempt uint) time.Duration
+
+// Disable disables the retry behaviour on this call, or this interceptor.
+//
+// Its semantically the same to `WithMax`
+func Disable() CallOption {
+	return WithMax(0)
+}
+
+// WithMax sets the maximum number of retries on this call, or this interceptor.
+func WithMax(maxRetries uint) CallOption {
+	return CallOption{applyFunc: func(o *options) {
+		o.max = maxRetries
+	}}
+}
+
+// WithBackoff sets the `BackoffFunc` used to control time between retries.
+func WithBackoff(bf BackoffFunc) CallOption {
+	return CallOption{applyFunc: func(o *options) {
+		o.backoffFunc = BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
+			return bf(attempt)
+		})
+	}}
+}
+
+// WithBackoffContext sets the `BackoffFuncContext` used to control time between retries.
+func WithBackoffContext(bf BackoffFuncContext) CallOption {
+	return CallOption{applyFunc: func(o *options) {
+		o.backoffFunc = bf
+	}}
+}
+
+// WithCodes sets which codes should be retried.
+//
+// Please *use with care*, as you may be retrying non-idempotent calls.
+//
+// You cannot automatically retry on Cancelled and Deadline, please use `WithPerRetryTimeout` for these.
+func WithCodes(retryCodes ...codes.Code) CallOption {
+	return CallOption{applyFunc: func(o *options) {
+		o.codes = retryCodes
+	}}
+}
+
+// WithPerRetryTimeout sets the RPC timeout per call (including initial call) on this call, or this interceptor.
+//
+// The context.Deadline of the call takes precedence and sets the maximum time the whole invocation
+// will take, but WithPerRetryTimeout can be used to limit the RPC time per each call.
+//
+// For example, with context.Deadline = now + 10s, and WithPerRetryTimeout(3 * time.Seconds), each
+// of the retry calls (including the initial one) will have a deadline of now + 3s.
+//
+// A value of 0 disables the timeout overrides completely and returns to each retry call using the
+// parent `context.Deadline`.
+//
+// Note that when this is enabled, any DeadlineExceeded errors that are propagated up will be retried.
+func WithPerRetryTimeout(timeout time.Duration) CallOption {
+	return CallOption{applyFunc: func(o *options) {
+		o.perCallTimeout = timeout
+	}}
+}
+
+type options struct {
+	max            uint
+	perCallTimeout time.Duration
+	includeHeader  bool
+	codes          []codes.Code
+	backoffFunc    BackoffFuncContext
+}
+
+// CallOption is a grpc.CallOption that is local to grpc_retry.
+type CallOption struct {
+	grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic.
+	applyFunc            func(opt *options)
+}
+
+func reuseOrNewWithCallOptions(opt *options, callOptions []CallOption) *options {
+	if len(callOptions) == 0 {
+		return opt
+	}
+	optCopy := &options{}
+	*optCopy = *opt
+	for _, f := range callOptions {
+		f.applyFunc(optCopy)
+	}
+	return optCopy
+}
+
+func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOption, retryOptions []CallOption) {
+	for _, opt := range callOptions {
+		if co, ok := opt.(CallOption); ok {
+			retryOptions = append(retryOptions, co)
+		} else {
+			grpcOptions = append(grpcOptions, opt)
+		}
+	}
+	return grpcOptions, retryOptions
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/retry.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/retry.go
new file mode 100644
index 0000000..62d8312
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/retry.go
@@ -0,0 +1,329 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_retry
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
+	"golang.org/x/net/trace"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
+)
+
+const (
+	AttemptMetadataKey = "x-retry-attempty"
+)
+
+// UnaryClientInterceptor returns a new retrying unary client interceptor.
+//
+// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
+// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
+func UnaryClientInterceptor(optFuncs ...CallOption) grpc.UnaryClientInterceptor {
+	intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
+	return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+		grpcOpts, retryOpts := filterCallOptions(opts)
+		callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
+		// short circuit for simplicity, and avoiding allocations.
+		if callOpts.max == 0 {
+			return invoker(parentCtx, method, req, reply, cc, grpcOpts...)
+		}
+		var lastErr error
+		for attempt := uint(0); attempt < callOpts.max; attempt++ {
+			if err := waitRetryBackoff(attempt, parentCtx, callOpts); err != nil {
+				return err
+			}
+			callCtx := perCallContext(parentCtx, callOpts, attempt)
+			lastErr = invoker(callCtx, method, req, reply, cc, grpcOpts...)
+			// TODO(mwitkow): Maybe dial and transport errors should be retriable?
+			if lastErr == nil {
+				return nil
+			}
+			logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr)
+			if isContextError(lastErr) {
+				if parentCtx.Err() != nil {
+					logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err())
+					// its the parent context deadline or cancellation.
+					return lastErr
+				} else if callOpts.perCallTimeout != 0 {
+					// We have set a perCallTimeout in the retry middleware, which would result in a context error if
+					// the deadline was exceeded, in which case try again.
+					logTrace(parentCtx, "grpc_retry attempt: %d, context error from retry call", attempt)
+					continue
+				}
+			}
+			if !isRetriable(lastErr, callOpts) {
+				return lastErr
+			}
+		}
+		return lastErr
+	}
+}
+
+// StreamClientInterceptor returns a new retrying stream client interceptor for server side streaming calls.
+//
+// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
+// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
+//
+// Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs
+// to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams,
+// BidiStreams), the retry interceptor will fail the call.
+func StreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientInterceptor {
+	intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
+	return func(parentCtx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+		grpcOpts, retryOpts := filterCallOptions(opts)
+		callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
+		// short circuit for simplicity, and avoiding allocations.
+		if callOpts.max == 0 {
+			return streamer(parentCtx, desc, cc, method, grpcOpts...)
+		}
+		if desc.ClientStreams {
+			return nil, status.Errorf(codes.Unimplemented, "grpc_retry: cannot retry on ClientStreams, set grpc_retry.Disable()")
+		}
+
+		var lastErr error
+		for attempt := uint(0); attempt < callOpts.max; attempt++ {
+			if err := waitRetryBackoff(attempt, parentCtx, callOpts); err != nil {
+				return nil, err
+			}
+			callCtx := perCallContext(parentCtx, callOpts, 0)
+
+			var newStreamer grpc.ClientStream
+			newStreamer, lastErr = streamer(callCtx, desc, cc, method, grpcOpts...)
+			if lastErr == nil {
+				retryingStreamer := &serverStreamingRetryingStream{
+					ClientStream: newStreamer,
+					callOpts:     callOpts,
+					parentCtx:    parentCtx,
+					streamerCall: func(ctx context.Context) (grpc.ClientStream, error) {
+						return streamer(ctx, desc, cc, method, grpcOpts...)
+					},
+				}
+				return retryingStreamer, nil
+			}
+
+			logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr)
+			if isContextError(lastErr) {
+				if parentCtx.Err() != nil {
+					logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err())
+					// its the parent context deadline or cancellation.
+					return nil, lastErr
+				} else if callOpts.perCallTimeout != 0 {
+					// We have set a perCallTimeout in the retry middleware, which would result in a context error if
+					// the deadline was exceeded, in which case try again.
+					logTrace(parentCtx, "grpc_retry attempt: %d, context error from retry call", attempt)
+					continue
+				}
+			}
+			if !isRetriable(lastErr, callOpts) {
+				return nil, lastErr
+			}
+		}
+		return nil, lastErr
+	}
+}
+
+// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
+// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
+// a new ClientStream according to the retry policy.
+type serverStreamingRetryingStream struct {
+	grpc.ClientStream
+	bufferedSends []interface{} // single message that the client can sen
+	receivedGood  bool          // indicates whether any prior receives were successful
+	wasClosedSend bool          // indicates that CloseSend was closed
+	parentCtx     context.Context
+	callOpts      *options
+	streamerCall  func(ctx context.Context) (grpc.ClientStream, error)
+	mu            sync.RWMutex
+}
+
+func (s *serverStreamingRetryingStream) setStream(clientStream grpc.ClientStream) {
+	s.mu.Lock()
+	s.ClientStream = clientStream
+	s.mu.Unlock()
+}
+
+func (s *serverStreamingRetryingStream) getStream() grpc.ClientStream {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.ClientStream
+}
+
+func (s *serverStreamingRetryingStream) SendMsg(m interface{}) error {
+	s.mu.Lock()
+	s.bufferedSends = append(s.bufferedSends, m)
+	s.mu.Unlock()
+	return s.getStream().SendMsg(m)
+}
+
+func (s *serverStreamingRetryingStream) CloseSend() error {
+	s.mu.Lock()
+	s.wasClosedSend = true
+	s.mu.Unlock()
+	return s.getStream().CloseSend()
+}
+
+func (s *serverStreamingRetryingStream) Header() (metadata.MD, error) {
+	return s.getStream().Header()
+}
+
+func (s *serverStreamingRetryingStream) Trailer() metadata.MD {
+	return s.getStream().Trailer()
+}
+
+func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
+	attemptRetry, lastErr := s.receiveMsgAndIndicateRetry(m)
+	if !attemptRetry {
+		return lastErr // success or hard failure
+	}
+	// We start off from attempt 1, because zeroth was already made on normal SendMsg().
+	for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
+		if err := waitRetryBackoff(attempt, s.parentCtx, s.callOpts); err != nil {
+			return err
+		}
+		callCtx := perCallContext(s.parentCtx, s.callOpts, attempt)
+		newStream, err := s.reestablishStreamAndResendBuffer(callCtx)
+		if err != nil {
+			// Retry dial and transport errors of establishing stream as grpc doesn't retry.
+			if isRetriable(err, s.callOpts) {
+				continue
+			}
+			return err
+		}
+
+		s.setStream(newStream)
+		attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
+		//fmt.Printf("Received message and indicate: %v  %v\n", attemptRetry, lastErr)
+		if !attemptRetry {
+			return lastErr
+		}
+	}
+	return lastErr
+}
+
+func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}) (bool, error) {
+	s.mu.RLock()
+	wasGood := s.receivedGood
+	s.mu.RUnlock()
+	err := s.getStream().RecvMsg(m)
+	if err == nil || err == io.EOF {
+		s.mu.Lock()
+		s.receivedGood = true
+		s.mu.Unlock()
+		return false, err
+	} else if wasGood {
+		// previous RecvMsg in the stream succeeded, no retry logic should interfere
+		return false, err
+	}
+	if isContextError(err) {
+		if s.parentCtx.Err() != nil {
+			logTrace(s.parentCtx, "grpc_retry parent context error: %v", s.parentCtx.Err())
+			return false, err
+		} else if s.callOpts.perCallTimeout != 0 {
+			// We have set a perCallTimeout in the retry middleware, which would result in a context error if
+			// the deadline was exceeded, in which case try again.
+			logTrace(s.parentCtx, "grpc_retry context error from retry call")
+			return true, err
+		}
+	}
+	return isRetriable(err, s.callOpts), err
+}
+
+func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(
+	callCtx context.Context,
+) (grpc.ClientStream, error) {
+	s.mu.RLock()
+	bufferedSends := s.bufferedSends
+	s.mu.RUnlock()
+	newStream, err := s.streamerCall(callCtx)
+	if err != nil {
+		logTrace(callCtx, "grpc_retry failed redialing new stream: %v", err)
+		return nil, err
+	}
+	for _, msg := range bufferedSends {
+		if err := newStream.SendMsg(msg); err != nil {
+			logTrace(callCtx, "grpc_retry failed resending message: %v", err)
+			return nil, err
+		}
+	}
+	if err := newStream.CloseSend(); err != nil {
+		logTrace(callCtx, "grpc_retry failed CloseSend on new stream %v", err)
+		return nil, err
+	}
+	return newStream, nil
+}
+
+func waitRetryBackoff(attempt uint, parentCtx context.Context, callOpts *options) error {
+	var waitTime time.Duration = 0
+	if attempt > 0 {
+		waitTime = callOpts.backoffFunc(parentCtx, attempt)
+	}
+	if waitTime > 0 {
+		logTrace(parentCtx, "grpc_retry attempt: %d, backoff for %v", attempt, waitTime)
+		timer := time.NewTimer(waitTime)
+		select {
+		case <-parentCtx.Done():
+			timer.Stop()
+			return contextErrToGrpcErr(parentCtx.Err())
+		case <-timer.C:
+		}
+	}
+	return nil
+}
+
+func isRetriable(err error, callOpts *options) bool {
+	errCode := status.Code(err)
+	if isContextError(err) {
+		// context errors are not retriable based on user settings.
+		return false
+	}
+	for _, code := range callOpts.codes {
+		if code == errCode {
+			return true
+		}
+	}
+	return false
+}
+
+func isContextError(err error) bool {
+	code := status.Code(err)
+	return code == codes.DeadlineExceeded || code == codes.Canceled
+}
+
+func perCallContext(parentCtx context.Context, callOpts *options, attempt uint) context.Context {
+	ctx := parentCtx
+	if callOpts.perCallTimeout != 0 {
+		ctx, _ = context.WithTimeout(ctx, callOpts.perCallTimeout)
+	}
+	if attempt > 0 && callOpts.includeHeader {
+		mdClone := metautils.ExtractOutgoing(ctx).Clone().Set(AttemptMetadataKey, fmt.Sprintf("%d", attempt))
+		ctx = mdClone.ToOutgoing(ctx)
+	}
+	return ctx
+}
+
+func contextErrToGrpcErr(err error) error {
+	switch err {
+	case context.DeadlineExceeded:
+		return status.Error(codes.DeadlineExceeded, err.Error())
+	case context.Canceled:
+		return status.Error(codes.Canceled, err.Error())
+	default:
+		return status.Error(codes.Unknown, err.Error())
+	}
+}
+
+func logTrace(ctx context.Context, format string, a ...interface{}) {
+	tr, ok := trace.FromContext(ctx)
+	if !ok {
+		return
+	}
+	tr.LazyPrintf(format, a...)
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils/backoff.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils/backoff.go
new file mode 100644
index 0000000..4e69a63
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils/backoff.go
@@ -0,0 +1,28 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+/*
+Backoff Helper Utilities
+
+Implements common backoff features.
+*/
+package backoffutils
+
+import (
+	"math/rand"
+	"time"
+)
+
+// JitterUp adds random jitter to the duration.
+//
+// This adds or subtracts time from the duration within a given jitter fraction.
+// For example for 10s and jitter 0.1, it will return a time within [9s, 11s])
+func JitterUp(duration time.Duration, jitter float64) time.Duration {
+	multiplier := jitter * (rand.Float64()*2 - 1)
+	return time.Duration(float64(duration) * (1 + multiplier))
+}
+
+// ExponentBase2 computes 2^(a-1) where a >= 1. If a is 0, the result is 0.
+func ExponentBase2(a uint) uint {
+	return (1 << a) >> 1
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index bda7c1d..21f4b7a 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -81,9 +81,12 @@
 # github.com/google/uuid v1.3.0
 github.com/google/uuid
 # github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
+## explicit
 github.com/grpc-ecosystem/go-grpc-middleware
+github.com/grpc-ecosystem/go-grpc-middleware/retry
 github.com/grpc-ecosystem/go-grpc-middleware/tags
 github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing
+github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils
 github.com/grpc-ecosystem/go-grpc-middleware/util/metautils
 # github.com/hashicorp/go-uuid v1.0.2
 github.com/hashicorp/go-uuid