VOL-2098 Support for Kafka liveness testing

* Adds a liveness channel to sarama_client and the kafka_interadapter
proxy. A true or false value is pushed to the liveness channel on each
successful or failed Kafka publish, respectively.

* Adds support to make a "liveness publish attempt", which publishes
an empty message on a _liveness channel.

* Adds ServiceStatusNotReady to Probe

* Suppresses repeated Probe.UpdateStatus calls with an unchanged status

* Adds the ability to attach a Probe to the grpc server, so that
when the probe returns NotReady, the Server responds to requests
with UNAVAILABLE.

Change-Id: I996c719570a50f2f6f397887d10d489608269c3f
diff --git a/pkg/probe/probe.go b/pkg/probe/probe.go
index 9823566..7e6dbf9 100644
--- a/pkg/probe/probe.go
+++ b/pkg/probe/probe.go
@@ -47,6 +47,9 @@
 
 	// ServiceStatusFailed service has stopped because of an error
 	ServiceStatusFailed
+
+	// ServiceStatusNotReady service has started but is unable to accept requests
+	ServiceStatusNotReady
 )
 
 const (
@@ -71,6 +74,8 @@
 		return "Stopped"
 	case ServiceStatusFailed:
 		return "Failed"
+	case ServiceStatusNotReady:
+		return "NotReady"
 	}
 }
 
@@ -137,6 +142,13 @@
 	if p.status == nil {
 		p.status = make(map[string]ServiceStatus)
 	}
+
+	// if status hasn't changed, avoid doing useless work
+	existingStatus, ok := p.status[name]
+	if ok && (existingStatus == status) {
+		return
+	}
+
 	p.status[name] = status
 	if p.readyFunc != nil {
 		p.isReady = p.readyFunc(p.status)
@@ -158,17 +170,41 @@
 		})
 }
 
+// GetStatus returns the status recorded for the named service, or
+// ServiceStatusUnknown when no status is recorded under that name.
+func (p *Probe) GetStatus(name string) ServiceStatus {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	// The status map is allocated on first use, even by this getter.
+	if p.status == nil {
+		p.status = make(map[string]ServiceStatus)
+	}
+	if recorded, found := p.status[name]; found {
+		return recorded
+	}
+	return ServiceStatusUnknown
+}
+
+// GetProbeFromContext returns the *Probe stored in ctx under ProbeContextKey,
+// or nil when ctx is nil or holds no *Probe under that key.
+func GetProbeFromContext(ctx context.Context) *Probe {
+	if ctx == nil {
+		return nil
+	}
+	// A failed assertion (missing value or another type) leaves probe nil.
+	probe, _ := ctx.Value(ProbeContextKey).(*Probe)
+	return probe
+}
+
 // UpdateStatusFromContext a convenience function to pull the Probe reference from the
 // Context, if it exists, and then calling UpdateStatus on that Probe reference. If Context
 // is nil or if a Probe reference is not associated with the ProbeContextKey then nothing
 // happens
 func UpdateStatusFromContext(ctx context.Context, name string, status ServiceStatus) {
-	if ctx != nil {
-		if value := ctx.Value(ProbeContextKey); value != nil {
-			if p, ok := value.(*Probe); ok {
-				p.UpdateStatus(name, status)
-			}
-		}
+	// GetProbeFromContext yields nil when ctx is nil or carries no *Probe.
+	if probe := GetProbeFromContext(ctx); probe != nil {
+		probe.UpdateStatus(name, status)
 	}
 }
 
@@ -225,6 +261,10 @@
 	log.Fatal(s.ListenAndServe())
 }
 
+func (p *Probe) IsReady() bool {
+	return p.isReady
+}
+
 // defaultReadyFunc if all services are running then ready, else not
 func defaultReadyFunc(services map[string]ServiceStatus) bool {
 	if len(services) == 0 {
diff --git a/pkg/probe/probe_test.go b/pkg/probe/probe_test.go
index 2a797d6..537bf7d 100644
--- a/pkg/probe/probe_test.go
+++ b/pkg/probe/probe_test.go
@@ -37,6 +37,7 @@
 	assert.Equal(t, "Running", ServiceStatusRunning.String(), "ServiceStatusRunning")
 	assert.Equal(t, "Stopped", ServiceStatusStopped.String(), "ServiceStatusStopped")
 	assert.Equal(t, "Failed", ServiceStatusFailed.String(), "ServiceStatusFailed")
+	assert.Equal(t, "NotReady", ServiceStatusNotReady.String(), "ServiceStatusNotReady")
 }
 
 func AlwaysTrue(map[string]ServiceStatus) bool {
@@ -333,6 +334,20 @@
 	assert.Nil(t, p.healthFunc, "health func not reset to nil")
 }
 
+// TestGetProbeFromContext checks that a Probe stored in a Context is returned.
+func TestGetProbeFromContext(t *testing.T) {
+	want := &Probe{}
+	want.RegisterService("one")
+	got := GetProbeFromContext(context.WithValue(context.Background(), ProbeContextKey, want))
+	assert.Equal(t, want, got, "Probe from context was not identical to original probe")
+}
+
+// TestGetProbeFromContextMssing checks that a Context without a Probe yields nil.
+func TestGetProbeFromContextMssing(t *testing.T) {
+	got := GetProbeFromContext(context.Background())
+	assert.Nil(t, got, "Context had a non-nil probe when it should have been nil")
+}
+
 func TestUpdateStatusFromContext(t *testing.T) {
 	p := &Probe{}
 	p.RegisterService("one")
@@ -343,7 +358,6 @@
 	_, ok := p.status["one"]
 	assert.True(t, ok, "unable to find registered service")
 	assert.Equal(t, ServiceStatusRunning, p.status["one"], "status not set correctly from context")
-
 }
 
 func TestUpdateStatusFromNilContext(t *testing.T) {
@@ -392,3 +406,38 @@
 	assert.True(t, ok, "unable to find registered service")
 	assert.Equal(t, ServiceStatusRunning, p.status["one"], "status not set correctly from context")
 }
+
+// TestIsReadyTrue checks that IsReady reports true once a service is
+// registered on a Probe whose ready func is AlwaysTrue.
+func TestIsReadyTrue(t *testing.T) {
+	probe := (&Probe{}).WithReadyFunc(AlwaysTrue).WithHealthFunc(AlwaysFalse)
+	probe.RegisterService("SomeService")
+	assert.True(t, probe.IsReady(), "IsReady should have been true")
+}
+
+// TestIsReadyFalse checks that IsReady reports false once a service is
+// registered on a Probe whose ready func is AlwaysFalse.
+func TestIsReadyFalse(t *testing.T) {
+	probe := (&Probe{}).WithReadyFunc(AlwaysFalse).WithHealthFunc(AlwaysFalse)
+	probe.RegisterService("SomeService")
+	assert.False(t, probe.IsReady(), "IsReady should have been false")
+}
+
+// TestGetStatus checks that GetStatus returns the status set through
+// UpdateStatus for a registered service.
+func TestGetStatus(t *testing.T) {
+	probe := &Probe{}
+	probe.RegisterService("one", "two")
+	probe.UpdateStatus("one", ServiceStatusRunning)
+
+	assert.Equal(t, ServiceStatusRunning, probe.GetStatus("one"), "Service status should have been ServiceStatusRunning")
+}
+
+// TestGetStatusMissingService checks that GetStatus returns
+// ServiceStatusUnknown for a name that was never registered.
+func TestGetStatusMissingService(t *testing.T) {
+	probe := &Probe{}
+	probe.RegisterService("one", "two")
+
+	assert.Equal(t, ServiceStatusUnknown, probe.GetStatus("three"), "Service status should have been ServiceStatusUnknown")
+}