[VOL-4290] Voltha go library updates for gRPC migration
Change-Id: I1aa2774beb6b7ed7419bc45aeb53fcae8a8ecda0
diff --git a/pkg/events/common.go b/pkg/events/common.go
index 9ef5bfe..0f0468e 100644
--- a/pkg/events/common.go
+++ b/pkg/events/common.go
@@ -16,7 +16,7 @@
package events
import (
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/events/eventif/events_proxy_if.go b/pkg/events/eventif/events_proxy_if.go
index 35f821f..e4ebc36 100644
--- a/pkg/events/eventif/events_proxy_if.go
+++ b/pkg/events/eventif/events_proxy_if.go
@@ -18,7 +18,8 @@
import (
"context"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+
+ "github.com/opencord/voltha-protos/v5/go/voltha"
)
// EventProxy interface for eventproxy
@@ -31,6 +32,8 @@
subCategory *EventSubCategory, raisedTs int64) error
EnableLivenessChannel(ctx context.Context, enable bool) chan bool
SendLiveness(ctx context.Context) error
+ Start() error
+ Stop()
}
const (
diff --git a/pkg/events/events_proxy.go b/pkg/events/events_proxy.go
index 89d58a0..e4493f9 100644
--- a/pkg/events/events_proxy.go
+++ b/pkg/events/events_proxy.go
@@ -26,11 +26,11 @@
"sync"
"time"
- "github.com/golang/protobuf/ptypes"
- "github.com/opencord/voltha-lib-go/v6/pkg/events/eventif"
- "github.com/opencord/voltha-lib-go/v6/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
+ "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+ "google.golang.org/protobuf/types/known/timestamppb"
)
// TODO: Make configurable through helm chart
@@ -97,17 +97,8 @@
header.TypeVersion = eventif.EventTypeVersion
// raisedTs is in seconds
- timestamp, err := ptypes.TimestampProto(time.Unix(raisedTs, 0))
- if err != nil {
- return nil, err
- }
- header.RaisedTs = timestamp
-
- timestamp, err = ptypes.TimestampProto(time.Now())
- if err != nil {
- return nil, err
- }
- header.ReportedTs = timestamp
+ header.RaisedTs = timestamppb.New(time.Unix(raisedTs, 0))
+ header.ReportedTs = timestamppb.New(time.Now())
return &header, nil
}
@@ -201,7 +192,7 @@
}
// Start the event proxy
-func (ep *EventProxy) Start() {
+func (ep *EventProxy) Start() error {
eq := ep.eventQueue
go eq.start(ep.queueCtx)
logger.Debugw(context.Background(), "event-proxy-starting...", log.Fields{"events-threashold": EVENT_THRESHOLD})
@@ -235,10 +226,13 @@
"reported-ts": event.Header.ReportedTs, "event-type": event.EventType})
}
}
+ return nil
}
func (ep *EventProxy) Stop() {
- ep.eventQueue.stop()
+ if ep.eventQueue != nil {
+ ep.eventQueue.stop()
+ }
}
type EventQueue struct {
diff --git a/pkg/events/events_proxy_test.go b/pkg/events/events_proxy_test.go
index d968713..70f1e30 100644
--- a/pkg/events/events_proxy_test.go
+++ b/pkg/events/events_proxy_test.go
@@ -18,16 +18,16 @@
import (
"context"
- "github.com/golang/protobuf/ptypes"
- "github.com/opencord/voltha-lib-go/v6/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
- mock_kafka "github.com/opencord/voltha-lib-go/v6/pkg/mocks/kafka"
- "github.com/opencord/voltha-protos/v4/go/common"
- "github.com/opencord/voltha-protos/v4/go/voltha"
- "github.com/stretchr/testify/assert"
"strconv"
"testing"
"time"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ mock_kafka "github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+ "github.com/stretchr/testify/assert"
)
const waitForKafkaEventsTimeout = 20 * time.Second
@@ -70,14 +70,11 @@
for {
select {
case msg := <-kafkaChnl:
- if msg.Body != nil {
- event := voltha.Event{}
- if err := ptypes.UnmarshalAny(msg.Body, &event); err == nil {
- count += 1
- if count == numEvents {
- resp <- "ok"
- break loop
- }
+ if _, ok := msg.(*voltha.Event); ok {
+ count += 1
+ if count == numEvents {
+ resp <- "ok"
+ break loop
}
}
case <-timer.C:
@@ -87,10 +84,10 @@
}
}
-func createAndSendEvent(proxy *EventProxy, ID string) error {
+func createAndSendEvent(proxy *EventProxy, id string) error {
eventMsg := &voltha.RPCEvent{
Rpc: "dummy",
- OperationId: ID,
+ OperationId: id,
ResourceId: "dummy",
Service: "dummy",
StackId: "dummy",
@@ -121,7 +118,11 @@
// Init Event Proxy
ep := NewEventProxy(MsgClient(cTkc), MsgTopic(topic))
- go ep.Start()
+ go func() {
+ if err := ep.Start(); err != nil {
+ logger.Fatalw(context.Background(), "failure-starting-event-proxy", log.Fields{"error": err})
+ }
+ }()
time.Sleep(1 * time.Millisecond)
for i := 0; i < numEvents; i++ {
go func(ID int) {
@@ -147,7 +148,11 @@
resp := make(chan string)
// Init Event Proxy
ep := NewEventProxy(MsgClient(cTkc), MsgTopic(topic))
- go ep.Start()
+ go func() {
+ if err := ep.Start(); err != nil {
+ logger.Fatalw(context.Background(), "failure-starting-event-proxy", log.Fields{"error": err})
+ }
+ }()
time.Sleep(1 * time.Millisecond)
for i := 0; i < numEvents; i++ {
go func(ID int) {
@@ -171,7 +176,11 @@
resp := make(chan string)
// Init Event Proxy
ep := NewEventProxy(MsgClient(cTkc), MsgTopic(topic))
- go ep.Start()
+ go func() {
+ if err := ep.Start(); err != nil {
+ logger.Fatalw(context.Background(), "failure-starting-event-proxy", log.Fields{"error": err})
+ }
+ }()
time.Sleep(1 * time.Millisecond)
go ep.Stop()
go waitForEventProxyToStop(ep, resp)
diff --git a/pkg/events/utils.go b/pkg/events/utils.go
index fe3a017..4598161 100644
--- a/pkg/events/utils.go
+++ b/pkg/events/utils.go
@@ -19,7 +19,8 @@
"fmt"
"strconv"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
)
type ContextType string
@@ -64,8 +65,8 @@
//CreateDeviceStateChangeEvent forms and returns a new DeviceStateChange Event
func CreateDeviceStateChangeEvent(serialNumber string, deviceID string, parentID string,
- prevOperStatus voltha.OperStatus_Types, prevConnStatus voltha.ConnectStatus_Types, prevAdminStatus voltha.AdminState_Types,
- operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types, adminStatus voltha.AdminState_Types,
+ prevOperStatus common.OperStatus_Types, prevConnStatus common.ConnectStatus_Types, prevAdminStatus common.AdminState_Types,
+ operStatus common.OperStatus_Types, connStatus common.ConnectStatus_Types, adminStatus common.AdminState_Types,
parentPort uint32, isRoot bool) *voltha.DeviceEvent {
context := make(map[string]string)