The adapter last_communication is now updated when a message is received from an adapter.
For VOL-2207. Please consider these related patchsets together:
https://gerrit.opencord.org/#/q/VOL-2207
Change-Id: I52702c6e15292a9a443b11ee7f63dabf7b43e65a
diff --git a/go.mod b/go.mod
index c63b854..c86670d 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,7 @@
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
github.com/gyuho/goraph v0.0.0-20160328020532-d460590d53a9
- github.com/opencord/voltha-lib-go/v3 v3.0.6
+ github.com/opencord/voltha-lib-go/v3 v3.0.7
github.com/opencord/voltha-protos/v3 v3.2.2
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/stretchr/testify v1.4.0
diff --git a/go.sum b/go.sum
index be10954..18a1937 100644
--- a/go.sum
+++ b/go.sum
@@ -192,8 +192,8 @@
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v3 v3.0.6 h1:J58Pquledy94urZ5Wp+COXkS9VZTIi6TbaHdVNVCT/Y=
-github.com/opencord/voltha-lib-go/v3 v3.0.6/go.mod h1:l/AgBlYqXEiHLHS6NR654Q7m5BfX5YVnrpykf7kOmGw=
+github.com/opencord/voltha-lib-go/v3 v3.0.7 h1:o0Oujm9rkNWrTCkmTiJyHRxBGzCYhjfElN2IOzif8v4=
+github.com/opencord/voltha-lib-go/v3 v3.0.7/go.mod h1:l/AgBlYqXEiHLHS6NR654Q7m5BfX5YVnrpykf7kOmGw=
github.com/opencord/voltha-protos/v3 v3.2.1 h1:5CAxtWzHqDMNItBRklDkXN5YwE9b6vuCXr5UKTAuJBg=
github.com/opencord/voltha-protos/v3 v3.2.1/go.mod h1:RIGHt7b80BHpHh3ceodknh0DxUjUHCWSbYbZqRx7Og0=
github.com/opencord/voltha-protos/v3 v3.2.2 h1:Fdg2T6xtUjVZPBWV636/mMH6lYggalQ2e7uFPoOhZDc=
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 02d0b7a..45a0ff5 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -19,8 +19,11 @@
import (
"context"
"fmt"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"reflect"
"sync"
+ "time"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/db/model"
@@ -83,6 +86,22 @@
aa.deviceTypes[deviceType.Id] = deviceType
}
+// updateCommunicationTime updates the message to the specified time.
+// No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
+func (aa *AdapterAgent) updateCommunicationTime(new time.Time) {
+ // only update if new time is not in the future, and either the old time is invalid or new time > old time
+ if last, err := ptypes.Timestamp(aa.adapter.LastCommunication); !new.After(time.Now()) && (err != nil || new.After(last)) {
+ timestamp, err := ptypes.TimestampProto(new)
+ if err != nil {
+ return // if the new time cannot be encoded, just ignore it
+ }
+
+ aa.lock.Lock()
+ defer aa.lock.Unlock()
+ aa.adapter.LastCommunication = timestamp
+ }
+}
+
// AdapterManager represents adapter manager attributes
type AdapterManager struct {
adapterAgents map[string]*AdapterAgent
@@ -97,17 +116,17 @@
lockdDeviceTypeToAdapterMap sync.RWMutex
}
-func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, deviceMgr *DeviceManager) *AdapterManager {
- var adapterMgr AdapterManager
- adapterMgr.exitChannel = make(chan int, 1)
- adapterMgr.coreInstanceID = coreInstanceID
- adapterMgr.clusterDataProxy = cdProxy
- adapterMgr.adapterAgents = make(map[string]*AdapterAgent)
- adapterMgr.deviceTypeToAdapterMap = make(map[string]string)
- adapterMgr.lockAdaptersMap = sync.RWMutex{}
- adapterMgr.lockdDeviceTypeToAdapterMap = sync.RWMutex{}
- adapterMgr.deviceMgr = deviceMgr
- return &adapterMgr
+func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client, deviceMgr *DeviceManager) *AdapterManager {
+ aMgr := &AdapterManager{
+ exitChannel: make(chan int, 1),
+ coreInstanceID: coreInstanceID,
+ clusterDataProxy: cdProxy,
+ adapterAgents: make(map[string]*AdapterAgent),
+ deviceTypeToAdapterMap: make(map[string]string),
+ deviceMgr: deviceMgr,
+ }
+ kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
+ return aMgr
}
func (aMgr *AdapterManager) start(ctx context.Context) error {
@@ -189,6 +208,16 @@
return aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SentinelDevicetypeID, Adapter: SentinelAdapterID}}}, true)
}
+func (aMgr *AdapterManager) updateLastAdapterCommunication(adapterID string, timestamp int64) {
+ aMgr.lockAdaptersMap.RLock()
+ adapterAgent, have := aMgr.adapterAgents[adapterID]
+ aMgr.lockAdaptersMap.RUnlock()
+
+ if have {
+ adapterAgent.updateCommunicationTime(time.Unix(timestamp/1000, timestamp%1000*1000))
+ }
+}
+
//updateAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory(ctx context.Context, adapter *voltha.Adapter) {
aMgr.lockAdaptersMap.Lock()
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 23f4bb2..258850b 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -137,7 +137,7 @@
log.Debugw("values", log.Fields{"kmp": core.kmp})
core.deviceMgr = newDeviceManager(core)
- core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceID, core.deviceMgr)
+ core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceID, core.kafkaClient, core.deviceMgr)
core.deviceMgr.adapterMgr = core.adapterMgr
core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy, core.config.DefaultCoreTimeout)
diff --git a/rw_core/main.go b/rw_core/main.go
index 9b55a87..7741d1f 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -101,33 +101,25 @@
return &rwCore
}
-func (rw *rwCore) setKVClient() error {
- addr := rw.config.KVStoreHost + ":" + strconv.Itoa(rw.config.KVStorePort)
- client, err := newKVClient(rw.config.KVStoreType, addr, rw.config.KVStoreTimeout)
- if err != nil {
- rw.kvClient = nil
- log.Error(err)
- return err
- }
- rw.kvClient = client
- return nil
-}
-
func (rw *rwCore) start(ctx context.Context, instanceID string) {
log.Info("Starting RW Core components")
// Setup KV Client
log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
- err := rw.setKVClient()
- if err == nil {
- // Setup KV transaction context
- txnPrefix := rw.config.KVStoreDataPrefix + "/transactions/"
- if err = c.SetTransactionContext(instanceID,
- txnPrefix,
- rw.kvClient,
- rw.config.KVStoreTimeout); err != nil {
- log.Fatal("creating-transaction-context-failed")
- }
+ var err error
+ if rw.kvClient, err = newKVClient(
+ rw.config.KVStoreType,
+ rw.config.KVStoreHost+":"+strconv.Itoa(rw.config.KVStorePort),
+ rw.config.KVStoreTimeout); err != nil {
+ log.Fatal(err)
+ }
+
+ // Setup KV transaction context
+ if err := c.SetTransactionContext(instanceID,
+ rw.config.KVStoreDataPrefix+"/transactions/",
+ rw.kvClient,
+ rw.config.KVStoreTimeout); err != nil {
+ log.Fatal("creating-transaction-context-failed")
}
// Setup Kafka Client
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go
index 62af5db..38f147e 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka_client.go
@@ -112,7 +112,7 @@
}
func (kc *KafkaClient) SubscribeForMetadata(_ func(fromTopic string, timestamp int64)) {
- panic("unimplemented")
+ logger.Debug("SubscribeForMetadata - unimplemented")
}
func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
@@ -137,9 +137,11 @@
}
func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool {
+ logger.Debug("EnableLivenessChannel - unimplemented")
return nil
}
func (kc *KafkaClient) EnableHealthinessChannel(enable bool) chan bool {
+ logger.Debug("EnableHealthinessChannel - unimplemented")
return nil
}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 9beacac..3649e27 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -98,7 +98,7 @@
github.com/modern-go/concurrent
# github.com/modern-go/reflect2 v1.0.1
github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v3 v3.0.6
+# github.com/opencord/voltha-lib-go/v3 v3.0.7
github.com/opencord/voltha-lib-go/v3/pkg/log
github.com/opencord/voltha-lib-go/v3/pkg/db
github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore