VOL-2098 Support for Kafka liveness testing

* Adds a liveness channel to sarama_client and the kafka_interadapter proxy.
True or false is pushed to the liveness channel on each successful or
failed Kafka publish, respectively.

* Adds support to make a "liveness publish attempt", which publishes
an empty message on a _liveness channel.

* Adds ServiceStatusNotReady to Probe

* Suppresses repeated Probe.UpdateStatus calls that report the same status

* Adds the ability to attach a Probe to the grpc server, so that
when the probe returns NotReady, the Server responds to requests
with UNAVAILABLE.

Change-Id: I996c719570a50f2f6f397887d10d489608269c3f
diff --git a/pkg/grpc/common_test.go b/pkg/grpc/common_test.go
new file mode 100644
index 0000000..6ed7b26
--- /dev/null
+++ b/pkg/grpc/common_test.go
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package grpc
+
+import (
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+const (
+	/*
+	 * This sets the LogLevel of the Voltha logger. It's pinned to FatalLevel here, as we
+	 * generally don't want to see logger output, even when running go test in verbose
+	 * mode. Even "Error" level messages are expected to be output by some unit tests.
+	 *
+	 * If you are developing a unit test, and experiencing problems or wish additional
+	 * debugging from Voltha, then changing this constant to log.DebugLevel may be
+	 * useful.
+	 */
+
+	VOLTHA_LOGLEVEL = log.FatalLevel
+)
+
+// Unit test initialization. This init() function configures the default logger
+// for all unit tests in the current directory.
+func init() {
+	// Logger must be configured or bad things happen
+	_, err := log.SetDefaultLogger(log.JSON, VOLTHA_LOGLEVEL, log.Fields{"instanceId": 1})
+	if err != nil {
+		panic(err)
+	}
+}
diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go
index 4c95aa1..488d470 100644
--- a/pkg/grpc/server.go
+++ b/pkg/grpc/server.go
@@ -20,7 +20,9 @@
 	"fmt"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/status"
 	"net"
 )
 
@@ -52,12 +54,19 @@
 	s.server.Start(ctx)
 */
 
+// ReadyProbe is the interface that allows probes to be attached to the server.
+// A probe must support the IsReady() method.
+type ReadyProbe interface {
+	IsReady() bool
+}
+
 type GrpcServer struct {
 	gs       *grpc.Server
 	address  string
 	port     int
 	secure   bool
 	services []func(*grpc.Server)
+	probe    ReadyProbe // optional
 
 	*GrpcSecurity
 }
@@ -97,11 +106,12 @@
 		if err != nil {
 			log.Fatalf("could not load TLS keys: %s", err)
 		}
-		s.gs = grpc.NewServer(grpc.Creds(creds))
+		s.gs = grpc.NewServer(grpc.Creds(creds),
+			withServerUnaryInterceptor(s))
 
 	} else {
 		log.Info("starting-insecure-grpc-server")
-		s.gs = grpc.NewServer()
+		s.gs = grpc.NewServer(withServerUnaryInterceptor(s))
 	}
 
 	// Register all required services
@@ -114,6 +124,42 @@
 	}
 }
 
+// AttachReadyProbe attaches a readiness probe to the server.
+// While the probe's IsReady() returns false, unary requests are answered with UNAVAILABLE.
+func (s *GrpcServer) AttachReadyProbe(p ReadyProbe) {
+	s.probe = p
+}
+
+func withServerUnaryInterceptor(s *GrpcServer) grpc.ServerOption {
+	return grpc.UnaryInterceptor(mkServerInterceptor(s))
+}
+
+// mkServerInterceptor makes a unary server interceptor for the given GrpcServer.
+// The interceptor checks whether there is an attached probe, and if that
+// probe's IsReady() returns false, an UNAVAILABLE response is returned;
+// otherwise the request is passed on to the handler.
+func mkServerInterceptor(s *GrpcServer) func(ctx context.Context,
+	req interface{},
+	info *grpc.UnaryServerInfo,
+	handler grpc.UnaryHandler) (interface{}, error) {
+
+	return func(ctx context.Context,
+		req interface{},
+		info *grpc.UnaryServerInfo,
+		handler grpc.UnaryHandler) (interface{}, error) {
+
+		if (s.probe != nil) && (!s.probe.IsReady()) {
+			log.Warnf("Grpc request received while not ready %v", req)
+			return nil, status.Error(codes.Unavailable, "system is not ready")
+		}
+
+		// Calls the handler
+		h, err := handler(ctx, req)
+
+		return h, err
+	}
+}
+
 /*
 Stop servicing GRPC requests
 */
diff --git a/pkg/grpc/server_test.go b/pkg/grpc/server_test.go
new file mode 100644
index 0000000..a3afc09
--- /dev/null
+++ b/pkg/grpc/server_test.go
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package grpc
+
+import (
+	"context"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc"
+	"testing"
+)
+
+// MockReadyProbe is a mock probe whose IsReady() func returns the Ready member.
+type MockReadyProbe struct {
+	Ready bool
+}
+
+func (m *MockReadyProbe) IsReady() bool {
+	return m.Ready
+}
+
+// MockUnaryHandler is a mock handler that returns the request as its result.
+func MockUnaryHandler(ctx context.Context, req interface{}) (interface{}, error) {
+	_ = ctx
+	return req, nil
+}
+
+func TestNewGrpcServer(t *testing.T) {
+	server := NewGrpcServer("127.0.0.1", 1234, nil, false)
+	assert.NotNil(t, server)
+}
+
+func TestMkServerInterceptorNoProbe(t *testing.T) {
+	server := NewGrpcServer("127.0.0.1", 1234, nil, false)
+	assert.NotNil(t, server)
+
+	f := mkServerInterceptor(server)
+	assert.NotNil(t, f)
+
+	req := "SomeRequest"
+	serverInfo := grpc.UnaryServerInfo{Server: nil, FullMethod: "somemethod"}
+
+	result, err := f(context.Background(),
+		req,
+		&serverInfo,
+		MockUnaryHandler)
+
+	assert.Nil(t, err)
+	assert.Equal(t, "SomeRequest", result)
+}
+
+func TestMkServerInterceptorReady(t *testing.T) {
+	server := NewGrpcServer("127.0.0.1", 1234, nil, false)
+	assert.NotNil(t, server)
+
+	f := mkServerInterceptor(server)
+	assert.NotNil(t, f)
+
+	req := "SomeRequest"
+	serverInfo := grpc.UnaryServerInfo{Server: nil, FullMethod: "somemethod"}
+
+	probe := &MockReadyProbe{Ready: true}
+	server.AttachReadyProbe(probe)
+
+	result, err := f(context.Background(),
+		req,
+		&serverInfo,
+		MockUnaryHandler)
+
+	assert.Nil(t, err)
+	assert.NotNil(t, result)
+}
+
+func TestMkServerInterceptorNotReady(t *testing.T) {
+	server := NewGrpcServer("127.0.0.1", 1234, nil, false)
+	assert.NotNil(t, server)
+
+	f := mkServerInterceptor(server)
+	assert.NotNil(t, f)
+
+	req := "SomeRequest"
+	serverInfo := grpc.UnaryServerInfo{Server: nil, FullMethod: "somemethod"}
+
+	probe := &MockReadyProbe{Ready: false}
+	server.AttachReadyProbe(probe)
+
+	result, err := f(context.Background(),
+		req,
+		&serverInfo,
+		MockUnaryHandler)
+
+	assert.NotNil(t, err)
+	assert.Nil(t, result)
+}