[VOL-3197][VOL-3196] Enhanced Kafka RPC and gRPC interfaces to propagate Span context for log correlation
Also corrected the helper method in log/utils based on testing.
Change-Id: Ic0fec935dd8996b3c6c17116586c5bd307e7bebb
diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go
index 2bf7696..9f7c785 100644
--- a/pkg/grpc/server.go
+++ b/pkg/grpc/server.go
@@ -17,6 +17,9 @@
import (
"context"
+ grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+ grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
+ "github.com/opentracing/opentracing-go"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@@ -96,17 +99,27 @@
logger.Fatalf(ctx, "failed to listen: %v", err)
}
+ // Use interceptors so that this gRPC server automatically injects and publishes OpenTracing spans
+ serverOptions := []grpc.ServerOption{
+ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
+ grpc_opentracing.StreamServerInterceptor(grpc_opentracing.WithTracer(opentracing.GlobalTracer())),
+ )),
+ grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
+ grpc_opentracing.UnaryServerInterceptor(grpc_opentracing.WithTracer(opentracing.GlobalTracer())),
+ mkServerInterceptor(s),
+ ))}
+
if s.secure && s.GrpcSecurity != nil {
creds, err := credentials.NewServerTLSFromFile(s.CertFile, s.KeyFile)
if err != nil {
logger.Fatalf(ctx, "could not load TLS keys: %s", err)
}
- s.gs = grpc.NewServer(grpc.Creds(creds),
- withServerUnaryInterceptor(s))
+ serverOptions = append(serverOptions, grpc.Creds(creds))
+ s.gs = grpc.NewServer(serverOptions...)
} else {
logger.Info(ctx, "starting-insecure-grpc-server")
- s.gs = grpc.NewServer(withServerUnaryInterceptor(s))
+ s.gs = grpc.NewServer(serverOptions...)
}
// Register all required services
@@ -119,10 +132,6 @@
}
}
-func withServerUnaryInterceptor(s *GrpcServer) grpc.ServerOption {
- return grpc.UnaryInterceptor(mkServerInterceptor(s))
-}
-
// Make a serverInterceptor for the given GrpcServer
// This interceptor will check whether there is an attached probe,
// and if that probe indicates NotReady, then an UNAVAILABLE