VOL-2098 Support for Kafka liveness testing
* Adds liveness channel to sarama_client and kafka_interadapter proxy.
The liveness channel will push true or false to the channel on
each successful or failed Kafka publish.
* Adds support to make a "liveness publish attempt", which publishes
an empty message on a _liveness channel.
* Adds ServiceStatusNotReady to Probe
* Suppresses multiple Probe.UpdateStatus of the same status
* Adds the ability to attach a Probe to the grpc server, so that
when the probe returns NotReady, the Server responds to requests
with UNAVAILABLE.
Change-Id: I996c719570a50f2f6f397887d10d489608269c3f
diff --git a/pkg/grpc/common_test.go b/pkg/grpc/common_test.go
new file mode 100644
index 0000000..6ed7b26
--- /dev/null
+++ b/pkg/grpc/common_test.go
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package grpc
+
+import (
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+const (
+ /*
+ * This sets the LogLevel of the Voltha logger. It's pinned to FatalLevel here, as we
+ * generally don't want to see logger output, even when running go test in verbose
+ * mode. Even "Error" level messages are expected to be output by some unit tests.
+ *
+ * If you are developing a unit test, and experiencing problems or wish additional
+ * debugging from Voltha, then changing this constant to log.DebugLevel may be
+ * useful.
+ */
+
+ VOLTHA_LOGLEVEL = log.FatalLevel
+)
+
+// Unit test initialization. This init() function handles all unit tests in
+// the current directory.
+func init() {
+ // Logger must be configured or bad things happen
+ _, err := log.SetDefaultLogger(log.JSON, VOLTHA_LOGLEVEL, log.Fields{"instanceId": 1})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go
index 4c95aa1..488d470 100644
--- a/pkg/grpc/server.go
+++ b/pkg/grpc/server.go
@@ -20,7 +20,9 @@
"fmt"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/status"
"net"
)
@@ -52,12 +54,19 @@
s.server.Start(ctx)
*/
+// Interface allows probes to be attached to server
+// A probe must support the IsReady() method
+type ReadyProbe interface {
+ IsReady() bool
+}
+
type GrpcServer struct {
gs *grpc.Server
address string
port int
secure bool
services []func(*grpc.Server)
+ probe ReadyProbe // optional
*GrpcSecurity
}
@@ -97,11 +106,12 @@
if err != nil {
log.Fatalf("could not load TLS keys: %s", err)
}
- s.gs = grpc.NewServer(grpc.Creds(creds))
+ s.gs = grpc.NewServer(grpc.Creds(creds),
+ withServerUnaryInterceptor(s))
} else {
log.Info("starting-insecure-grpc-server")
- s.gs = grpc.NewServer()
+ s.gs = grpc.NewServer(withServerUnaryInterceptor(s))
}
// Register all required services
@@ -114,6 +124,42 @@
}
}
+// Attach a readiness probe to the server.
+// If the probe returns NotReady, the server will return UNAVAILABLE
+func (s *GrpcServer) AttachReadyProbe(p ReadyProbe) {
+ s.probe = p
+}
+
+func withServerUnaryInterceptor(s *GrpcServer) grpc.ServerOption {
+ return grpc.UnaryInterceptor(mkServerInterceptor(s))
+}
+
+// Make a serverInterceptor for the given GrpcServer
+// This interceptor will check whether there is an attached probe,
+// and if that probe indicates NotReady, then an UNAVAILABLE
+// response will be returned.
+func mkServerInterceptor(s *GrpcServer) func(ctx context.Context,
+ req interface{},
+ info *grpc.UnaryServerInfo,
+ handler grpc.UnaryHandler) (interface{}, error) {
+
+ return func(ctx context.Context,
+ req interface{},
+ info *grpc.UnaryServerInfo,
+ handler grpc.UnaryHandler) (interface{}, error) {
+
+ if (s.probe != nil) && (!s.probe.IsReady()) {
+ log.Warnf("Grpc request received while not ready %v", req)
+ return nil, status.Error(codes.Unavailable, "system is not ready")
+ }
+
+ // Calls the handler
+ h, err := handler(ctx, req)
+
+ return h, err
+ }
+}
+
/*
Stop servicing GRPC requests
*/
diff --git a/pkg/grpc/server_test.go b/pkg/grpc/server_test.go
new file mode 100644
index 0000000..a3afc09
--- /dev/null
+++ b/pkg/grpc/server_test.go
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package grpc
+
+import (
+ "context"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc"
+ "testing"
+)
+
+// A Mock Probe that returns the Ready member using the IsReady() func
+type MockReadyProbe struct {
+ Ready bool
+}
+
+func (m *MockReadyProbe) IsReady() bool {
+ return m.Ready
+}
+
+// A Mock handler that returns the request as its result
+func MockUnaryHandler(ctx context.Context, req interface{}) (interface{}, error) {
+ _ = ctx
+ return req, nil
+}
+
+func TestNewGrpcServer(t *testing.T) {
+ server := NewGrpcServer("127.0.0.1", 1234, nil, false)
+ assert.NotNil(t, server)
+}
+
+func TestMkServerInterceptorNoProbe(t *testing.T) {
+ server := NewGrpcServer("127.0.0.1", 1234, nil, false)
+ assert.NotNil(t, server)
+
+ f := mkServerInterceptor(server)
+ assert.NotNil(t, f)
+
+ req := "SomeRequest"
+ serverInfo := grpc.UnaryServerInfo{Server: nil, FullMethod: "somemethod"}
+
+ result, err := f(context.Background(),
+ req,
+ &serverInfo,
+ MockUnaryHandler)
+
+ assert.Nil(t, err)
+ assert.Equal(t, "SomeRequest", result)
+}
+
+func TestMkServerInterceptorReady(t *testing.T) {
+ server := NewGrpcServer("127.0.0.1", 1234, nil, false)
+ assert.NotNil(t, server)
+
+ f := mkServerInterceptor(server)
+ assert.NotNil(t, f)
+
+ req := "SomeRequest"
+ serverInfo := grpc.UnaryServerInfo{Server: nil, FullMethod: "somemethod"}
+
+ probe := &MockReadyProbe{Ready: true}
+ server.AttachReadyProbe(probe)
+
+ result, err := f(context.Background(),
+ req,
+ &serverInfo,
+ MockUnaryHandler)
+
+ assert.Nil(t, err)
+ assert.NotNil(t, result)
+}
+
+func TestMkServerInterceptorNotReady(t *testing.T) {
+ server := NewGrpcServer("127.0.0.1", 1234, nil, false)
+ assert.NotNil(t, server)
+
+ f := mkServerInterceptor(server)
+ assert.NotNil(t, f)
+
+ req := "SomeRequest"
+ serverInfo := grpc.UnaryServerInfo{Server: nil, FullMethod: "somemethod"}
+
+ probe := &MockReadyProbe{Ready: false}
+ server.AttachReadyProbe(probe)
+
+ result, err := f(context.Background(),
+ req,
+ &serverInfo,
+ MockUnaryHandler)
+
+ assert.NotNil(t, err)
+ assert.Nil(t, result)
+}
diff --git a/pkg/kafka/client.go b/pkg/kafka/client.go
index 007aa74..488bf9f 100755
--- a/pkg/kafka/client.go
+++ b/pkg/kafka/client.go
@@ -55,6 +55,7 @@
DefaultNumberReplicas = 1
DefaultAutoCreateTopic = false
DefaultMetadataMaxRetry = 3
+ DefaultLivenessChannelInterval = time.Second * 30
)
// MsgClient represents the set of APIs a Kafka MsgClient must implement
@@ -66,4 +67,6 @@
Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
Send(msg interface{}, topic *Topic, keys ...string) error
+ SendLiveness() error
+ EnableLivenessChannel(enable bool) chan bool
}
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index c576bc6..3326191 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -764,6 +764,14 @@
return nil
}
+func (kp *InterContainerProxy) EnableLivenessChannel(enable bool) chan bool {
+ return kp.kafkaClient.EnableLivenessChannel(enable)
+}
+
+func (kp *InterContainerProxy) SendLiveness() error {
+ return kp.kafkaClient.SendLiveness()
+}
+
//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
//or an error on failure
func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
diff --git a/pkg/kafka/kafka_inter_container_library_test.go b/pkg/kafka/kafka_inter_container_library_test.go
index 790425e..c3eace7 100644
--- a/pkg/kafka/kafka_inter_container_library_test.go
+++ b/pkg/kafka/kafka_inter_container_library_test.go
@@ -81,3 +81,32 @@
assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
assert.Equal(t, actualResult.DefaultTopic.Name, "Adapter")
}
+
+func TestKafkaProxyEnableLivenessChannel(t *testing.T) {
+ var m *myInterface
+
+ // Note: This doesn't actually start the client
+ client := NewSaramaClient()
+
+ probe, err := NewInterContainerProxy(
+ InterContainerHost("10.20.30.40"),
+ InterContainerPort(1020),
+ DefaultTopic(&Topic{Name: "Adapter"}),
+ RequestHandlerInterface(m),
+ MsgClient(client),
+ )
+
+ assert.Nil(t, err)
+
+ ch := probe.EnableLivenessChannel(true)
+
+ // The channel should have one "true" message on it
+ assert.NotEmpty(t, ch)
+
+ select {
+ case stuff := <-ch:
+ assert.True(t, stuff)
+ default:
+ t.Error("Failed to read from the channel")
+ }
+}
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index fc75026..a251c56 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -74,6 +74,11 @@
topicLockMap map[string]*sync.RWMutex
lockOfTopicLockMap sync.RWMutex
metadataMaxRetry int
+ alive bool
+ liveness chan bool
+ livenessChannelInterval time.Duration
+ lastLivenessTime time.Time
+ started bool
}
type SaramaClientOption func(*SaramaClient)
@@ -186,6 +191,12 @@
}
}
+func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.livenessChannelInterval = opt
+ }
+}
+
func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
client := &SaramaClient{
KafkaHost: DefaultKafkaHost,
@@ -205,6 +216,7 @@
client.numReplicas = DefaultNumberReplicas
client.autoCreateTopic = DefaultAutoCreateTopic
client.metadataMaxRetry = DefaultMetadataMaxRetry
+ client.livenessChannelInterval = DefaultLivenessChannelInterval
for _, option := range opts {
option(client)
@@ -216,6 +228,10 @@
client.topicLockMap = make(map[string]*sync.RWMutex)
client.lockOfTopicLockMap = sync.RWMutex{}
client.lockOfGroupConsumers = sync.RWMutex{}
+
+ // alive until proven otherwise
+ client.alive = true
+
return client
}
@@ -259,12 +275,16 @@
log.Info("kafka-sarama-client-started")
+ sc.started = true
+
return nil
}
func (sc *SaramaClient) Stop() {
log.Info("stopping-sarama-client")
+ sc.started = false
+
//Send a message over the done channel to close all long running routines
sc.doneCh <- 1
@@ -438,6 +458,30 @@
return err
}
+func (sc *SaramaClient) updateLiveness(alive bool) {
+ // Post a consistent stream of liveness data to the channel,
+ // so that in a live state, the core does not timeout and
+ // send a forced liveness message. Production of liveness
+ // events to the channel is rate-limited by livenessChannelInterval.
+ if sc.liveness != nil {
+ if sc.alive != alive {
+ log.Info("update-liveness-channel-because-change")
+ sc.liveness <- alive
+ sc.lastLivenessTime = time.Now()
+ } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
+ log.Info("update-liveness-channel-because-interval")
+ sc.liveness <- alive
+ sc.lastLivenessTime = time.Now()
+ }
+ }
+
+ // Only emit a log message when the state changes
+ if sc.alive != alive {
+ log.Info("set-client-alive", log.Fields{"alive": alive})
+ sc.alive = alive
+ }
+}
+
// send formats and sends the request onto the kafka messaging bus.
func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
@@ -474,8 +518,68 @@
select {
case ok := <-sc.producer.Successes():
log.Debugw("message-sent", log.Fields{"status": ok.Topic})
+ sc.updateLiveness(true)
case notOk := <-sc.producer.Errors():
log.Debugw("error-sending", log.Fields{"status": notOk})
+ if strings.Contains(notOk.Error(), "Failed to produce") {
+ sc.updateLiveness(false)
+ }
+ return notOk
+ }
+ return nil
+}
+
+// Enable the liveness monitor channel. This channel will report
+// a "true" or "false" on every publish, which indicates whether
+// or not the channel is still live. This channel is then picked up
+// by the service (i.e. rw_core / ro_core) to update readiness status
+// and/or take other actions.
+func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
+ log.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
+ if enable {
+ if sc.liveness == nil {
+ log.Info("kafka-create-liveness-channel")
+ // At least 1, so we can immediately post to it without blocking
+ // Setting a bigger number (10) allows the monitor to fall behind
+ // without blocking others. The monitor shouldn't really fall
+ // behind...
+ sc.liveness = make(chan bool, 10)
+ // post intial state to the channel
+ sc.liveness <- sc.alive
+ }
+ } else {
+ // TODO: Think about whether we need the ability to turn off
+ // liveness monitoring
+ panic("Turning off liveness reporting is not supported")
+ }
+ return sc.liveness
+}
+
+// send an empty message on the liveness channel to check whether connectivity has
+// been restored.
+func (sc *SaramaClient) SendLiveness() error {
+ if !sc.started {
+ return fmt.Errorf("SendLiveness() called while not started")
+ }
+
+ kafkaMsg := &sarama.ProducerMessage{
+ Topic: "_liveness_test",
+ Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
+ }
+
+ // Send message to kafka
+ sc.producer.Input() <- kafkaMsg
+ // Wait for result
+ // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
+ select {
+ case ok := <-sc.producer.Successes():
+ log.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
+ sc.updateLiveness(true)
+ case notOk := <-sc.producer.Errors():
+ log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
+ if strings.Contains(notOk.Error(), "Failed to produce") {
+ sc.updateLiveness(false)
+ }
return notOk
}
return nil
@@ -713,7 +817,8 @@
config := scc.NewConfig()
config.ClientID = uuid.New().String()
config.Group.Mode = scc.ConsumerModeMultiplex
- //config.Consumer.Return.Errors = true
+ config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
+ config.Consumer.Return.Errors = true
//config.Group.Return.Notifications = false
//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
@@ -791,16 +896,20 @@
select {
case err, ok := <-consumer.Errors():
if ok {
+ sc.updateLiveness(false)
log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
} else {
+ log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
// channel is closed
break startloop
}
case msg, ok := <-consumer.Messages():
if !ok {
+ log.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
// Channel closed
break startloop
}
+ sc.updateLiveness(true)
log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
msgBody := msg.Value
icm := &ic.InterContainerMessage{}
diff --git a/pkg/kafka/sarama_client_test.go b/pkg/kafka/sarama_client_test.go
new file mode 100644
index 0000000..6dd9fd8
--- /dev/null
+++ b/pkg/kafka/sarama_client_test.go
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestSaramaClientEnableLivenessChannel(t *testing.T) {
+ // Note: This doesn't actually start the client
+ client := NewSaramaClient()
+
+ ch := client.EnableLivenessChannel(true)
+
+ // The channel should have one "true" message on it
+ assert.NotEmpty(t, ch)
+
+ select {
+ case stuff := <-ch:
+ assert.True(t, stuff)
+ default:
+ t.Error("Failed to read from the channel")
+ }
+}
diff --git a/pkg/probe/probe.go b/pkg/probe/probe.go
index 9823566..7e6dbf9 100644
--- a/pkg/probe/probe.go
+++ b/pkg/probe/probe.go
@@ -47,6 +47,9 @@
// ServiceStatusFailed service has stopped because of an error
ServiceStatusFailed
+
+ // ServiceStatusNotReady service has started but is unable to accept requests
+ ServiceStatusNotReady
)
const (
@@ -71,6 +74,8 @@
return "Stopped"
case ServiceStatusFailed:
return "Failed"
+ case ServiceStatusNotReady:
+ return "NotReady"
}
}
@@ -137,6 +142,13 @@
if p.status == nil {
p.status = make(map[string]ServiceStatus)
}
+
+ // if status hasn't changed, avoid doing useless work
+ existingStatus, ok := p.status[name]
+ if ok && (existingStatus == status) {
+ return
+ }
+
p.status[name] = status
if p.readyFunc != nil {
p.isReady = p.readyFunc(p.status)
@@ -158,17 +170,41 @@
})
}
+func (p *Probe) GetStatus(name string) ServiceStatus {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ if p.status == nil {
+ p.status = make(map[string]ServiceStatus)
+ }
+
+ currentStatus, ok := p.status[name]
+ if ok {
+ return currentStatus
+ }
+
+ return ServiceStatusUnknown
+}
+
+func GetProbeFromContext(ctx context.Context) *Probe {
+ if ctx != nil {
+ if value := ctx.Value(ProbeContextKey); value != nil {
+ if p, ok := value.(*Probe); ok {
+ return p
+ }
+ }
+ }
+ return nil
+}
+
// UpdateStatusFromContext a convenience function to pull the Probe reference from the
// Context, if it exists, and then calling UpdateStatus on that Probe reference. If Context
// is nil or if a Probe reference is not associated with the ProbeContextKey then nothing
// happens
func UpdateStatusFromContext(ctx context.Context, name string, status ServiceStatus) {
- if ctx != nil {
- if value := ctx.Value(ProbeContextKey); value != nil {
- if p, ok := value.(*Probe); ok {
- p.UpdateStatus(name, status)
- }
- }
+ p := GetProbeFromContext(ctx)
+ if p != nil {
+ p.UpdateStatus(name, status)
}
}
@@ -225,6 +261,10 @@
log.Fatal(s.ListenAndServe())
}
+func (p *Probe) IsReady() bool {
+ return p.isReady
+}
+
// defaultReadyFunc if all services are running then ready, else not
func defaultReadyFunc(services map[string]ServiceStatus) bool {
if len(services) == 0 {
diff --git a/pkg/probe/probe_test.go b/pkg/probe/probe_test.go
index 2a797d6..537bf7d 100644
--- a/pkg/probe/probe_test.go
+++ b/pkg/probe/probe_test.go
@@ -37,6 +37,7 @@
assert.Equal(t, "Running", ServiceStatusRunning.String(), "ServiceStatusRunning")
assert.Equal(t, "Stopped", ServiceStatusStopped.String(), "ServiceStatusStopped")
assert.Equal(t, "Failed", ServiceStatusFailed.String(), "ServiceStatusFailed")
+ assert.Equal(t, "NotReady", ServiceStatusNotReady.String(), "ServiceStatusNotReady")
}
func AlwaysTrue(map[string]ServiceStatus) bool {
@@ -333,6 +334,20 @@
assert.Nil(t, p.healthFunc, "health func not reset to nil")
}
+func TestGetProbeFromContext(t *testing.T) {
+ p := &Probe{}
+ p.RegisterService("one")
+ ctx := context.WithValue(context.Background(), ProbeContextKey, p)
+ pc := GetProbeFromContext(ctx)
+ assert.Equal(t, p, pc, "Probe from context was not identical to original probe")
+}
+
+func TestGetProbeFromContextMssing(t *testing.T) {
+ ctx := context.Background()
+ pc := GetProbeFromContext(ctx)
+ assert.Nil(t, pc, "Context had a non-nil probe when it should have been nil")
+}
+
func TestUpdateStatusFromContext(t *testing.T) {
p := &Probe{}
p.RegisterService("one")
@@ -343,7 +358,6 @@
_, ok := p.status["one"]
assert.True(t, ok, "unable to find registered service")
assert.Equal(t, ServiceStatusRunning, p.status["one"], "status not set correctly from context")
-
}
func TestUpdateStatusFromNilContext(t *testing.T) {
@@ -392,3 +406,38 @@
assert.True(t, ok, "unable to find registered service")
assert.Equal(t, ServiceStatusRunning, p.status["one"], "status not set correctly from context")
}
+
+func TestIsReadyTrue(t *testing.T) {
+ p := (&Probe{}).WithReadyFunc(AlwaysTrue).WithHealthFunc(AlwaysFalse)
+
+ p.RegisterService("SomeService")
+
+ assert.True(t, p.IsReady(), "IsReady should have been true")
+}
+
+func TestIsReadyFalse(t *testing.T) {
+ p := (&Probe{}).WithReadyFunc(AlwaysFalse).WithHealthFunc(AlwaysFalse)
+
+ p.RegisterService("SomeService")
+
+ assert.False(t, p.IsReady(), "IsReady should have been false")
+}
+
+func TestGetStatus(t *testing.T) {
+ p := &Probe{}
+
+ p.RegisterService("one", "two")
+ p.UpdateStatus("one", ServiceStatusRunning)
+
+ ss := p.GetStatus("one")
+ assert.Equal(t, ServiceStatusRunning, ss, "Service status should have been ServiceStatusRunning")
+}
+
+func TestGetStatusMissingService(t *testing.T) {
+ p := &Probe{}
+
+ p.RegisterService("one", "two")
+
+ ss := p.GetStatus("three")
+ assert.Equal(t, ServiceStatusUnknown, ss, "Service status should have been ServiceStatusUnknown")
+}