[VOL-3197][VOL-3196] Enhanced Kafka RPC and gRPC interfaces to propagate Span context for log correlation

Also, made some corrections to helper method in log/utils based on testing

Change-Id: Ic0fec935dd8996b3c6c17116586c5bd307e7bebb
diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go
index 2bf7696..9f7c785 100644
--- a/pkg/grpc/server.go
+++ b/pkg/grpc/server.go
@@ -17,6 +17,9 @@
 
 import (
 	"context"
+	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
+	"github.com/opentracing/opentracing-go"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
@@ -96,17 +99,27 @@
 		logger.Fatalf(ctx, "failed to listen: %v", err)
 	}
 
+	// Use Intercepters to automatically inject and publish Open Tracing Spans by this GRPC server
+	serverOptions := []grpc.ServerOption{
+		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
+			grpc_opentracing.StreamServerInterceptor(grpc_opentracing.WithTracer(opentracing.GlobalTracer())),
+		)),
+		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
+			grpc_opentracing.UnaryServerInterceptor(grpc_opentracing.WithTracer(opentracing.GlobalTracer())),
+			mkServerInterceptor(s),
+		))}
+
 	if s.secure && s.GrpcSecurity != nil {
 		creds, err := credentials.NewServerTLSFromFile(s.CertFile, s.KeyFile)
 		if err != nil {
 			logger.Fatalf(ctx, "could not load TLS keys: %s", err)
 		}
-		s.gs = grpc.NewServer(grpc.Creds(creds),
-			withServerUnaryInterceptor(s))
 
+		serverOptions = append(serverOptions, grpc.Creds(creds))
+		s.gs = grpc.NewServer(serverOptions...)
 	} else {
 		logger.Info(ctx, "starting-insecure-grpc-server")
-		s.gs = grpc.NewServer(withServerUnaryInterceptor(s))
+		s.gs = grpc.NewServer(serverOptions...)
 	}
 
 	// Register all required services
@@ -119,10 +132,6 @@
 	}
 }
 
-func withServerUnaryInterceptor(s *GrpcServer) grpc.ServerOption {
-	return grpc.UnaryInterceptor(mkServerInterceptor(s))
-}
-
 // Make a serverInterceptor for the given GrpcServer
 // This interceptor will check whether there is an attached probe,
 // and if that probe indicates NotReady, then an UNAVAILABLE