(VOL-4952) add parameter to enable grpc client to set retry options via interceptors
Change-Id: I7d04981da9eb5468363290fc04bce08d073a2db5
diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go
index 3baa1f4..307da44 100644
--- a/pkg/grpc/client.go
+++ b/pkg/grpc/client.go
@@ -229,7 +229,7 @@
}
// executeWithTimeout runs a sending function (sf) along with a receiving one(rf) and returns an error, if any.
-// If the deadline d elapses first, it returns a grpc DeadlineExceeded error instead.
+// If the deadline elapses first, it returns a grpc DeadlineExceeded error instead.
func (c *Client) executeWithTimeout(sf func(*common.Connection) error, rf func() (interface{}, error), conn *common.Connection, d time.Duration) error {
errChan := make(chan error, 1)
go func() {
@@ -394,7 +394,7 @@
}
// Start kicks off the adapter agent by trying to connect to the adapter
-func (c *Client) Start(ctx context.Context, handler GetServiceClient) {
+func (c *Client) Start(ctx context.Context, handler GetServiceClient, retry_interceptor ...grpc.UnaryClientInterceptor) {
logger.Debugw(ctx, "Starting GRPC - Client", log.Fields{"api-endpoint": c.serverEndPoint})
// If the context contains a k8s probe then register services
@@ -443,7 +443,14 @@
if c.state != stateConnecting {
c.state = stateConnecting
go func() {
- if err := c.connectToEndpoint(ctx, p); err != nil {
+ var err error
+ if len(retry_interceptor) > 0 {
+ err = c.connectToEndpoint(ctx, p, retry_interceptor...)
+ } else {
+ err = c.connectToEndpoint(ctx, p)
+ }
+
+ if err != nil {
c.stateLock.Lock()
c.state = stateDisconnected
c.stateLock.Unlock()
@@ -579,7 +586,7 @@
logger.Infow(ctx, "client-stopped", log.Fields{"api-endpoint": c.serverEndPoint, "client": c.clientEndpoint})
}
-func (c *Client) connectToEndpoint(ctx context.Context, p *probe.Probe) error {
+func (c *Client) connectToEndpoint(ctx context.Context, p *probe.Probe, retry_interceptor ...grpc.UnaryClientInterceptor) error {
if p != nil {
p.UpdateStatus(ctx, c.serverEndPoint, probe.ServiceStatusPreparing)
}
@@ -598,14 +605,17 @@
// 1. automatically inject
// 2. publish Open Tracing Spans by this GRPC Client
// 3. detect connection failure on client calls such that the reconnection process can begin
+ interceptor_opts := []grpc.UnaryClientInterceptor{grpc_opentracing.UnaryClientInterceptor(grpc_opentracing.WithTracer(log.ActiveTracerProxy{}))}
+
+ if len(retry_interceptor) > 0 {
+ interceptor_opts = append(interceptor_opts, retry_interceptor...)
+ }
conn, err := grpc.Dial(c.serverEndPoint,
grpc.WithInsecure(),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
grpc_opentracing.StreamClientInterceptor(grpc_opentracing.WithTracer(log.ActiveTracerProxy{})),
)),
- grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
- grpc_opentracing.UnaryClientInterceptor(grpc_opentracing.WithTracer(log.ActiveTracerProxy{})),
- )),
+ grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(interceptor_opts...)),
)
if err == nil {