blob: 2389c78adb8d9f63aef907df8a811701f437abce [file] [log] [blame]
/*
* Copyright 2019-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package core
import (
"context"
"fmt"
"sync"
"time"
"github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type ownership struct {
id string
owned bool
chnl chan int
}
// DeviceOwnership represent device ownership attributes
type DeviceOwnership struct {
instanceID string
exitChannel chan int
kvClient kvstore.Client
reservationTimeout int64 // Duration in seconds
ownershipPrefix string
deviceMgr *DeviceManager
logicalDeviceMgr *LogicalDeviceManager
deviceMap map[string]*ownership
deviceMapLock sync.RWMutex
deviceToKeyMap map[string]string
deviceToKeyMapLock sync.RWMutex
ownershipLock sync.RWMutex
}
// NewDeviceOwnership creates device ownership instance
func NewDeviceOwnership(id string, kvClient kvstore.Client, deviceMgr *DeviceManager, logicalDeviceMgr *LogicalDeviceManager, ownershipPrefix string, reservationTimeout int64) *DeviceOwnership {
var deviceOwnership DeviceOwnership
deviceOwnership.instanceID = id
deviceOwnership.exitChannel = make(chan int, 1)
deviceOwnership.kvClient = kvClient
deviceOwnership.deviceMgr = deviceMgr
deviceOwnership.logicalDeviceMgr = logicalDeviceMgr
deviceOwnership.ownershipPrefix = ownershipPrefix
deviceOwnership.reservationTimeout = reservationTimeout
deviceOwnership.deviceMap = make(map[string]*ownership)
deviceOwnership.deviceMapLock = sync.RWMutex{}
deviceOwnership.deviceToKeyMap = make(map[string]string)
deviceOwnership.deviceToKeyMapLock = sync.RWMutex{}
deviceOwnership.ownershipLock = sync.RWMutex{}
return &deviceOwnership
}
// Start starts device device ownership
func (da *DeviceOwnership) Start(ctx context.Context) {
logger.Info("starting-deviceOwnership", log.Fields{"instanceId": da.instanceID})
logger.Info("deviceOwnership-started")
}
// Stop stops device ownership
func (da *DeviceOwnership) Stop(ctx context.Context) {
logger.Info("stopping-deviceOwnership")
da.exitChannel <- 1
// Need to flush all device reservations
da.abandonAllDevices()
logger.Info("deviceOwnership-stopped")
}
func (da *DeviceOwnership) tryToReserveKey(ctx context.Context, id string) bool {
var currOwner string
//Try to reserve the key
kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
value, err := da.kvClient.Reserve(ctx, kvKey, da.instanceID, da.reservationTimeout)
if err != nil {
logger.Errorw("error", log.Fields{"error": err, "id": id, "instanceId": da.instanceID})
}
if value != nil {
if currOwner, err = kvstore.ToString(value); err != nil {
logger.Error("unexpected-owner-type")
}
return currOwner == da.instanceID
}
return false
}
func (da *DeviceOwnership) renewReservation(ctx context.Context, id string) bool {
// Try to reserve the key
kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
if err := da.kvClient.RenewReservation(ctx, kvKey); err != nil {
logger.Errorw("reservation-renewal-error", log.Fields{"error": err, "instance": da.instanceID})
return false
}
return true
}
func (da *DeviceOwnership) monitorOwnership(ctx context.Context, id string, chnl chan int) {
logger.Debugw("start-device-monitoring", log.Fields{"id": id})
op := "starting"
exit := false
ticker := time.NewTicker(time.Duration(da.reservationTimeout) / 3 * time.Second)
for {
select {
case <-da.exitChannel:
logger.Debugw("closing-monitoring", log.Fields{"Id": id})
exit = true
case <-ticker.C:
logger.Debugw(fmt.Sprintf("%s-reservation", op), log.Fields{"Id": id})
case <-chnl:
logger.Debugw("closing-device-monitoring", log.Fields{"Id": id})
exit = true
}
if exit {
logger.Infow("exiting-device-monitoring", log.Fields{"Id": id})
ticker.Stop()
break
}
deviceOwned, ownedByMe := da.getOwnership(id)
if deviceOwned && ownedByMe {
// Device owned; renew reservation
op = "renew"
if da.renewReservation(ctx, id) {
logger.Debugw("reservation-renewed", log.Fields{"id": id, "instanceId": da.instanceID})
} else {
logger.Debugw("reservation-not-renewed", log.Fields{"id": id, "instanceId": da.instanceID})
}
} else {
// Device not owned or not owned by me; try to seize ownership
op = "retry"
if err := da.setOwnership(id, da.tryToReserveKey(ctx, id)); err != nil {
logger.Errorw("unexpected-error", log.Fields{"error": err})
}
}
}
logger.Debugw("device-monitoring-stopped", log.Fields{"id": id})
}
func (da *DeviceOwnership) getOwnership(id string) (bool, bool) {
da.deviceMapLock.RLock()
defer da.deviceMapLock.RUnlock()
if val, exist := da.deviceMap[id]; exist {
return true, val.owned
}
return false, false
}
func (da *DeviceOwnership) setOwnership(id string, owner bool) error {
da.deviceMapLock.Lock()
defer da.deviceMapLock.Unlock()
if _, exist := da.deviceMap[id]; exist {
if da.deviceMap[id].owned != owner {
logger.Debugw("ownership-changed", log.Fields{"Id": id, "owner": owner})
}
da.deviceMap[id].owned = owner
return nil
}
return status.Error(codes.NotFound, fmt.Sprintf("id-inexistent-%s", id))
}
// GetAllDeviceIdsOwnedByMe returns all the deviceIds (root device Ids) that is managed by this Core
func (da *DeviceOwnership) GetAllDeviceIdsOwnedByMe() []string {
deviceIds := []string{}
da.deviceMapLock.Lock()
defer da.deviceMapLock.Unlock()
for _, ownership := range da.deviceMap {
if ownership.owned {
deviceIds = append(deviceIds, ownership.id)
}
}
return deviceIds
}
// OwnedByMe returns whether this Core instance active owns this device. This function will automatically
// trigger the process to monitor the device and update the device ownership regularly.
func (da *DeviceOwnership) OwnedByMe(ctx context.Context, id interface{}) (bool, error) {
// Retrieve the ownership key based on the id
var ownershipKey string
var err error
var idStr string
var cache bool
if ownershipKey, idStr, cache, err = da.getOwnershipKey(ctx, id); err != nil {
logger.Warnw("no-ownershipkey", log.Fields{"error": err})
return false, err
}
// Update the deviceToKey map, if not from cache
if !cache {
da.deviceToKeyMapLock.Lock()
da.deviceToKeyMap[idStr] = ownershipKey
da.deviceToKeyMapLock.Unlock()
}
// Add a lock to prevent creation of two separate monitoring routines for the same device. When a NB request for a
// device not in memory is received this results in this function being called in rapid succession, once when
// loading the device and once when handling the NB request.
da.ownershipLock.Lock()
defer da.ownershipLock.Unlock()
deviceOwned, ownedByMe := da.getOwnership(ownershipKey)
if deviceOwned {
logger.Debugw("ownership", log.Fields{"Id": ownershipKey, "owned": ownedByMe})
return ownedByMe, nil
}
// Not owned by me or maybe nobody else. Try to reserve it
reservedByMe := da.tryToReserveKey(ctx, ownershipKey)
myChnl := make(chan int)
da.deviceMapLock.Lock()
da.deviceMap[ownershipKey] = &ownership{
id: ownershipKey,
owned: reservedByMe,
chnl: myChnl}
da.deviceMapLock.Unlock()
logger.Debugw("set-new-ownership", log.Fields{"Id": ownershipKey, "owned": reservedByMe})
go da.monitorOwnership(context.Background(), ownershipKey, myChnl)
return reservedByMe, nil
}
//AbandonDevice must be invoked whenever a device is deleted from the Core
func (da *DeviceOwnership) AbandonDevice(id string) error {
if id == "" {
return status.Error(codes.FailedPrecondition, "id-nil")
}
da.deviceMapLock.Lock()
defer da.deviceMapLock.Unlock()
o, exist := da.deviceMap[id]
if exist { // id is ownership key
// Need to clean up all deviceToKeyMap entries using this device as key
da.deviceToKeyMapLock.Lock()
defer da.deviceToKeyMapLock.Unlock()
for k, v := range da.deviceToKeyMap {
if id == v {
delete(da.deviceToKeyMap, k)
}
}
// Remove the device reference from the deviceMap
delete(da.deviceMap, id)
// Stop the Go routine monitoring the device
close(o.chnl)
delete(da.deviceMap, id)
logger.Debugw("abandoning-device", log.Fields{"Id": id})
return nil
}
// id is not ownership key
da.deleteDeviceKey(id)
return nil
}
//abandonAllDevices must be invoked whenever a device is deleted from the Core
func (da *DeviceOwnership) abandonAllDevices() {
da.deviceMapLock.Lock()
defer da.deviceMapLock.Unlock()
da.deviceToKeyMapLock.Lock()
defer da.deviceToKeyMapLock.Unlock()
for k := range da.deviceToKeyMap {
delete(da.deviceToKeyMap, k)
}
for _, val := range da.deviceMap {
close(val.chnl)
}
}
func (da *DeviceOwnership) deleteDeviceKey(id string) {
da.deviceToKeyMapLock.Lock()
defer da.deviceToKeyMapLock.Unlock()
delete(da.deviceToKeyMap, id)
}
// getOwnershipKey returns the ownership key that the id param uses. Ownership key is the parent
// device Id of a child device or the rootdevice of a logical device. This function also returns the
// id in string format of the id param via the ref output as well as if the data was retrieved from cache
func (da *DeviceOwnership) getOwnershipKey(ctx context.Context, id interface{}) (ownershipKey string, ref string, cached bool, err error) {
if id == nil {
return "", "", false, status.Error(codes.InvalidArgument, "nil-id")
}
da.deviceToKeyMapLock.RLock()
defer da.deviceToKeyMapLock.RUnlock()
var device *voltha.Device
var lDevice *voltha.LogicalDevice
// The id can either be a device Id or a logical device id.
if dID, ok := id.(*utils.DeviceID); ok {
// Use cache if present
if val, exist := da.deviceToKeyMap[dID.ID]; exist {
return val, dID.ID, true, nil
}
if device, _ = da.deviceMgr.GetDevice(ctx, dID.ID); device == nil {
return "", dID.ID, false, status.Errorf(codes.NotFound, "id-absent-%s", dID)
}
if device.Root {
return device.Id, dID.ID, false, nil
}
return device.ParentId, dID.ID, false, nil
} else if ldID, ok := id.(*utils.LogicalDeviceID); ok {
// Use cache if present
if val, exist := da.deviceToKeyMap[ldID.ID]; exist {
return val, ldID.ID, true, nil
}
if lDevice, _ = da.logicalDeviceMgr.getLogicalDevice(ctx, ldID.ID); lDevice == nil {
return "", ldID.ID, false, status.Errorf(codes.NotFound, "id-absent-%s", dID)
}
return lDevice.RootDeviceId, ldID.ID, false, nil
}
return "", "", false, status.Error(codes.NotFound, fmt.Sprintf("id-%v", id))
}