VOL-2861 replace uint64 kafka interadapter timestamps with timestamp proto

Change-Id: Ib7de52ce7a663b4817e383f03f34e4398d6c980d
diff --git a/pkg/kafka/client.go b/pkg/kafka/client.go
index 9abad93..0919a0c 100755
--- a/pkg/kafka/client.go
+++ b/pkg/kafka/client.go
@@ -66,7 +66,7 @@
 	DeleteTopic(topic *Topic) error
 	Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
 	UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
-	SubscribeForMetadata(func(fromTopic string, timestamp int64))
+	SubscribeForMetadata(func(fromTopic string, timestamp time.Time))
 	Send(msg interface{}, topic *Topic, keys ...string) error
 	SendLiveness() error
 	EnableLivenessChannel(enable bool) chan bool
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index fc2334d..beda537 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -221,7 +221,7 @@
 		Type:      ic.MessageType_DEVICE_DISCOVERED,
 		FromTopic: kp.defaultTopic.Name,
 		ToTopic:   kp.deviceDiscoveryTopic.Name,
-		Timestamp: time.Now().UnixNano(),
+		Timestamp: ptypes.TimestampNow(),
 	}
 	body := &ic.DeviceDiscovered{
 		Id:         deviceId,
@@ -664,7 +664,7 @@
 		Type:      ic.MessageType_RESPONSE,
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
-		Timestamp: time.Now().UnixNano(),
+		Timestamp: ptypes.TimestampNow(),
 	}
 	responseBody := &ic.InterContainerResponseBody{
 		Success: false,
@@ -694,7 +694,7 @@
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
 		KeyTopic:  request.Header.KeyTopic,
-		Timestamp: time.Now().UnixNano(),
+		Timestamp: ptypes.TimestampNow(),
 	}
 
 	// Go over all returned values
@@ -934,7 +934,7 @@
 		FromTopic: replyTopic.Name,
 		ToTopic:   toTopic.Name,
 		KeyTopic:  key,
-		Timestamp: time.Now().UnixNano(),
+		Timestamp: ptypes.TimestampNow(),
 	}
 	requestBody := &ic.InterContainerRequestBody{
 		Rpc:              rpc,
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index deb72fd..468e546 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -27,6 +27,7 @@
 	scc "github.com/bsm/sarama-cluster"
 	"github.com/eapache/go-resiliency/breaker"
 	"github.com/golang/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
@@ -68,7 +69,7 @@
 	numReplicas                   int
 	autoCreateTopic               bool
 	doneCh                        chan int
-	metadataCallback              func(fromTopic string, timestamp int64)
+	metadataCallback              func(fromTopic string, timestamp time.Time)
 	topicToConsumerChannelMap     map[string]*consumerChannels
 	lockTopicToConsumerChannelMap sync.RWMutex
 	topicLockMap                  map[string]*sync.RWMutex
@@ -461,7 +462,7 @@
 	return err
 }
 
-func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp int64)) {
+func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp time.Time)) {
 	sc.metadataCallback = callback
 }
 
@@ -917,7 +918,8 @@
 	sc.lockTopicToConsumerChannelMap.RUnlock()
 
 	if callback := sc.metadataCallback; callback != nil {
-		callback(protoMessage.Header.FromTopic, protoMessage.Header.Timestamp)
+		ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
+		callback(protoMessage.Header.FromTopic, ts)
 	}
 }