VOL-2098 Monitor Kafka service readiness

Change-Id: Ifb9658c8ea4f03374fe2921846149b1e55237327
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/grpc/server.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/grpc/server.go
index 4c95aa1..7796b83 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/grpc/server.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/grpc/server.go
@@ -20,7 +20,9 @@
 	"fmt"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/status"
 	"net"
 )
 
@@ -52,12 +54,19 @@
 	s.server.Start(ctx)
 */
 
+// Interface allows probes to be attached to server
+// A probe must support the IsReady() method
+type ReadyProbe interface {
+	IsReady() bool
+}
+
 type GrpcServer struct {
 	gs       *grpc.Server
 	address  string
 	port     int
 	secure   bool
 	services []func(*grpc.Server)
+	probe    ReadyProbe // optional
 
 	*GrpcSecurity
 }
@@ -70,12 +79,14 @@
 	port int,
 	certs *GrpcSecurity,
 	secure bool,
+	probe ReadyProbe,
 ) *GrpcServer {
 	server := &GrpcServer{
 		address:      address,
 		port:         port,
 		secure:       secure,
 		GrpcSecurity: certs,
+		probe:        probe,
 	}
 	return server
 }
@@ -97,11 +108,12 @@
 		if err != nil {
 			log.Fatalf("could not load TLS keys: %s", err)
 		}
-		s.gs = grpc.NewServer(grpc.Creds(creds))
+		s.gs = grpc.NewServer(grpc.Creds(creds),
+			withServerUnaryInterceptor(s))
 
 	} else {
 		log.Info("starting-insecure-grpc-server")
-		s.gs = grpc.NewServer()
+		s.gs = grpc.NewServer(withServerUnaryInterceptor(s))
 	}
 
 	// Register all required services
@@ -114,6 +126,36 @@
 	}
 }
 
+func withServerUnaryInterceptor(s *GrpcServer) grpc.ServerOption {
+	return grpc.UnaryInterceptor(mkServerInterceptor(s))
+}
+
+// Make a serverInterceptor for the given GrpcServer
+// This interceptor will check whether there is an attached probe,
+// and if that probe indicates NotReady, then an UNAVAILABLE
+// response will be returned.
+func mkServerInterceptor(s *GrpcServer) func(ctx context.Context,
+	req interface{},
+	info *grpc.UnaryServerInfo,
+	handler grpc.UnaryHandler) (interface{}, error) {
+
+	return func(ctx context.Context,
+		req interface{},
+		info *grpc.UnaryServerInfo,
+		handler grpc.UnaryHandler) (interface{}, error) {
+
+		if (s.probe != nil) && (!s.probe.IsReady()) {
+			log.Warnf("Grpc request received while not ready %v", req)
+			return nil, status.Error(codes.Unavailable, "system is not ready")
+		}
+
+		// Calls the handler
+		h, err := handler(ctx, req)
+
+		return h, err
+	}
+}
+
 /*
 Stop servicing GRPC requests
 */