VOL-2098 Support for Kafka liveness testing
* Adds a liveness channel to sarama_client and the kafka_interadapter proxy.
True or false is pushed to the liveness channel on each successful or
failed Kafka publish, respectively.
* Adds support to make a "liveness publish attempt", which publishes
an empty message on a _liveness channel.
* Adds ServiceStatusNotReady to Probe
* Suppresses multiple Probe.UpdateStatus of the same status
* Adds the ability to attach a Probe to the grpc server, so that
when the probe returns NotReady, the Server responds to requests
with UNAVAILABLE.
Change-Id: I996c719570a50f2f6f397887d10d489608269c3f
diff --git a/pkg/grpc/common_test.go b/pkg/grpc/common_test.go
new file mode 100644
index 0000000..6ed7b26
--- /dev/null
+++ b/pkg/grpc/common_test.go
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package grpc
+
+import (
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+const (
+ /*
+ * VOLTHA_LOGLEVEL is the log level handed to the Voltha logger by this package's
+ * unit-test init(). It is pinned to FatalLevel because we generally don't want to
+ * see logger output, even when running go test in verbose mode: some unit tests
+ * are expected to produce messages at "Error" level.
+ *
+ * If you are developing a unit test and are experiencing problems, or want extra
+ * debugging output from Voltha, changing this constant to log.DebugLevel may help.
+ */
+
+ VOLTHA_LOGLEVEL = log.FatalLevel
+)
+
+// init performs unit test initialization: it configures the default Voltha
+// logger once, on behalf of all unit tests in the current directory.
+func init() {
+	// The logger must be configured before use or bad things happen.
+	fields := log.Fields{"instanceId": 1}
+	if _, err := log.SetDefaultLogger(log.JSON, VOLTHA_LOGLEVEL, fields); err != nil {
+		panic(err)
+	}
+}
diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go
index 4c95aa1..488d470 100644
--- a/pkg/grpc/server.go
+++ b/pkg/grpc/server.go
@@ -20,7 +20,9 @@
"fmt"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/status"
"net"
)
@@ -52,12 +54,19 @@
s.server.Start(ctx)
*/
+// ReadyProbe is the interface a readiness probe must satisfy to be attached to
+// a GrpcServer: it must support the IsReady() method.
+type ReadyProbe interface {
+ IsReady() bool
+}
+
type GrpcServer struct {
gs *grpc.Server
address string
port int
secure bool
services []func(*grpc.Server)
+ probe ReadyProbe // optional; when nil, requests are never rejected for readiness
*GrpcSecurity
}
@@ -97,11 +106,12 @@
if err != nil {
log.Fatalf("could not load TLS keys: %s", err)
}
- s.gs = grpc.NewServer(grpc.Creds(creds))
+ s.gs = grpc.NewServer(grpc.Creds(creds),
+ withServerUnaryInterceptor(s))
} else {
log.Info("starting-insecure-grpc-server")
- s.gs = grpc.NewServer()
+ s.gs = grpc.NewServer(withServerUnaryInterceptor(s))
}
// Register all required services
@@ -114,6 +124,42 @@
}
}
+// AttachReadyProbe attaches a readiness probe to the server. While the probe's
+// IsReady() returns false, unary requests are answered with codes.Unavailable.
+func (s *GrpcServer) AttachReadyProbe(p ReadyProbe) {
+ s.probe = p
+}
+
+// withServerUnaryInterceptor wraps the readiness-checking interceptor built by
+// mkServerInterceptor in a grpc.ServerOption suitable for grpc.NewServer.
+func withServerUnaryInterceptor(s *GrpcServer) grpc.ServerOption {
+	return grpc.UnaryInterceptor(mkServerInterceptor(s))
+}
+
+// mkServerInterceptor builds a unary server interceptor bound to s.
+//
+// On every call the interceptor looks at s.probe: when a probe is attached and
+// its IsReady() returns false, the request is logged and rejected with a
+// codes.Unavailable status without invoking the handler. Otherwise (no probe,
+// or probe ready) the handler's result and error are returned unchanged.
+//
+// NOTE(review): the rejection log prints the whole request with %v; confirm
+// that request payloads are safe (and cheap) to log at Warn level.
+func mkServerInterceptor(s *GrpcServer) func(ctx context.Context,
+	req interface{},
+	info *grpc.UnaryServerInfo,
+	handler grpc.UnaryHandler) (interface{}, error) {
+	return func(ctx context.Context, req interface{},
+		info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+		if s.probe == nil || s.probe.IsReady() {
+			// No probe attached, or the probe reports ready: serve normally.
+			return handler(ctx, req)
+		}
+		log.Warnf("Grpc request received while not ready %v", req)
+		return nil, status.Error(codes.Unavailable, "system is not ready")
+	}
+}
+
/*
Stop servicing GRPC requests
*/
diff --git a/pkg/grpc/server_test.go b/pkg/grpc/server_test.go
new file mode 100644
index 0000000..a3afc09
--- /dev/null
+++ b/pkg/grpc/server_test.go
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package grpc
+
+import (
+ "context"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc"
+ "testing"
+)
+
+// MockReadyProbe is a mock probe that reports the value of its Ready field.
+type MockReadyProbe struct {
+	Ready bool
+}
+
+// IsReady returns m.Ready; it is the method the ReadyProbe interface requires.
+func (m *MockReadyProbe) IsReady() bool {
+	return m.Ready
+}
+
+// MockUnaryHandler is a mock handler that returns the request as its result.
+func MockUnaryHandler(ctx context.Context, req interface{}) (interface{}, error) {
+	return req, nil
+}
+
+// TestNewGrpcServer checks that the constructor returns a non-nil server.
+func TestNewGrpcServer(t *testing.T) {
+	assert.NotNil(t, NewGrpcServer("127.0.0.1", 1234, nil, false))
+}
+
+// TestMkServerInterceptorNoProbe verifies that, with no probe attached, the
+// interceptor passes the request to the handler and returns its result.
+func TestMkServerInterceptorNoProbe(t *testing.T) {
+	srv := NewGrpcServer("127.0.0.1", 1234, nil, false)
+	assert.NotNil(t, srv)
+
+	interceptor := mkServerInterceptor(srv)
+	assert.NotNil(t, interceptor)
+
+	request := "SomeRequest"
+	info := &grpc.UnaryServerInfo{Server: nil, FullMethod: "somemethod"}
+
+	// MockUnaryHandler echoes the request, so the echo proves it was invoked.
+	got, err := interceptor(context.Background(), request, info, MockUnaryHandler)
+
+	assert.Nil(t, err)
+	assert.Equal(t, "SomeRequest", got)
+}
+
+// TestMkServerInterceptorReady verifies that, with a probe reporting ready,
+// the interceptor invokes the handler and returns its result unchanged.
+func TestMkServerInterceptorReady(t *testing.T) {
+	server := NewGrpcServer("127.0.0.1", 1234, nil, false)
+	assert.NotNil(t, server)
+
+	f := mkServerInterceptor(server)
+	assert.NotNil(t, f)
+
+	req := "SomeRequest"
+	serverInfo := grpc.UnaryServerInfo{Server: nil, FullMethod: "somemethod"}
+
+	// The probe is attached after the interceptor is built: the interceptor
+	// reads the server's probe at call time, so late attachment is honoured.
+	server.AttachReadyProbe(&MockReadyProbe{Ready: true})
+
+	result, err := f(context.Background(), req, &serverInfo, MockUnaryHandler)
+	assert.Nil(t, err)
+	// MockUnaryHandler echoes its request; require the echo, not just non-nil.
+	assert.Equal(t, req, result)
+}
+
+// TestMkServerInterceptorNotReady verifies that, with a probe reporting not
+// ready, the interceptor rejects the request with an Unavailable status.
+func TestMkServerInterceptorNotReady(t *testing.T) {
+	server := NewGrpcServer("127.0.0.1", 1234, nil, false)
+	assert.NotNil(t, server)
+
+	f := mkServerInterceptor(server)
+	assert.NotNil(t, f)
+
+	req := "SomeRequest"
+	serverInfo := grpc.UnaryServerInfo{Server: nil, FullMethod: "somemethod"}
+	server.AttachReadyProbe(&MockReadyProbe{Ready: false})
+
+	result, err := f(context.Background(), req, &serverInfo, MockUnaryHandler)
+
+	// The status error text carries its code name; pin it to Unavailable.
+	if assert.Error(t, err) {
+		assert.Contains(t, err.Error(), "Unavailable")
+	}
+	assert.Nil(t, result)
+}