VOL-2999 - Reworked how Proxies are created & used.
Added DB Paths to separate location specification logic from entry access logic.
Also merged Update() and AddWithID() and renamed to Set().
Change-Id: I9ed5eafd63c180dddc5845a166554f89bda12325
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index b552d8f..341624f 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -37,19 +37,21 @@
type Manager struct {
adapterAgents map[string]*agent
deviceTypes map[string]*voltha.DeviceType
- clusterDataProxy *model.Proxy
+ adapterProxy *model.Proxy
+ deviceTypeProxy *model.Proxy
onAdapterRestart adapterRestartedHandler
coreInstanceID string
lockAdaptersMap sync.RWMutex
lockdDeviceTypeToAdapterMap sync.RWMutex
}
-func NewAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client) *Manager {
+func NewAdapterManager(dbPath *model.Path, coreInstanceID string, kafkaClient kafka.Client) *Manager {
aMgr := &Manager{
- coreInstanceID: coreInstanceID,
- clusterDataProxy: cdProxy,
- deviceTypes: make(map[string]*voltha.DeviceType),
- adapterAgents: make(map[string]*agent),
+ coreInstanceID: coreInstanceID,
+ adapterProxy: dbPath.Proxy("adapters"),
+ deviceTypeProxy: dbPath.Proxy("device_types"),
+ deviceTypes: make(map[string]*voltha.DeviceType),
+ adapterAgents: make(map[string]*agent),
}
kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
return aMgr
@@ -82,7 +84,7 @@
func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory() error {
// Load the adapters
var adapters []*voltha.Adapter
- if err := aMgr.clusterDataProxy.List(context.Background(), "adapters", &adapters); err != nil {
+ if err := aMgr.adapterProxy.List(context.Background(), &adapters); err != nil {
logger.Errorw("Failed-to-list-adapters-from-cluster-data-proxy", log.Fields{"error": err})
return err
}
@@ -98,7 +100,7 @@
// Load the device types
var deviceTypes []*voltha.DeviceType
- if err := aMgr.clusterDataProxy.List(context.Background(), "device_types", &deviceTypes); err != nil {
+ if err := aMgr.deviceTypeProxy.List(context.Background(), &deviceTypes); err != nil {
logger.Errorw("Failed-to-list-device-types-from-cluster-data-proxy", log.Fields{"error": err})
return err
}
@@ -134,11 +136,11 @@
if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
if saveToDb {
// Save the adapter to the KV store - first check if it already exist
- if have, err := aMgr.clusterDataProxy.Get(context.Background(), "adapters/"+adapter.Id, &voltha.Adapter{}); err != nil {
+ if have, err := aMgr.adapterProxy.Get(context.Background(), adapter.Id, &voltha.Adapter{}); err != nil {
logger.Errorw("failed-to-get-adapters-from-cluster-proxy", log.Fields{"error": err})
return err
} else if !have {
- if err := aMgr.clusterDataProxy.AddWithID(context.Background(), "adapters", adapter.Id, adapter); err != nil {
+ if err := aMgr.adapterProxy.Set(context.Background(), adapter.Id, adapter); err != nil {
logger.Errorw("failed-to-save-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
return err
@@ -176,13 +178,13 @@
if saveToDb {
// Save the device types to the KV store
for _, deviceType := range deviceTypes.Items {
- if have, err := aMgr.clusterDataProxy.Get(context.Background(), "device_types/"+deviceType.Id, &voltha.DeviceType{}); err != nil {
+ if have, err := aMgr.deviceTypeProxy.Get(context.Background(), deviceType.Id, &voltha.DeviceType{}); err != nil {
logger.Errorw("Failed-to--device-types-from-cluster-data-proxy", log.Fields{"error": err})
return err
} else if !have {
// Does not exist - save it
clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
- if err := aMgr.clusterDataProxy.AddWithID(context.Background(), "device_types", deviceType.Id, clonedDType); err != nil {
+ if err := aMgr.deviceTypeProxy.Set(context.Background(), deviceType.Id, clonedDType); err != nil {
logger.Errorw("Failed-to-add-device-types-to-cluster-data-proxy", log.Fields{"error": err})
return err
}