The adapter last_communication is now updated when a message is received from an adapter.
For VOL-2207. Please consider these related patchsets together:
https://gerrit.opencord.org/#/q/VOL-2207
Change-Id: I52702c6e15292a9a443b11ee7f63dabf7b43e65a
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 02d0b7a..45a0ff5 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -19,8 +19,11 @@
import (
"context"
"fmt"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"reflect"
"sync"
+ "time"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/db/model"
@@ -83,6 +86,22 @@
aa.deviceTypes[deviceType.Id] = deviceType
}
+// updateCommunicationTime updates the message to the specified time.
+// No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
+func (aa *AdapterAgent) updateCommunicationTime(new time.Time) {
+ // only update if new time is not in the future, and either the old time is invalid or new time > old time
+ if last, err := ptypes.Timestamp(aa.adapter.LastCommunication); !new.After(time.Now()) && (err != nil || new.After(last)) {
+ timestamp, err := ptypes.TimestampProto(new)
+ if err != nil {
+ return // if the new time cannot be encoded, just ignore it
+ }
+
+ aa.lock.Lock()
+ defer aa.lock.Unlock()
+ aa.adapter.LastCommunication = timestamp
+ }
+}
+
// AdapterManager represents adapter manager attributes
type AdapterManager struct {
adapterAgents map[string]*AdapterAgent
@@ -97,17 +116,17 @@
lockdDeviceTypeToAdapterMap sync.RWMutex
}
-func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, deviceMgr *DeviceManager) *AdapterManager {
- var adapterMgr AdapterManager
- adapterMgr.exitChannel = make(chan int, 1)
- adapterMgr.coreInstanceID = coreInstanceID
- adapterMgr.clusterDataProxy = cdProxy
- adapterMgr.adapterAgents = make(map[string]*AdapterAgent)
- adapterMgr.deviceTypeToAdapterMap = make(map[string]string)
- adapterMgr.lockAdaptersMap = sync.RWMutex{}
- adapterMgr.lockdDeviceTypeToAdapterMap = sync.RWMutex{}
- adapterMgr.deviceMgr = deviceMgr
- return &adapterMgr
+func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client, deviceMgr *DeviceManager) *AdapterManager {
+ aMgr := &AdapterManager{
+ exitChannel: make(chan int, 1),
+ coreInstanceID: coreInstanceID,
+ clusterDataProxy: cdProxy,
+ adapterAgents: make(map[string]*AdapterAgent),
+ deviceTypeToAdapterMap: make(map[string]string),
+ deviceMgr: deviceMgr,
+ }
+ kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
+ return aMgr
}
func (aMgr *AdapterManager) start(ctx context.Context) error {
@@ -189,6 +208,16 @@
return aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SentinelDevicetypeID, Adapter: SentinelAdapterID}}}, true)
}
+func (aMgr *AdapterManager) updateLastAdapterCommunication(adapterID string, timestamp int64) {
+ aMgr.lockAdaptersMap.RLock()
+ adapterAgent, have := aMgr.adapterAgents[adapterID]
+ aMgr.lockAdaptersMap.RUnlock()
+
+ if have {
+ adapterAgent.updateCommunicationTime(time.Unix(timestamp/1000, timestamp%1000*1000))
+ }
+}
+
//updateAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory(ctx context.Context, adapter *voltha.Adapter) {
aMgr.lockAdaptersMap.Lock()