[VOL-1512] Set Device Active Ownership
This update is the first commit for device ownership. It creates
a separate file to manage device ownership in a Core.
Change-Id: I11ad682056394c3f37d6c42834bcfceab880cbb5
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
new file mode 100644
index 0000000..b881351
--- /dev/null
+++ b/rw_core/core/device_ownership.go
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/kvstore"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+ "time"
+)
+
+type ownership struct {
+ id string
+ owned bool
+ chnl chan int
+}
+
+type DeviceOwnership struct {
+ instanceId string
+ exitChannel chan int
+ kvClient kvstore.Client
+ reservationTimeout int64 // Duration in seconds
+ ownershipPrefix string
+ deviceMap map[string]*ownership
+ deviceMapLock *sync.RWMutex
+}
+
+func NewDeviceOwnership(id string, kvClient kvstore.Client, ownershipPrefix string, reservationTimeout int64) *DeviceOwnership {
+ var deviceOwnership DeviceOwnership
+ deviceOwnership.instanceId = id
+ deviceOwnership.exitChannel = make(chan int, 1)
+ deviceOwnership.kvClient = kvClient
+ deviceOwnership.ownershipPrefix = ownershipPrefix
+ deviceOwnership.reservationTimeout = reservationTimeout
+ deviceOwnership.deviceMap = make(map[string]*ownership)
+ deviceOwnership.deviceMapLock = &sync.RWMutex{}
+ return &deviceOwnership
+}
+
+func (da *DeviceOwnership) Start(ctx context.Context) {
+ log.Info("starting-deviceOwnership", log.Fields{"instanceId": da.instanceId})
+ log.Info("deviceOwnership-started")
+}
+
+func (da *DeviceOwnership) Stop(ctx context.Context) {
+ log.Info("stopping-deviceOwnership")
+ da.exitChannel <- 1
+ // Need to flush all device reservations
+ log.Info("deviceOwnership-stopped")
+}
+
+func (da *DeviceOwnership) tryToReserveKey(id string) bool {
+ var currOwner string
+ // Try to reserve the key
+ kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
+ value, err := da.kvClient.Reserve(kvKey, da.instanceId, da.reservationTimeout)
+ if value != nil {
+ if currOwner, err = kvstore.ToString(value); err != nil {
+ log.Error("unexpected-owner-type")
+ }
+ return currOwner == da.instanceId
+ }
+ return false
+}
+
+func (da *DeviceOwnership) startOwnershipMonitoring(id string, chnl chan int) {
+startloop:
+ for {
+ if err := da.setOwnership(id, da.tryToReserveKey(id)); err != nil {
+ log.Errorw("unexpected-error", log.Fields{"error": err})
+ }
+ select {
+ case <-da.exitChannel:
+ log.Infow("closing-monitoring", log.Fields{"Id": id})
+ break startloop
+ case <-time.After(time.Duration(da.reservationTimeout) / 3 * time.Second):
+ log.Infow("renew-reservation", log.Fields{"Id": id})
+ case <-chnl:
+ log.Infow("closing-device-monitoring", log.Fields{"Id": id})
+ break startloop
+ }
+ }
+}
+
+func (da *DeviceOwnership) getOwnership(id string) bool {
+ da.deviceMapLock.RLock()
+ defer da.deviceMapLock.RUnlock()
+ if val, exist := da.deviceMap[id]; exist {
+ return val.owned
+ }
+ log.Debugw("setting-up-new-ownership", log.Fields{"Id": id})
+ // Not owned by me or maybe anybody else. Try to reserve it
+ reservedByMe := da.tryToReserveKey(id)
+ myChnl := make(chan int)
+ da.deviceMap[id] = &ownership{id: id, owned: reservedByMe, chnl: myChnl}
+ go da.startOwnershipMonitoring(id, myChnl)
+ return reservedByMe
+}
+
+func (da *DeviceOwnership) setOwnership(id string, owner bool) error {
+ da.deviceMapLock.Lock()
+ defer da.deviceMapLock.Unlock()
+ if _, exist := da.deviceMap[id]; exist {
+ if da.deviceMap[id].owned != owner {
+ log.Debugw("ownership-changed", log.Fields{"Id": id, "owner": owner})
+ }
+ da.deviceMap[id].owned = owner
+ return nil
+ }
+ return status.Error(codes.NotFound, fmt.Sprintf("id-inexistent-%s", id))
+}
+
+// OwnedByMe returns where this Core instance active owns this device. This function will automatically
+// trigger the process to monitor the device and update the device ownership regularly.
+func (da *DeviceOwnership) OwnedByMe(id string) bool {
+ return da.getOwnership(id)
+}
+
+//AbandonDevice must be invoked whenever a device is deleted from the Core
+func (da *DeviceOwnership) AbandonDevice(id string) error {
+ da.deviceMapLock.Lock()
+ defer da.deviceMapLock.Unlock()
+ if o, exist := da.deviceMap[id]; exist {
+ // Stop the Go routine monitoring the device
+ close(o.chnl)
+ delete(da.deviceMap, id)
+ log.Debugw("abandoning-device", log.Fields{"Id": id})
+ return nil
+ }
+ return status.Error(codes.NotFound, fmt.Sprintf("id-inexistent-%s", id))
+}
\ No newline at end of file