The adapter last_communication is now updated when a message is received from an adapter.

For VOL-2207.  Please consider these related patchsets together:
https://gerrit.opencord.org/#/q/VOL-2207

Change-Id: I52702c6e15292a9a443b11ee7f63dabf7b43e65a
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 02d0b7a..45a0ff5 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -19,8 +19,11 @@
 import (
 	"context"
 	"fmt"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"reflect"
 	"sync"
+	"time"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/db/model"
@@ -83,6 +86,22 @@
 	aa.deviceTypes[deviceType.Id] = deviceType
 }
 
+// updateCommunicationTime records newTime as the time this adapter was last heard from.
+// No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
+// A time in the future, or one not after a valid stored time, leaves the stored value untouched.
+func (aa *AdapterAgent) updateCommunicationTime(newTime time.Time) {
+	timestamp, err := ptypes.TimestampProto(newTime)
+	if err != nil || newTime.After(time.Now()) {
+		return // ignore times that cannot be encoded or that lie in the future
+	}
+	aa.lock.Lock()
+	defer aa.lock.Unlock()
+	// compare while holding the lock so a concurrent caller cannot replace a newer time with an older one
+	if last, lastErr := ptypes.Timestamp(aa.adapter.LastCommunication); lastErr != nil || newTime.After(last) {
+		aa.adapter.LastCommunication = timestamp
+	}
+}
+
 // AdapterManager represents adapter manager attributes
 type AdapterManager struct {
 	adapterAgents               map[string]*AdapterAgent
@@ -97,17 +116,17 @@
 	lockdDeviceTypeToAdapterMap sync.RWMutex
 }
 
-func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, deviceMgr *DeviceManager) *AdapterManager {
-	var adapterMgr AdapterManager
-	adapterMgr.exitChannel = make(chan int, 1)
-	adapterMgr.coreInstanceID = coreInstanceID
-	adapterMgr.clusterDataProxy = cdProxy
-	adapterMgr.adapterAgents = make(map[string]*AdapterAgent)
-	adapterMgr.deviceTypeToAdapterMap = make(map[string]string)
-	adapterMgr.lockAdaptersMap = sync.RWMutex{}
-	adapterMgr.lockdDeviceTypeToAdapterMap = sync.RWMutex{}
-	adapterMgr.deviceMgr = deviceMgr
-	return &adapterMgr
+// newAdapterManager creates an AdapterManager and subscribes its updateLastAdapterCommunication to kafkaClient metadata.
+func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client, deviceMgr *DeviceManager) *AdapterManager {
+	mgr := new(AdapterManager)
+	mgr.exitChannel = make(chan int, 1)
+	mgr.coreInstanceID = coreInstanceID
+	mgr.clusterDataProxy = cdProxy
+	mgr.adapterAgents = make(map[string]*AdapterAgent)
+	mgr.deviceTypeToAdapterMap = make(map[string]string)
+	mgr.deviceMgr = deviceMgr
+	kafkaClient.SubscribeForMetadata(mgr.updateLastAdapterCommunication)
+	return mgr
 }
 
 func (aMgr *AdapterManager) start(ctx context.Context) error {
@@ -189,6 +208,16 @@
 	return aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SentinelDevicetypeID, Adapter: SentinelAdapterID}}}, true)
 }
 
+// updateLastAdapterCommunication notes a message from adapterID; timestamp is treated as Unix-epoch milliseconds.
+func (aMgr *AdapterManager) updateLastAdapterCommunication(adapterID string, timestamp int64) {
+	aMgr.lockAdaptersMap.RLock()
+	adapterAgent, have := aMgr.adapterAgents[adapterID]
+	aMgr.lockAdaptersMap.RUnlock()
+	if have { // the millisecond remainder must be scaled to nanoseconds for time.Unix
+		adapterAgent.updateCommunicationTime(time.Unix(timestamp/1000, timestamp%1000*int64(time.Millisecond)))
+	}
+}
+
 //updateAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
 func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory(ctx context.Context, adapter *voltha.Adapter) {
 	aMgr.lockAdaptersMap.Lock()