The adapter last_communication is now updated when a message is received from an adapter.

For VOL-2207.  Please consider these related patchsets together:
https://gerrit.opencord.org/#/q/VOL-2207

Change-Id: I52702c6e15292a9a443b11ee7f63dabf7b43e65a
diff --git a/go.mod b/go.mod
index c63b854..c86670d 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,7 @@
 	github.com/golang/protobuf v1.3.2
 	github.com/google/uuid v1.1.1
 	github.com/gyuho/goraph v0.0.0-20160328020532-d460590d53a9
-	github.com/opencord/voltha-lib-go/v3 v3.0.6
+	github.com/opencord/voltha-lib-go/v3 v3.0.7
 	github.com/opencord/voltha-protos/v3 v3.2.2
 	github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
 	github.com/stretchr/testify v1.4.0
diff --git a/go.sum b/go.sum
index be10954..18a1937 100644
--- a/go.sum
+++ b/go.sum
@@ -192,8 +192,8 @@
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
 github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v3 v3.0.6 h1:J58Pquledy94urZ5Wp+COXkS9VZTIi6TbaHdVNVCT/Y=
-github.com/opencord/voltha-lib-go/v3 v3.0.6/go.mod h1:l/AgBlYqXEiHLHS6NR654Q7m5BfX5YVnrpykf7kOmGw=
+github.com/opencord/voltha-lib-go/v3 v3.0.7 h1:o0Oujm9rkNWrTCkmTiJyHRxBGzCYhjfElN2IOzif8v4=
+github.com/opencord/voltha-lib-go/v3 v3.0.7/go.mod h1:l/AgBlYqXEiHLHS6NR654Q7m5BfX5YVnrpykf7kOmGw=
 github.com/opencord/voltha-protos/v3 v3.2.1 h1:5CAxtWzHqDMNItBRklDkXN5YwE9b6vuCXr5UKTAuJBg=
 github.com/opencord/voltha-protos/v3 v3.2.1/go.mod h1:RIGHt7b80BHpHh3ceodknh0DxUjUHCWSbYbZqRx7Og0=
 github.com/opencord/voltha-protos/v3 v3.2.2 h1:Fdg2T6xtUjVZPBWV636/mMH6lYggalQ2e7uFPoOhZDc=
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 02d0b7a..45a0ff5 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -19,8 +19,11 @@
 import (
 	"context"
 	"fmt"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"reflect"
 	"sync"
+	"time"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/db/model"
@@ -83,6 +86,22 @@
 	aa.deviceTypes[deviceType.Id] = deviceType
 }
 
+// updateCommunicationTime updates the message to the specified time.
+// No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
+func (aa *AdapterAgent) updateCommunicationTime(new time.Time) {
+	// only update if new time is not in the future, and either the old time is invalid or new time > old time
+	if last, err := ptypes.Timestamp(aa.adapter.LastCommunication); !new.After(time.Now()) && (err != nil || new.After(last)) {
+		timestamp, err := ptypes.TimestampProto(new)
+		if err != nil {
+			return // if the new time cannot be encoded, just ignore it
+		}
+
+		aa.lock.Lock()
+		defer aa.lock.Unlock()
+		aa.adapter.LastCommunication = timestamp
+	}
+}
+
 // AdapterManager represents adapter manager attributes
 type AdapterManager struct {
 	adapterAgents               map[string]*AdapterAgent
@@ -97,17 +116,17 @@
 	lockdDeviceTypeToAdapterMap sync.RWMutex
 }
 
-func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, deviceMgr *DeviceManager) *AdapterManager {
-	var adapterMgr AdapterManager
-	adapterMgr.exitChannel = make(chan int, 1)
-	adapterMgr.coreInstanceID = coreInstanceID
-	adapterMgr.clusterDataProxy = cdProxy
-	adapterMgr.adapterAgents = make(map[string]*AdapterAgent)
-	adapterMgr.deviceTypeToAdapterMap = make(map[string]string)
-	adapterMgr.lockAdaptersMap = sync.RWMutex{}
-	adapterMgr.lockdDeviceTypeToAdapterMap = sync.RWMutex{}
-	adapterMgr.deviceMgr = deviceMgr
-	return &adapterMgr
+func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client, deviceMgr *DeviceManager) *AdapterManager {
+	aMgr := &AdapterManager{
+		exitChannel:            make(chan int, 1),
+		coreInstanceID:         coreInstanceID,
+		clusterDataProxy:       cdProxy,
+		adapterAgents:          make(map[string]*AdapterAgent),
+		deviceTypeToAdapterMap: make(map[string]string),
+		deviceMgr:              deviceMgr,
+	}
+	kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
+	return aMgr
 }
 
 func (aMgr *AdapterManager) start(ctx context.Context) error {
@@ -189,6 +208,16 @@
 	return aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SentinelDevicetypeID, Adapter: SentinelAdapterID}}}, true)
 }
 
+func (aMgr *AdapterManager) updateLastAdapterCommunication(adapterID string, timestamp int64) {
+	aMgr.lockAdaptersMap.RLock()
+	adapterAgent, have := aMgr.adapterAgents[adapterID]
+	aMgr.lockAdaptersMap.RUnlock()
+
+	if have {
+		adapterAgent.updateCommunicationTime(time.Unix(timestamp/1000, timestamp%1000*1000))
+	}
+}
+
 //updateAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
 func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory(ctx context.Context, adapter *voltha.Adapter) {
 	aMgr.lockAdaptersMap.Lock()
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 23f4bb2..258850b 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -137,7 +137,7 @@
 
 	log.Debugw("values", log.Fields{"kmp": core.kmp})
 	core.deviceMgr = newDeviceManager(core)
-	core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceID, core.deviceMgr)
+	core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceID, core.kafkaClient, core.deviceMgr)
 	core.deviceMgr.adapterMgr = core.adapterMgr
 	core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy, core.config.DefaultCoreTimeout)
 
diff --git a/rw_core/main.go b/rw_core/main.go
index 9b55a87..7741d1f 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -101,33 +101,25 @@
 	return &rwCore
 }
 
-func (rw *rwCore) setKVClient() error {
-	addr := rw.config.KVStoreHost + ":" + strconv.Itoa(rw.config.KVStorePort)
-	client, err := newKVClient(rw.config.KVStoreType, addr, rw.config.KVStoreTimeout)
-	if err != nil {
-		rw.kvClient = nil
-		log.Error(err)
-		return err
-	}
-	rw.kvClient = client
-	return nil
-}
-
 func (rw *rwCore) start(ctx context.Context, instanceID string) {
 	log.Info("Starting RW Core components")
 
 	// Setup KV Client
 	log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
-	err := rw.setKVClient()
-	if err == nil {
-		// Setup KV transaction context
-		txnPrefix := rw.config.KVStoreDataPrefix + "/transactions/"
-		if err = c.SetTransactionContext(instanceID,
-			txnPrefix,
-			rw.kvClient,
-			rw.config.KVStoreTimeout); err != nil {
-			log.Fatal("creating-transaction-context-failed")
-		}
+	var err error
+	if rw.kvClient, err = newKVClient(
+		rw.config.KVStoreType,
+		rw.config.KVStoreHost+":"+strconv.Itoa(rw.config.KVStorePort),
+		rw.config.KVStoreTimeout); err != nil {
+		log.Fatal(err)
+	}
+
+	// Setup KV transaction context
+	if err := c.SetTransactionContext(instanceID,
+		rw.config.KVStoreDataPrefix+"/transactions/",
+		rw.kvClient,
+		rw.config.KVStoreTimeout); err != nil {
+		log.Fatal("creating-transaction-context-failed")
 	}
 
 	// Setup Kafka Client
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go
index 62af5db..38f147e 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go
@@ -112,7 +112,7 @@
 }
 
 func (kc *KafkaClient) SubscribeForMetadata(_ func(fromTopic string, timestamp int64)) {
-	panic("unimplemented")
+	logger.Debug("SubscribeForMetadata - unimplemented")
 }
 
 func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
@@ -137,9 +137,11 @@
 }
 
 func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool {
+	logger.Debug("EnableLivenessChannel - unimplemented")
 	return nil
 }
 
 func (kc *KafkaClient) EnableHealthinessChannel(enable bool) chan bool {
+	logger.Debug("EnableHealthinessChannel - unimplemented")
 	return nil
 }
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 9beacac..3649e27 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -98,7 +98,7 @@
 github.com/modern-go/concurrent
 # github.com/modern-go/reflect2 v1.0.1
 github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v3 v3.0.6
+# github.com/opencord/voltha-lib-go/v3 v3.0.7
 github.com/opencord/voltha-lib-go/v3/pkg/log
 github.com/opencord/voltha-lib-go/v3/pkg/db
 github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore