[VOL-4290] VOLTHA Go library updates for gRPC migration
Change-Id: I1aa2774beb6b7ed7419bc45aeb53fcae8a8ecda0
diff --git a/pkg/events/events_proxy_test.go b/pkg/events/events_proxy_test.go
index d968713..70f1e30 100644
--- a/pkg/events/events_proxy_test.go
+++ b/pkg/events/events_proxy_test.go
@@ -18,16 +18,16 @@
import (
"context"
- "github.com/golang/protobuf/ptypes"
- "github.com/opencord/voltha-lib-go/v6/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
- mock_kafka "github.com/opencord/voltha-lib-go/v6/pkg/mocks/kafka"
- "github.com/opencord/voltha-protos/v4/go/common"
- "github.com/opencord/voltha-protos/v4/go/voltha"
- "github.com/stretchr/testify/assert"
"strconv"
"testing"
"time"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ mock_kafka "github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+ "github.com/stretchr/testify/assert"
)
const waitForKafkaEventsTimeout = 20 * time.Second
@@ -70,14 +70,11 @@
for {
select {
case msg := <-kafkaChnl:
- if msg.Body != nil {
- event := voltha.Event{}
- if err := ptypes.UnmarshalAny(msg.Body, &event); err == nil {
- count += 1
- if count == numEvents {
- resp <- "ok"
- break loop
- }
+ if _, ok := msg.(*voltha.Event); ok {
+ count += 1
+ if count == numEvents {
+ resp <- "ok"
+ break loop
}
}
case <-timer.C:
@@ -87,10 +84,10 @@
}
}
-func createAndSendEvent(proxy *EventProxy, ID string) error {
+func createAndSendEvent(proxy *EventProxy, id string) error {
eventMsg := &voltha.RPCEvent{
Rpc: "dummy",
- OperationId: ID,
+ OperationId: id,
ResourceId: "dummy",
Service: "dummy",
StackId: "dummy",
@@ -121,7 +118,11 @@
// Init Event Proxy
ep := NewEventProxy(MsgClient(cTkc), MsgTopic(topic))
- go ep.Start()
+ go func() {
+ if err := ep.Start(); err != nil {
+ logger.Fatalw(context.Background(), "failure-starting-event-proxy", log.Fields{"error": err})
+ }
+ }()
time.Sleep(1 * time.Millisecond)
for i := 0; i < numEvents; i++ {
go func(ID int) {
@@ -147,7 +148,11 @@
resp := make(chan string)
// Init Event Proxy
ep := NewEventProxy(MsgClient(cTkc), MsgTopic(topic))
- go ep.Start()
+ go func() {
+ if err := ep.Start(); err != nil {
+ logger.Fatalw(context.Background(), "failure-starting-event-proxy", log.Fields{"error": err})
+ }
+ }()
time.Sleep(1 * time.Millisecond)
for i := 0; i < numEvents; i++ {
go func(ID int) {
@@ -171,7 +176,11 @@
resp := make(chan string)
// Init Event Proxy
ep := NewEventProxy(MsgClient(cTkc), MsgTopic(topic))
- go ep.Start()
+ go func() {
+ if err := ep.Start(); err != nil {
+ logger.Fatalw(context.Background(), "failure-starting-event-proxy", log.Fields{"error": err})
+ }
+ }()
time.Sleep(1 * time.Millisecond)
go ep.Stop()
go waitForEventProxyToStop(ep, resp)