VOL-2861 Replace int64 Kafka inter-adapter timestamps with the protobuf Timestamp type
Change-Id: Ib7de52ce7a663b4817e383f03f34e4398d6c980d
diff --git a/pkg/kafka/client.go b/pkg/kafka/client.go
index 9abad93..0919a0c 100755
--- a/pkg/kafka/client.go
+++ b/pkg/kafka/client.go
@@ -66,7 +66,7 @@
DeleteTopic(topic *Topic) error
Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
- SubscribeForMetadata(func(fromTopic string, timestamp int64))
+ SubscribeForMetadata(func(fromTopic string, timestamp time.Time))
Send(msg interface{}, topic *Topic, keys ...string) error
SendLiveness() error
EnableLivenessChannel(enable bool) chan bool
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index fc2334d..beda537 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -221,7 +221,7 @@
Type: ic.MessageType_DEVICE_DISCOVERED,
FromTopic: kp.defaultTopic.Name,
ToTopic: kp.deviceDiscoveryTopic.Name,
- Timestamp: time.Now().UnixNano(),
+ Timestamp: ptypes.TimestampNow(),
}
body := &ic.DeviceDiscovered{
Id: deviceId,
@@ -664,7 +664,7 @@
Type: ic.MessageType_RESPONSE,
FromTopic: request.Header.ToTopic,
ToTopic: request.Header.FromTopic,
- Timestamp: time.Now().UnixNano(),
+ Timestamp: ptypes.TimestampNow(),
}
responseBody := &ic.InterContainerResponseBody{
Success: false,
@@ -694,7 +694,7 @@
FromTopic: request.Header.ToTopic,
ToTopic: request.Header.FromTopic,
KeyTopic: request.Header.KeyTopic,
- Timestamp: time.Now().UnixNano(),
+ Timestamp: ptypes.TimestampNow(),
}
// Go over all returned values
@@ -934,7 +934,7 @@
FromTopic: replyTopic.Name,
ToTopic: toTopic.Name,
KeyTopic: key,
- Timestamp: time.Now().UnixNano(),
+ Timestamp: ptypes.TimestampNow(),
}
requestBody := &ic.InterContainerRequestBody{
Rpc: rpc,
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index deb72fd..468e546 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -27,6 +27,7 @@
scc "github.com/bsm/sarama-cluster"
"github.com/eapache/go-resiliency/breaker"
"github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
"github.com/google/uuid"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
@@ -68,7 +69,7 @@
numReplicas int
autoCreateTopic bool
doneCh chan int
- metadataCallback func(fromTopic string, timestamp int64)
+ metadataCallback func(fromTopic string, timestamp time.Time)
topicToConsumerChannelMap map[string]*consumerChannels
lockTopicToConsumerChannelMap sync.RWMutex
topicLockMap map[string]*sync.RWMutex
@@ -461,7 +462,7 @@
return err
}
-func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp int64)) {
+func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp time.Time)) {
sc.metadataCallback = callback
}
@@ -917,7 +918,8 @@
sc.lockTopicToConsumerChannelMap.RUnlock()
if callback := sc.metadataCallback; callback != nil {
- callback(protoMessage.Header.FromTopic, protoMessage.Header.Timestamp)
+ ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
+ callback(protoMessage.Header.FromTopic, ts)
}
}