enabling grpc retry

Change-Id: I3e83654980386cca080f5ff5f2168e9b405fee1f
diff --git a/VERSION b/VERSION
index 15a2799..1809198 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.3.0
+3.4.0
diff --git a/go.mod b/go.mod
index 8129508..79d21a9 100644
--- a/go.mod
+++ b/go.mod
@@ -17,6 +17,7 @@
 	github.com/golang/mock v1.6.0
 	github.com/golang/protobuf v1.5.2
 	github.com/google/uuid v1.3.0
+	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
 	github.com/opencord/voltha-lib-go/v7 v7.4.3
 	github.com/opencord/voltha-protos/v5 v5.4.6
 	github.com/opentracing/opentracing-go v1.2.0
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index c928a84..1fe1940 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -60,6 +60,8 @@
 	BackoffRetryInitialInterval time.Duration
 	BackoffRetryMaxElapsedTime  time.Duration
 	BackoffRetryMaxInterval     time.Duration
+	PerRPCRetryTimeout          time.Duration
+	MaxRetries                  uint
 }
 
 // ParseCommandArguments parses the arguments when running read-write core service
@@ -201,6 +203,13 @@
 		"backoff_retry_max_interval",
 		1*time.Minute,
 		"The maximum number of milliseconds of an exponential backoff interval")
-
+	fs.DurationVar(&cf.PerRPCRetryTimeout,
+		"per_rpc_retry_timeout",
+		0*time.Second,
+		"The default timeout per RPC retry")
+	fs.UintVar(&cf.MaxRetries,
+		"max_grpc_client_retry",
+		0,
+		"The maximum number of times olt adaptor will retry in case grpc request timeouts")
 	_ = fs.Parse(args)
 }
diff --git a/rw_core/core/adapter/agent.go b/rw_core/core/adapter/agent.go
index 973f767..c25c793 100644
--- a/rw_core/core/adapter/agent.go
+++ b/rw_core/core/adapter/agent.go
@@ -19,14 +19,15 @@
 import (
 	"context"
 	"errors"
-	"sync"
-	"time"
-
+	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
 	vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
 	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	"github.com/opencord/voltha-protos/v5/go/adapter_service"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	"sync"
+	"time"
 )
 
 // agent represents adapter agent
@@ -39,6 +40,8 @@
 	onAdapterRestart   vgrpc.RestartedHandler
 	liveProbeInterval  time.Duration
 	coreEndpoint       string
+	maxRetries         uint
+	perRPCRetryTimeout time.Duration
 }
 
 func getAdapterServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
@@ -48,13 +51,15 @@
 	return adapter_service.NewAdapterServiceClient(conn)
 }
 
-func newAdapterAgent(coreEndpoint string, adapter *voltha.Adapter, onAdapterRestart vgrpc.RestartedHandler, liveProbeInterval time.Duration) *agent {
+func newAdapterAgent(coreEndpoint string, adapter *voltha.Adapter, onAdapterRestart vgrpc.RestartedHandler, liveProbeInterval time.Duration, maxRetries uint, perRPCRetryTimeout time.Duration) *agent {
 	return &agent{
 		adapter:            adapter,
 		onAdapterRestart:   onAdapterRestart,
 		adapterAPIEndPoint: adapter.Endpoint,
 		liveProbeInterval:  liveProbeInterval,
 		coreEndpoint:       coreEndpoint,
+		maxRetries:         maxRetries,
+		perRPCRetryTimeout: perRPCRetryTimeout,
 	}
 }
 
@@ -71,8 +76,15 @@
 
 	// Add a liveness communication update
 	aa.vClient.SubscribeForLiveness(aa.updateCommunicationTime)
-
-	go aa.vClient.Start(ctx, getAdapterServiceClientHandler)
+	// the backoff function sets the wait time bw each grpc retries, if not set it will take the deafault value of 50ms which is too low, the jitter sets the rpc retry wait time to be in a range of[PerRPCRetryTimeout-0.2, PerRPCRetryTimeout+0.2]
+	backoffCtxOption := grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(aa.perRPCRetryTimeout, 0.2))
+	retryCodes := []codes.Code{
+		codes.Unavailable,      // server is currently unavailable
+		codes.DeadlineExceeded, // deadline for the operation was exceeded
+	}
+	grpcRetryOptions := grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(aa.maxRetries), grpc_retry.WithPerRetryTimeout(aa.perRPCRetryTimeout), grpc_retry.WithCodes(retryCodes...), backoffCtxOption)
+	logger.Debug(ctx, "Configuration values", log.Fields{"RETRY": aa.maxRetries, "TIMEOUT": aa.perRPCRetryTimeout})
+	go aa.vClient.Start(ctx, getAdapterServiceClientHandler, grpcRetryOptions)
 	return nil
 }
 
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index 1ab4341..d73da79 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -52,6 +52,8 @@
 	lockDeviceTypesMap      sync.RWMutex
 	lockAdapterEndPointsMap sync.RWMutex
 	liveProbeInterval       time.Duration
+	PerRPCRetryTimeout      time.Duration
+	MaxRetries              uint
 	coreEndpoint            string
 	rollingUpdateMap        map[string]bool
 	rollingUpdateLock       sync.RWMutex
@@ -70,6 +72,8 @@
 	coreInstanceID string,
 	backend *db.Backend,
 	liveProbeInterval time.Duration,
+	maxRetries uint,
+	perRPCRetryTimeout time.Duration,
 ) *Manager {
 	return &Manager{
 		adapterDbProxy:     dbPath.Proxy("adapters"),
@@ -80,6 +84,8 @@
 		endpointMgr:        NewEndpointManager(backend),
 		liveProbeInterval:  liveProbeInterval,
 		coreEndpoint:       coreEndpoint,
+		MaxRetries:         maxRetries,
+		PerRPCRetryTimeout: perRPCRetryTimeout,
 		rollingUpdateMap:   make(map[string]bool),
 		rxStreamCloseChMap: make(map[string]chan bool),
 	}
@@ -196,7 +202,7 @@
 		// Use a muted adapter restart handler which is invoked by the corresponding gRPC client on an adapter restart.
 		// This handler just log the restart event.  The actual action taken following an adapter restart
 		// will be done when an adapter re-registers itself.
-		aMgr.adapterAgents[adapter.Id] = newAdapterAgent(aMgr.coreEndpoint, clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval)
+		aMgr.adapterAgents[adapter.Id] = newAdapterAgent(aMgr.coreEndpoint, clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval, aMgr.MaxRetries, aMgr.PerRPCRetryTimeout)
 		aMgr.adapterEndpoints[Endpoint(adapter.Endpoint)] = aMgr.adapterAgents[adapter.Id]
 	}
 	return nil
@@ -229,7 +235,7 @@
 	// Use a muted adapter restart handler which is invoked by the corresponding gRPC client on an adapter restart.
 	// This handler just log the restart event.  The actual action taken following an adapter restart
 	// will be done when an adapter re-registers itself.
-	aMgr.adapterAgents[adapter.Id] = newAdapterAgent(aMgr.coreEndpoint, clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval)
+	aMgr.adapterAgents[adapter.Id] = newAdapterAgent(aMgr.coreEndpoint, clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval, aMgr.MaxRetries, aMgr.PerRPCRetryTimeout)
 	aMgr.adapterEndpoints[Endpoint(adapter.Endpoint)] = aMgr.adapterAgents[adapter.Id]
 	return nil
 }
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 0df3b4e..c4eb6e7 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -143,7 +143,7 @@
 	dbPath := model.NewDBPath(backend)
 
 	// load adapters & device types while other things are starting
-	adapterMgr := adapter.NewAdapterManager(cf.GrpcSBIAddress, dbPath, id, backend, cf.LiveProbeInterval)
+	adapterMgr := adapter.NewAdapterManager(cf.GrpcSBIAddress, dbPath, id, backend, cf.LiveProbeInterval, cf.MaxRetries, cf.PerRPCRetryTimeout)
 	adapterMgr.Start(ctx, adapterService)
 
 	// We do not do a defer adapterMgr.Stop() here as we want this to be ran as soon as
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 076438c..49c4d6b 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -131,7 +131,7 @@
 		LivenessChannelInterval: cfg.LiveProbeInterval / 2}
 
 	proxy := model.NewDBPath(backend)
-	dat.adapterMgr = adapter.NewAdapterManager("test-endpoint", proxy, dat.coreInstanceID, backend, 5)
+	dat.adapterMgr = adapter.NewAdapterManager("test-endpoint", proxy, dat.coreInstanceID, backend, 5, 6, 5)
 	eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
 	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, cfg, dat.coreInstanceID, eventProxy)
 	dat.adapterMgr.Start(context.Background(), "agent-test")
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 8c9c8ee..3c626c6 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -154,7 +154,7 @@
 		LivenessChannelInterval: cfg.LiveProbeInterval / 2}
 
 	proxy := model.NewDBPath(backend)
-	adapterMgr := adapter.NewAdapterManager("test-endpoint", proxy, lda.coreInstanceID, backend, 5)
+	adapterMgr := adapter.NewAdapterManager("test-endpoint", proxy, lda.coreInstanceID, backend, 5, 6, 5)
 	eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
 	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, cfg, lda.coreInstanceID, eventProxy)
 	adapterMgr.Start(context.Background(), "logical-test")
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/backoff.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/backoff.go
new file mode 100644
index 0000000..ad35f09
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/backoff.go
@@ -0,0 +1,44 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_retry
+
+import (
+	"time"
+
+	"github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils"
+)
+
+// BackoffLinear is very simple: it waits for a fixed period of time between calls.
+func BackoffLinear(waitBetween time.Duration) BackoffFunc {
+	return func(attempt uint) time.Duration {
+		return waitBetween
+	}
+}
+
+// BackoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment).
+//
+// For example waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms.
+func BackoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) BackoffFunc {
+	return func(attempt uint) time.Duration {
+		return backoffutils.JitterUp(waitBetween, jitterFraction)
+	}
+}
+
+// BackoffExponential produces increasing intervals for each attempt.
+//
+// The scalar is multiplied times 2 raised to the current attempt. So the first
+// retry with a scalar of 100ms is 100ms, while the 5th attempt would be 1.6s.
+func BackoffExponential(scalar time.Duration) BackoffFunc {
+	return func(attempt uint) time.Duration {
+		return scalar * time.Duration(backoffutils.ExponentBase2(attempt))
+	}
+}
+
+// BackoffExponentialWithJitter creates an exponential backoff like
+// BackoffExponential does, but adds jitter.
+func BackoffExponentialWithJitter(scalar time.Duration, jitterFraction float64) BackoffFunc {
+	return func(attempt uint) time.Duration {
+		return backoffutils.JitterUp(scalar*time.Duration(backoffutils.ExponentBase2(attempt)), jitterFraction)
+	}
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/doc.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/doc.go
new file mode 100644
index 0000000..afd924a
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/doc.go
@@ -0,0 +1,25 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+/*
+`grpc_retry` provides client-side request retry logic for gRPC.
+
+Client-Side Request Retry Interceptor
+
+It allows for automatic retry, inside the generated gRPC code of requests based on the gRPC status
+of the reply. It supports unary (1:1), and server stream (1:n) requests.
+
+By default the interceptors *are disabled*, preventing accidental use of retries. You can easily
+override the number of retries (setting them to more than 0) with a `grpc.ClientOption`, e.g.:
+
+ myclient.Ping(ctx, goodPing, grpc_retry.WithMax(5))
+
+Other default options are: retry on `ResourceExhausted` and `Unavailable` gRPC codes, use a 50ms
+linear backoff with 10% jitter.
+
+For chained interceptors, the retry interceptor will call every interceptor that follows it
+whenever when a retry happens.
+
+Please see examples for more advanced use.
+*/
+package grpc_retry
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/options.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/options.go
new file mode 100644
index 0000000..7a633e2
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/options.go
@@ -0,0 +1,142 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_retry
+
+import (
+	"context"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+)
+
+var (
+	// DefaultRetriableCodes is a set of well known types gRPC codes that should be retri-able.
+	//
+	// `ResourceExhausted` means that the user quota, e.g. per-RPC limits, have been reached.
+	// `Unavailable` means that system is currently unavailable and the client should retry again.
+	DefaultRetriableCodes = []codes.Code{codes.ResourceExhausted, codes.Unavailable}
+
+	defaultOptions = &options{
+		max:            0, // disabled
+		perCallTimeout: 0, // disabled
+		includeHeader:  true,
+		codes:          DefaultRetriableCodes,
+		backoffFunc: BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
+			return BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10)(attempt)
+		}),
+	}
+)
+
+// BackoffFunc denotes a family of functions that control the backoff duration between call retries.
+//
+// They are called with an identifier of the attempt, and should return a time the system client should
+// hold off for. If the time returned is longer than the `context.Context.Deadline` of the request
+// the deadline of the request takes precedence and the wait will be interrupted before proceeding
+// with the next iteration.
+type BackoffFunc func(attempt uint) time.Duration
+
+// BackoffFuncContext denotes a family of functions that control the backoff duration between call retries.
+//
+// They are called with an identifier of the attempt, and should return a time the system client should
+// hold off for. If the time returned is longer than the `context.Context.Deadline` of the request
+// the deadline of the request takes precedence and the wait will be interrupted before proceeding
+// with the next iteration. The context can be used to extract request scoped metadata and context values.
+type BackoffFuncContext func(ctx context.Context, attempt uint) time.Duration
+
+// Disable disables the retry behaviour on this call, or this interceptor.
+//
+// Its semantically the same to `WithMax`
+func Disable() CallOption {
+	return WithMax(0)
+}
+
+// WithMax sets the maximum number of retries on this call, or this interceptor.
+func WithMax(maxRetries uint) CallOption {
+	return CallOption{applyFunc: func(o *options) {
+		o.max = maxRetries
+	}}
+}
+
+// WithBackoff sets the `BackoffFunc` used to control time between retries.
+func WithBackoff(bf BackoffFunc) CallOption {
+	return CallOption{applyFunc: func(o *options) {
+		o.backoffFunc = BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
+			return bf(attempt)
+		})
+	}}
+}
+
+// WithBackoffContext sets the `BackoffFuncContext` used to control time between retries.
+func WithBackoffContext(bf BackoffFuncContext) CallOption {
+	return CallOption{applyFunc: func(o *options) {
+		o.backoffFunc = bf
+	}}
+}
+
+// WithCodes sets which codes should be retried.
+//
+// Please *use with care*, as you may be retrying non-idempotent calls.
+//
+// You cannot automatically retry on Cancelled and Deadline, please use `WithPerRetryTimeout` for these.
+func WithCodes(retryCodes ...codes.Code) CallOption {
+	return CallOption{applyFunc: func(o *options) {
+		o.codes = retryCodes
+	}}
+}
+
+// WithPerRetryTimeout sets the RPC timeout per call (including initial call) on this call, or this interceptor.
+//
+// The context.Deadline of the call takes precedence and sets the maximum time the whole invocation
+// will take, but WithPerRetryTimeout can be used to limit the RPC time per each call.
+//
+// For example, with context.Deadline = now + 10s, and WithPerRetryTimeout(3 * time.Seconds), each
+// of the retry calls (including the initial one) will have a deadline of now + 3s.
+//
+// A value of 0 disables the timeout overrides completely and returns to each retry call using the
+// parent `context.Deadline`.
+//
+// Note that when this is enabled, any DeadlineExceeded errors that are propagated up will be retried.
+func WithPerRetryTimeout(timeout time.Duration) CallOption {
+	return CallOption{applyFunc: func(o *options) {
+		o.perCallTimeout = timeout
+	}}
+}
+
+type options struct {
+	max            uint
+	perCallTimeout time.Duration
+	includeHeader  bool
+	codes          []codes.Code
+	backoffFunc    BackoffFuncContext
+}
+
+// CallOption is a grpc.CallOption that is local to grpc_retry.
+type CallOption struct {
+	grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic.
+	applyFunc            func(opt *options)
+}
+
+func reuseOrNewWithCallOptions(opt *options, callOptions []CallOption) *options {
+	if len(callOptions) == 0 {
+		return opt
+	}
+	optCopy := &options{}
+	*optCopy = *opt
+	for _, f := range callOptions {
+		f.applyFunc(optCopy)
+	}
+	return optCopy
+}
+
+func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOption, retryOptions []CallOption) {
+	for _, opt := range callOptions {
+		if co, ok := opt.(CallOption); ok {
+			retryOptions = append(retryOptions, co)
+		} else {
+			grpcOptions = append(grpcOptions, opt)
+		}
+	}
+	return grpcOptions, retryOptions
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/retry.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/retry.go
new file mode 100644
index 0000000..62d8312
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/retry/retry.go
@@ -0,0 +1,329 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_retry
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
+	"golang.org/x/net/trace"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
+)
+
+const (
+	AttemptMetadataKey = "x-retry-attempty"
+)
+
+// UnaryClientInterceptor returns a new retrying unary client interceptor.
+//
+// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
+// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
+func UnaryClientInterceptor(optFuncs ...CallOption) grpc.UnaryClientInterceptor {
+	intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
+	return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+		grpcOpts, retryOpts := filterCallOptions(opts)
+		callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
+		// short circuit for simplicity, and avoiding allocations.
+		if callOpts.max == 0 {
+			return invoker(parentCtx, method, req, reply, cc, grpcOpts...)
+		}
+		var lastErr error
+		for attempt := uint(0); attempt < callOpts.max; attempt++ {
+			if err := waitRetryBackoff(attempt, parentCtx, callOpts); err != nil {
+				return err
+			}
+			callCtx := perCallContext(parentCtx, callOpts, attempt)
+			lastErr = invoker(callCtx, method, req, reply, cc, grpcOpts...)
+			// TODO(mwitkow): Maybe dial and transport errors should be retriable?
+			if lastErr == nil {
+				return nil
+			}
+			logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr)
+			if isContextError(lastErr) {
+				if parentCtx.Err() != nil {
+					logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err())
+					// its the parent context deadline or cancellation.
+					return lastErr
+				} else if callOpts.perCallTimeout != 0 {
+					// We have set a perCallTimeout in the retry middleware, which would result in a context error if
+					// the deadline was exceeded, in which case try again.
+					logTrace(parentCtx, "grpc_retry attempt: %d, context error from retry call", attempt)
+					continue
+				}
+			}
+			if !isRetriable(lastErr, callOpts) {
+				return lastErr
+			}
+		}
+		return lastErr
+	}
+}
+
+// StreamClientInterceptor returns a new retrying stream client interceptor for server side streaming calls.
+//
+// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
+// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
+//
+// Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs
+// to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams,
+// BidiStreams), the retry interceptor will fail the call.
+func StreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientInterceptor {
+	intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
+	return func(parentCtx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+		grpcOpts, retryOpts := filterCallOptions(opts)
+		callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
+		// short circuit for simplicity, and avoiding allocations.
+		if callOpts.max == 0 {
+			return streamer(parentCtx, desc, cc, method, grpcOpts...)
+		}
+		if desc.ClientStreams {
+			return nil, status.Errorf(codes.Unimplemented, "grpc_retry: cannot retry on ClientStreams, set grpc_retry.Disable()")
+		}
+
+		var lastErr error
+		for attempt := uint(0); attempt < callOpts.max; attempt++ {
+			if err := waitRetryBackoff(attempt, parentCtx, callOpts); err != nil {
+				return nil, err
+			}
+			callCtx := perCallContext(parentCtx, callOpts, 0)
+
+			var newStreamer grpc.ClientStream
+			newStreamer, lastErr = streamer(callCtx, desc, cc, method, grpcOpts...)
+			if lastErr == nil {
+				retryingStreamer := &serverStreamingRetryingStream{
+					ClientStream: newStreamer,
+					callOpts:     callOpts,
+					parentCtx:    parentCtx,
+					streamerCall: func(ctx context.Context) (grpc.ClientStream, error) {
+						return streamer(ctx, desc, cc, method, grpcOpts...)
+					},
+				}
+				return retryingStreamer, nil
+			}
+
+			logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr)
+			if isContextError(lastErr) {
+				if parentCtx.Err() != nil {
+					logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err())
+					// its the parent context deadline or cancellation.
+					return nil, lastErr
+				} else if callOpts.perCallTimeout != 0 {
+					// We have set a perCallTimeout in the retry middleware, which would result in a context error if
+					// the deadline was exceeded, in which case try again.
+					logTrace(parentCtx, "grpc_retry attempt: %d, context error from retry call", attempt)
+					continue
+				}
+			}
+			if !isRetriable(lastErr, callOpts) {
+				return nil, lastErr
+			}
+		}
+		return nil, lastErr
+	}
+}
+
+// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
+// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
+// a new ClientStream according to the retry policy.
+type serverStreamingRetryingStream struct {
+	grpc.ClientStream
+	bufferedSends []interface{} // single message that the client can sen
+	receivedGood  bool          // indicates whether any prior receives were successful
+	wasClosedSend bool          // indicates that CloseSend was closed
+	parentCtx     context.Context
+	callOpts      *options
+	streamerCall  func(ctx context.Context) (grpc.ClientStream, error)
+	mu            sync.RWMutex
+}
+
+func (s *serverStreamingRetryingStream) setStream(clientStream grpc.ClientStream) {
+	s.mu.Lock()
+	s.ClientStream = clientStream
+	s.mu.Unlock()
+}
+
+func (s *serverStreamingRetryingStream) getStream() grpc.ClientStream {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.ClientStream
+}
+
+func (s *serverStreamingRetryingStream) SendMsg(m interface{}) error {
+	s.mu.Lock()
+	s.bufferedSends = append(s.bufferedSends, m)
+	s.mu.Unlock()
+	return s.getStream().SendMsg(m)
+}
+
+func (s *serverStreamingRetryingStream) CloseSend() error {
+	s.mu.Lock()
+	s.wasClosedSend = true
+	s.mu.Unlock()
+	return s.getStream().CloseSend()
+}
+
+func (s *serverStreamingRetryingStream) Header() (metadata.MD, error) {
+	return s.getStream().Header()
+}
+
+func (s *serverStreamingRetryingStream) Trailer() metadata.MD {
+	return s.getStream().Trailer()
+}
+
+func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
+	attemptRetry, lastErr := s.receiveMsgAndIndicateRetry(m)
+	if !attemptRetry {
+		return lastErr // success or hard failure
+	}
+	// We start off from attempt 1, because zeroth was already made on normal SendMsg().
+	for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
+		if err := waitRetryBackoff(attempt, s.parentCtx, s.callOpts); err != nil {
+			return err
+		}
+		callCtx := perCallContext(s.parentCtx, s.callOpts, attempt)
+		newStream, err := s.reestablishStreamAndResendBuffer(callCtx)
+		if err != nil {
+			// Retry dial and transport errors of establishing stream as grpc doesn't retry.
+			if isRetriable(err, s.callOpts) {
+				continue
+			}
+			return err
+		}
+
+		s.setStream(newStream)
+		attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
+		//fmt.Printf("Received message and indicate: %v  %v\n", attemptRetry, lastErr)
+		if !attemptRetry {
+			return lastErr
+		}
+	}
+	return lastErr
+}
+
+func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}) (bool, error) {
+	s.mu.RLock()
+	wasGood := s.receivedGood
+	s.mu.RUnlock()
+	err := s.getStream().RecvMsg(m)
+	if err == nil || err == io.EOF {
+		s.mu.Lock()
+		s.receivedGood = true
+		s.mu.Unlock()
+		return false, err
+	} else if wasGood {
+		// previous RecvMsg in the stream succeeded, no retry logic should interfere
+		return false, err
+	}
+	if isContextError(err) {
+		if s.parentCtx.Err() != nil {
+			logTrace(s.parentCtx, "grpc_retry parent context error: %v", s.parentCtx.Err())
+			return false, err
+		} else if s.callOpts.perCallTimeout != 0 {
+			// We have set a perCallTimeout in the retry middleware, which would result in a context error if
+			// the deadline was exceeded, in which case try again.
+			logTrace(s.parentCtx, "grpc_retry context error from retry call")
+			return true, err
+		}
+	}
+	return isRetriable(err, s.callOpts), err
+}
+
+func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(
+	callCtx context.Context,
+) (grpc.ClientStream, error) {
+	s.mu.RLock()
+	bufferedSends := s.bufferedSends
+	s.mu.RUnlock()
+	newStream, err := s.streamerCall(callCtx)
+	if err != nil {
+		logTrace(callCtx, "grpc_retry failed redialing new stream: %v", err)
+		return nil, err
+	}
+	for _, msg := range bufferedSends {
+		if err := newStream.SendMsg(msg); err != nil {
+			logTrace(callCtx, "grpc_retry failed resending message: %v", err)
+			return nil, err
+		}
+	}
+	if err := newStream.CloseSend(); err != nil {
+		logTrace(callCtx, "grpc_retry failed CloseSend on new stream %v", err)
+		return nil, err
+	}
+	return newStream, nil
+}
+
+func waitRetryBackoff(attempt uint, parentCtx context.Context, callOpts *options) error {
+	var waitTime time.Duration = 0
+	if attempt > 0 {
+		waitTime = callOpts.backoffFunc(parentCtx, attempt)
+	}
+	if waitTime > 0 {
+		logTrace(parentCtx, "grpc_retry attempt: %d, backoff for %v", attempt, waitTime)
+		timer := time.NewTimer(waitTime)
+		select {
+		case <-parentCtx.Done():
+			timer.Stop()
+			return contextErrToGrpcErr(parentCtx.Err())
+		case <-timer.C:
+		}
+	}
+	return nil
+}
+
+func isRetriable(err error, callOpts *options) bool {
+	errCode := status.Code(err)
+	if isContextError(err) {
+		// context errors are not retriable based on user settings.
+		return false
+	}
+	for _, code := range callOpts.codes {
+		if code == errCode {
+			return true
+		}
+	}
+	return false
+}
+
+func isContextError(err error) bool {
+	code := status.Code(err)
+	return code == codes.DeadlineExceeded || code == codes.Canceled
+}
+
+func perCallContext(parentCtx context.Context, callOpts *options, attempt uint) context.Context {
+	ctx := parentCtx
+	if callOpts.perCallTimeout != 0 {
+		ctx, _ = context.WithTimeout(ctx, callOpts.perCallTimeout)
+	}
+	if attempt > 0 && callOpts.includeHeader {
+		mdClone := metautils.ExtractOutgoing(ctx).Clone().Set(AttemptMetadataKey, fmt.Sprintf("%d", attempt))
+		ctx = mdClone.ToOutgoing(ctx)
+	}
+	return ctx
+}
+
+func contextErrToGrpcErr(err error) error {
+	switch err {
+	case context.DeadlineExceeded:
+		return status.Error(codes.DeadlineExceeded, err.Error())
+	case context.Canceled:
+		return status.Error(codes.Canceled, err.Error())
+	default:
+		return status.Error(codes.Unknown, err.Error())
+	}
+}
+
+func logTrace(ctx context.Context, format string, a ...interface{}) {
+	tr, ok := trace.FromContext(ctx)
+	if !ok {
+		return
+	}
+	tr.LazyPrintf(format, a...)
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils/backoff.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils/backoff.go
new file mode 100644
index 0000000..4e69a63
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils/backoff.go
@@ -0,0 +1,28 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+/*
+Backoff Helper Utilities
+
+Implements common backoff features.
+*/
+package backoffutils
+
+import (
+	"math/rand"
+	"time"
+)
+
+// JitterUp adds random jitter to the duration.
+//
+// This adds or subtracts time from the duration within a given jitter fraction.
+// For example for 10s and jitter 0.1, it will return a time within [9s, 11s])
+func JitterUp(duration time.Duration, jitter float64) time.Duration {
+	multiplier := jitter * (rand.Float64()*2 - 1)
+	return time.Duration(float64(duration) * (1 + multiplier))
+}
+
+// ExponentBase2 computes 2^(a-1) where a >= 1. If a is 0, the result is 0.
+func ExponentBase2(a uint) uint {
+	return (1 << a) >> 1
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 8fafeca..a3b5d0a 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -152,9 +152,12 @@
 # github.com/gorilla/websocket v1.4.2
 github.com/gorilla/websocket
 # github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
+## explicit
 github.com/grpc-ecosystem/go-grpc-middleware
+github.com/grpc-ecosystem/go-grpc-middleware/retry
 github.com/grpc-ecosystem/go-grpc-middleware/tags
 github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing
+github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils
 github.com/grpc-ecosystem/go-grpc-middleware/util/metautils
 # github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 github.com/grpc-ecosystem/go-grpc-prometheus