VOL-2098 Monitor Kafka service readiness
Change-Id: Ifb9658c8ea4f03374fe2921846149b1e55237327
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/grpc/server.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/grpc/server.go
index 4c95aa1..7796b83 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/grpc/server.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/grpc/server.go
@@ -20,7 +20,9 @@
"fmt"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/status"
"net"
)
@@ -52,12 +54,19 @@
s.server.Start(ctx)
*/
+// ReadyProbe is the interface that allows a probe to be attached to the server.
+// A probe must support the IsReady() method.
+type ReadyProbe interface {
+ IsReady() bool
+}
+
type GrpcServer struct {
gs *grpc.Server
address string
port int
secure bool
services []func(*grpc.Server)
+ probe ReadyProbe // optional
*GrpcSecurity
}
@@ -70,12 +79,14 @@
port int,
certs *GrpcSecurity,
secure bool,
+ probe ReadyProbe,
) *GrpcServer {
server := &GrpcServer{
address: address,
port: port,
secure: secure,
GrpcSecurity: certs,
+ probe: probe,
}
return server
}
@@ -97,11 +108,12 @@
if err != nil {
log.Fatalf("could not load TLS keys: %s", err)
}
- s.gs = grpc.NewServer(grpc.Creds(creds))
+ s.gs = grpc.NewServer(grpc.Creds(creds),
+ withServerUnaryInterceptor(s))
} else {
log.Info("starting-insecure-grpc-server")
- s.gs = grpc.NewServer()
+ s.gs = grpc.NewServer(withServerUnaryInterceptor(s))
}
// Register all required services
@@ -114,6 +126,36 @@
}
}
+func withServerUnaryInterceptor(s *GrpcServer) grpc.ServerOption {
+ return grpc.UnaryInterceptor(mkServerInterceptor(s))
+}
+
+// mkServerInterceptor makes a unary server interceptor for the given
+// GrpcServer. The interceptor checks whether a probe is attached and,
+// if that probe indicates not ready, returns an UNAVAILABLE response
+// without calling the handler; otherwise the handler is called as usual.
+func mkServerInterceptor(s *GrpcServer) func(ctx context.Context,
+ req interface{},
+ info *grpc.UnaryServerInfo,
+ handler grpc.UnaryHandler) (interface{}, error) {
+
+ return func(ctx context.Context,
+ req interface{},
+ info *grpc.UnaryServerInfo,
+ handler grpc.UnaryHandler) (interface{}, error) {
+
+ if (s.probe != nil) && (!s.probe.IsReady()) {
+ log.Warnf("Grpc request received while not ready %v", req)
+ return nil, status.Error(codes.Unavailable, "system is not ready")
+ }
+
+ // Calls the handler
+ h, err := handler(ctx, req)
+
+ return h, err
+ }
+}
+
/*
Stop servicing GRPC requests
*/