VOL-1512: Set device Active ownership per Core in a Core pair
- Changed NB & SB APIs to seize requests based on device ownership
- Added queue support for change-events
- Need to make prefix & timeout for the device ownership key configurable,
currently hard-coded
- Need to make KV Transaction Monitor timeout configurable,
currently hard-coded
- Need to clean up AdapterRequestHandlerProxy & LogicalDeviceManager
constructors
Change-Id: Ieeb3df6d70baa529b87c8253cb9f0f5b2a94382a
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 13baf84..02f99fd 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -50,6 +50,7 @@
kafkaClient kafka.Client
coreMembership *voltha.Membership
membershipLock *sync.RWMutex
+ deviceOwnership *DeviceOwnership
}
func init() {
@@ -64,6 +65,10 @@
core.kvClient = kvClient
core.kafkaClient = kafkaClient
+ // Setup device ownership context
+ core.deviceOwnership = NewDeviceOwnership(id, kvClient,
+ "service/voltha/owns_device", 60)
+
// Setup the KV store
// Do not call NewBackend constructor; it creates its own KV client
// Commented the backend for now until the issue between the model and the KV store
@@ -90,8 +95,8 @@
}
log.Info("values", log.Fields{"kmp": core.kmp})
core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
- core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.adapterMgr, core.instanceId)
- core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
+ core.deviceMgr = newDeviceManager(core)
+ core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy)
if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
log.Fatal("Failure-registering-adapterRequestHandler")
@@ -179,7 +184,7 @@
func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
) error {
- requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+ requestProxy := NewAdapterRequestHandlerProxy(core, coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
// Register the broadcast topic to handle any core-bound broadcast requests