VOL-1465 : Integrate rw sync fixes into ro core
- Added new config-map for affinity router
- Removed some really verbose model logs
- Fail core when kv client cannot be established
Change-Id: I7f10c7ce3121abaf88fa622aecb3affd40813d67
diff --git a/ro_core/core/device_agent.go b/ro_core/core/device_agent.go
index 1ae1275..ce4125e 100644
--- a/ro_core/core/device_agent.go
+++ b/ro_core/core/device_agent.go
@@ -51,11 +51,23 @@
}
// start save the device to the data model and registers for callbacks on that device
-func (agent *DeviceAgent) start(ctx context.Context) {
- log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
+func (agent *DeviceAgent) start(ctx context.Context, loadFromDb bool) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
+ log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
+ if loadFromDb {
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
+ if d, ok := device.(*voltha.Device); ok {
+ agent.lastData = proto.Clone(d).(*voltha.Device)
+ }
+ } else {
+ log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceId})
+ return status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+ }
+ log.Debugw("device-loaded-from-dB", log.Fields{"device": agent.lastData})
+ }
log.Debug("device-agent-started")
+ return nil
}
// stop stops the device agent. Not much to do for now
@@ -71,7 +83,7 @@
func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
cloned := proto.Clone(d).(*voltha.Device)
return cloned, nil
@@ -83,7 +95,7 @@
// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
// This function is meant so that we do not have duplicate code all over the device agent functions
func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
cloned := proto.Clone(d).(*voltha.Device)
return cloned, nil
diff --git a/ro_core/core/device_manager.go b/ro_core/core/device_manager.go
index 43bc35f..92c6516 100644
--- a/ro_core/core/device_manager.go
+++ b/ro_core/core/device_manager.go
@@ -83,15 +83,25 @@
}
func (dMgr *DeviceManager) getDeviceAgent(deviceId string) *DeviceAgent {
- // TODO If the device is not in memory it needs to be loaded first
dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
if agent, ok := dMgr.deviceAgents[deviceId]; ok {
+ dMgr.lockDeviceAgentsMap.Unlock()
return agent
+ } else {
+ // Try to load into memory - loading will also create the device agent
+ dMgr.lockDeviceAgentsMap.Unlock()
+ if err := dMgr.load(deviceId); err == nil {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ if agent, ok = dMgr.deviceAgents[deviceId]; ok {
+ return agent
+ }
+ }
}
return nil
}
+// listDeviceIdsFromMap returns the list of device IDs that are in memory
func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
dMgr.lockDeviceAgentsMap.Lock()
defer dMgr.lockDeviceAgentsMap.Unlock()
@@ -102,6 +112,7 @@
return result
}
+// GetDevice returns a device, either from memory or from the dB, if present
func (dMgr *DeviceManager) GetDevice(id string) (*voltha.Device, error) {
log.Debugw("GetDevice", log.Fields{"deviceid": id})
if agent := dMgr.getDeviceAgent(id); agent != nil {
@@ -110,6 +121,13 @@
return nil, status.Errorf(codes.NotFound, "%s", id)
}
+func (dMgr *DeviceManager) IsDeviceInCache(id string) bool {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ _, exist := dMgr.deviceAgents[id]
+ return exist
+}
+
func (dMgr *DeviceManager) IsRootDevice(id string) (bool, error) {
device, err := dMgr.GetDevice(id)
if err != nil {
@@ -124,10 +142,15 @@
result := &voltha.Devices{}
if devices := dMgr.clusterDataProxy.List("/devices", 0, false, ""); devices != nil {
for _, device := range devices.([]interface{}) {
- if agent := dMgr.getDeviceAgent(device.(*voltha.Device).Id); agent == nil {
- agent = newDeviceAgent(device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
- dMgr.addDeviceAgentToMap(agent)
- agent.start(nil)
+ // If device is not in memory then set it up
+ if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
+ agent := newDeviceAgent(device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+ if err := agent.start(nil, true); err != nil {
+ log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
+ agent.stop(nil)
+ } else {
+ dMgr.addDeviceAgentToMap(agent)
+ }
}
result.Items = append(result.Items, device.(*voltha.Device))
}
@@ -135,6 +158,97 @@
return result, nil
}
+// loadDevice loads the deviceId in memory, if not present
+func (dMgr *DeviceManager) loadDevice(deviceId string) (*DeviceAgent, error) {
+ log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
+ // Sanity check
+ if deviceId == "" {
+ return nil, status.Error(codes.InvalidArgument, "deviceId empty")
+ }
+ if !dMgr.IsDeviceInCache(deviceId) {
+ agent := newDeviceAgent(&voltha.Device{Id: deviceId}, dMgr, dMgr.clusterDataProxy)
+ if err := agent.start(nil, true); err != nil {
+ agent.stop(nil)
+ return nil, err
+ }
+ dMgr.addDeviceAgentToMap(agent)
+ }
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent, nil
+ }
+ return nil, status.Error(codes.NotFound, deviceId) // This should not happen
+}
+
+// loadRootDeviceParentAndChildren loads the children and parents of a root device in memory
+func (dMgr *DeviceManager) loadRootDeviceParentAndChildren(device *voltha.Device) error {
+ log.Debugw("loading-parent-and-children", log.Fields{"deviceId": device.Id})
+ if device.Root {
+ // Scenario A
+ if device.ParentId != "" {
+ // Load logical device if needed.
+ if err := dMgr.logicalDeviceMgr.load(device.ParentId); err != nil {
+ log.Warnw("failure-loading-logical-device", log.Fields{"lDeviceId": device.ParentId})
+ }
+ } else {
+ log.Debugw("no-parent-to-load", log.Fields{"deviceId": device.Id})
+ }
+ // Load all child devices, if needed
+ if childDeviceIds, err := dMgr.getAllChildDeviceIds(device); err == nil {
+ for _, childDeviceId := range childDeviceIds {
+ if _, err := dMgr.loadDevice(childDeviceId); err != nil {
+ log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceId})
+ return err
+ }
+ }
+ log.Debugw("loaded-children", log.Fields{"deviceId": device.Id, "numChildren": len(childDeviceIds)})
+ } else {
+ log.Debugw("no-child-to-load", log.Fields{"deviceId": device.Id})
+ }
+ }
+ return nil
+}
+
+// load loads the deviceId in memory, if not present, and also loads its accompanying parents and children. Loading
+// in memory is for improved performance. It is not imperative that a device needs to be in memory when a request
+// acting on the device is received by the core. In such a scenario, the Core will load the device in memory first
+// and then proceed with the request.
+func (dMgr *DeviceManager) load(deviceId string) error {
+ log.Debug("load...")
+ // First load the device - this may fail in case the device was deleted intentionally by the other core
+ var dAgent *DeviceAgent
+ var err error
+ if dAgent, err = dMgr.loadDevice(deviceId); err != nil {
+ log.Warnw("failure-loading-device", log.Fields{"deviceId": deviceId})
+ return err
+ }
+ // Get the loaded device details
+ var device *voltha.Device
+ if device, err = dAgent.getDevice(); err != nil {
+ return err
+ }
+
+ // If the device is in Pre-provisioning or deleted state stop here
+ if device.AdminState == voltha.AdminState_PREPROVISIONED || device.AdminState == voltha.AdminState_DELETED {
+ return nil
+ }
+
+ // Now we face two scenarios
+ if device.Root {
+ // Load all children as well as the parent of this device (logical_device)
+ if err := dMgr.loadRootDeviceParentAndChildren(device); err != nil {
+ log.Warnw("failure-loading-device-parent-and-children", log.Fields{"deviceId": deviceId})
+ return err
+ }
+ log.Debugw("successfully-loaded-parent-and-children", log.Fields{"deviceId": deviceId})
+ } else {
+ // Scenario B - use the parentId of that device (root device) to trigger the loading
+ if device.ParentId != "" {
+ return dMgr.load(device.ParentId)
+ }
+ }
+ return nil
+}
+
// ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
func (dMgr *DeviceManager) ListDeviceIds() (*voltha.IDs, error) {
log.Debug("ListDeviceIDs")
@@ -151,17 +265,17 @@
reconciled := 0
for _, id := range ids.Items {
// Act on the device only if its not present in the agent map
- if agent := dMgr.getDeviceAgent(id.Id); agent == nil {
+ if !dMgr.IsDeviceInCache(id.Id) {
// Device Id not in memory
log.Debugw("reconciling-device", log.Fields{"id": id.Id})
- // Load device from model
- if device := dMgr.clusterDataProxy.Get("/devices/"+id.Id, 0, false, ""); device != nil {
- agent = newDeviceAgent(device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
- dMgr.addDeviceAgentToMap(agent)
- agent.start(nil)
- reconciled += 1
+ // Load device from dB
+ agent := newDeviceAgent(&voltha.Device{Id: id.Id}, dMgr, dMgr.clusterDataProxy)
+ if err := agent.start(nil, true); err != nil {
+ log.Warnw("failure-loading-device", log.Fields{"deviceId": id.Id})
+ agent.stop(nil)
} else {
- log.Warnw("device-inexistent", log.Fields{"id": id.Id})
+ dMgr.addDeviceAgentToMap(agent)
+ reconciled += 1
}
} else {
reconciled += 1
@@ -209,7 +323,6 @@
return agent.ListDeviceFlows(ctx)
}
return nil, status.Errorf(codes.NotFound, "%s", deviceId)
-
}
func (dMgr *DeviceManager) ListDeviceFlowGroups(ctx context.Context, deviceId string) (*voltha.FlowGroups, error) {
@@ -230,7 +343,7 @@
}
-func (dMgr *DeviceManager) GetImageDownload(ctx context.Context, deviceId string, imageName string) ( *voltha.ImageDownload, error) {
+func (dMgr *DeviceManager) GetImageDownload(ctx context.Context, deviceId string, imageName string) (*voltha.ImageDownload, error) {
log.Debugw("GetImageDownload", log.Fields{"deviceid": deviceId, "imagename": imageName})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.GetImageDownload(ctx, imageName)
@@ -239,7 +352,7 @@
}
-func (dMgr *DeviceManager) ListImageDownloads(ctx context.Context, deviceId string) ( *voltha.ImageDownloads, error) {
+func (dMgr *DeviceManager) ListImageDownloads(ctx context.Context, deviceId string) (*voltha.ImageDownloads, error) {
log.Debugw("ListImageDownloads", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.ListImageDownloads(ctx)
@@ -248,7 +361,7 @@
}
-func (dMgr *DeviceManager) GetImages(ctx context.Context, deviceId string) ( *voltha.Images, error) {
+func (dMgr *DeviceManager) GetImages(ctx context.Context, deviceId string) (*voltha.Images, error) {
log.Debugw("GetImages", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.GetImages(ctx)
@@ -266,3 +379,18 @@
parentDevice, _ := dMgr.GetDevice(childDevice.ParentId)
return parentDevice
}
+
+// getAllChildDeviceIds is a helper method that collects the device ID of every peer on every port of parentDevice. A nil parentDevice yields an empty list; the returned error is always nil.
+func (dMgr *DeviceManager) getAllChildDeviceIds(parentDevice *voltha.Device) ([]string, error) {
+	childDeviceIds := make([]string, 0)
+	if parentDevice != nil {
+		log.Debugw("getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id})
+		for _, port := range parentDevice.Ports {
+			for _, peer := range port.Peers {
+				childDeviceIds = append(childDeviceIds, peer.DeviceId)
+			}
+		}
+	}
+	return childDeviceIds, nil
+}
+
diff --git a/ro_core/core/logical_device_agent.go b/ro_core/core/logical_device_agent.go
index 9cb6655..f3153ca 100644
--- a/ro_core/core/logical_device_agent.go
+++ b/ro_core/core/logical_device_agent.go
@@ -17,6 +17,7 @@
import (
"context"
+ "github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/protos/voltha"
@@ -49,10 +50,22 @@
}
// start creates the logical device and add it to the data model
-func (agent *LogicalDeviceAgent) start(ctx context.Context) error {
- log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+func (agent *LogicalDeviceAgent) start(ctx context.Context, loadFromDb bool) error {
+ log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId, "loadFromdB": loadFromDb})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
+ if loadFromDb {
+ // load from dB - the logical device may not exist at this time. On error, just return and the calling function
+ // will destroy this agent.
+ if logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, ""); logicalDevice != nil {
+ if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+ agent.lastData = proto.Clone(lDevice).(*voltha.LogicalDevice)
+ }
+ } else {
+ log.Errorw("failed-to-load-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ return status.Errorf(codes.NotFound, "logicaldeviceId-%s", agent.logicalDeviceId)
+ }
+ }
log.Info("logical_device-agent-started")
return nil
}
@@ -72,21 +85,20 @@
log.Debug("GetLogicalDevice")
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
- if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
- return lDevice, nil
+ if logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false,
+ ""); logicalDevice != nil {
+ if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+ return lDevice, nil
+ }
}
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
func (agent *LogicalDeviceAgent) ListLogicalDevicePorts() (*voltha.LogicalPorts, error) {
- log.Debug("!!!!!ListLogicalDevicePorts")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
- if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+ log.Debug("ListLogicalDevicePorts")
+ if logicalDevice, _ := agent.ldeviceMgr.getLogicalDevice(agent.logicalDeviceId); logicalDevice != nil {
lPorts := make([]*voltha.LogicalPort, 0)
- for _, port := range lDevice.Ports {
+ for _, port := range logicalDevice.Ports {
lPorts = append(lPorts, port)
}
return &voltha.LogicalPorts{Items: lPorts}, nil
@@ -96,24 +108,18 @@
// listFlows locks the logical device model and then retrieves the latest flow information
func (agent *LogicalDeviceAgent) ListLogicalDeviceFlows() (*voltha.Flows, error) {
- log.Debug("listFlows")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId+"/flows", 1, false, "")
- if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
- return lDevice.Flows, nil
+ log.Debug("ListLogicalDeviceFlows")
+ if logicalDevice, _ := agent.ldeviceMgr.getLogicalDevice(agent.logicalDeviceId); logicalDevice != nil {
+ return logicalDevice.GetFlows(), nil
}
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
// listFlowGroups locks the logical device model and then retrieves the latest flow groups information
func (agent *LogicalDeviceAgent) ListLogicalDeviceFlowGroups() (*voltha.FlowGroups, error) {
- log.Debug("listFlowGroups")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
- if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
- return lDevice.FlowGroups, nil
+ log.Debug("ListLogicalDeviceFlowGroups")
+ if logicalDevice, _ := agent.ldeviceMgr.getLogicalDevice(agent.logicalDeviceId); logicalDevice != nil {
+ return logicalDevice.GetFlowGroups(), nil
}
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
diff --git a/ro_core/core/logical_device_manager.go b/ro_core/core/logical_device_manager.go
index 64ccf28..9ee3456 100644
--- a/ro_core/core/logical_device_manager.go
+++ b/ro_core/core/logical_device_manager.go
@@ -91,21 +91,33 @@
return nil, status.Errorf(codes.NotFound, "%s", id)
}
+func (ldMgr *LogicalDeviceManager) IsLogicalDeviceInCache(id string) bool {
+ ldMgr.lockLogicalDeviceAgentsMap.Lock()
+ defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+ _, exist := ldMgr.logicalDeviceAgents[id]
+ return exist
+}
+
func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
log.Debug("ListAllLogicalDevices")
result := &voltha.LogicalDevices{}
- if logicalDevices := ldMgr.clusterDataProxy.Get("/logical_devices", 0, false, ""); logicalDevices != nil {
+ if logicalDevices := ldMgr.clusterDataProxy.List("/logical_devices", 0, false, ""); logicalDevices != nil {
for _, logicalDevice := range logicalDevices.([]interface{}) {
- if agent := ldMgr.getLogicalDeviceAgent(logicalDevice.(*voltha.LogicalDevice).Id); agent == nil {
- agent = newLogicalDeviceAgent(
+ // If device is not in memory then set it up
+ if !ldMgr.IsLogicalDeviceInCache(logicalDevice.(*voltha.LogicalDevice).Id) {
+ agent := newLogicalDeviceAgent(
logicalDevice.(*voltha.LogicalDevice).Id,
logicalDevice.(*voltha.LogicalDevice).RootDeviceId,
ldMgr,
ldMgr.deviceMgr,
ldMgr.clusterDataProxy,
)
- ldMgr.addLogicalDeviceAgentToMap(agent)
- go agent.start(nil)
+ if err := agent.start(nil, true); err != nil {
+ log.Warnw("failure-starting-agent", log.Fields{"logicalDeviceId": logicalDevice.(*voltha.LogicalDevice).Id})
+ agent.stop(nil)
+ } else {
+ ldMgr.addLogicalDeviceAgentToMap(agent)
+ }
}
result.Items = append(result.Items, logicalDevice.(*voltha.LogicalDevice))
}
@@ -113,6 +125,26 @@
return result, nil
}
+// load loads a logical device in memory, if not already present
+func (ldMgr *LogicalDeviceManager) load(lDeviceId string) error {
+ log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
+ // To prevent a race condition, let's hold the logical device agent map lock. This will prevent a loading and
+ // a create logical device callback from occurring at the same time.
+ ldMgr.lockLogicalDeviceAgentsMap.Lock()
+ defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+ if ldAgent, _ := ldMgr.logicalDeviceAgents[lDeviceId]; ldAgent == nil {
+ // Logical device not in memory - create a temp logical device Agent and let it load from dB
+ agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
+ if err := agent.start(nil, true); err != nil {
+ agent.stop(nil)
+ return err
+ }
+ ldMgr.logicalDeviceAgents[agent.logicalDeviceId] = agent
+ }
+ // TODO: load the child device
+ return nil
+}
+
func (ldMgr *LogicalDeviceManager) getLogicalDeviceId(device *voltha.Device) (*string, error) {
// Device can either be a parent or a child device
if device.Root {
diff --git a/ro_core/main.go b/ro_core/main.go
index c86cd45..599fe9f 100644
--- a/ro_core/main.go
+++ b/ro_core/main.go
@@ -96,7 +96,11 @@
// Setup KV Client
log.Debugw("create-kv-client", log.Fields{"kvstore": ro.config.KVStoreType})
- ro.setKVClient()
+
+ if err := ro.setKVClient(); err != nil {
+ log.Fatalw("failed-to-connect-kv-client", log.Fields{"error": err})
+ return
+ }
// Create the core service
ro.core = c.NewCore(ro.config.InstanceID, ro.config, ro.kvClient)