VOL-1774 Etcd Crash Handling
Change-Id: I1eeb726654c3972fd0a4fafae134607e5a810415
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index b5f0131..3155ab8 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -110,42 +110,65 @@
return &adapterMgr
}
-func (aMgr *AdapterManager) start(ctx context.Context) {
+func (aMgr *AdapterManager) start(ctx context.Context) error {
log.Info("starting-adapter-manager")
// Load the existing adapterAgents and device types - this will also ensure the correct paths have been
// created if there are no data in the dB to start
- aMgr.loadAdaptersAndDevicetypesInMemory()
+ err := aMgr.loadAdaptersAndDevicetypesInMemory()
+ if err != nil {
+ log.Errorw("Failed-to-load-adapters-and-device-types-in-memeory", log.Fields{"error": err})
+ return err
+ }
//// Create the proxies
- aMgr.adapterProxy = aMgr.clusterDataProxy.CreateProxy(context.Background(), "/adapters", false)
- aMgr.deviceTypeProxy = aMgr.clusterDataProxy.CreateProxy(context.Background(), "/device_types", false)
+ aMgr.adapterProxy, err = aMgr.clusterDataProxy.CreateProxy(context.Background(), "/adapters", false)
+ if err != nil {
+ log.Errorw("Failed-to-create-adapter-proxy", log.Fields{"error": err})
+ return err
+ }
+ aMgr.deviceTypeProxy, err = aMgr.clusterDataProxy.CreateProxy(context.Background(), "/device_types", false)
+ if err != nil {
+ log.Errorw("Failed-to-create-device-proxy", log.Fields{"error": err})
+ return err
+ }
// Register the callbacks
aMgr.adapterProxy.RegisterCallback(model.POST_UPDATE, aMgr.adapterUpdated)
aMgr.deviceTypeProxy.RegisterCallback(model.POST_UPDATE, aMgr.deviceTypesUpdated)
probe.UpdateStatusFromContext(ctx, "adapter-manager", probe.ServiceStatusRunning)
log.Info("adapter-manager-started")
+ return nil
}
//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
-func (aMgr *AdapterManager) loadAdaptersAndDevicetypesInMemory() {
+func (aMgr *AdapterManager) loadAdaptersAndDevicetypesInMemory() error {
// Load the adapters
- if adaptersIf := aMgr.clusterDataProxy.List(context.Background(), "/adapters", 0, false, ""); adaptersIf != nil {
+ adaptersIf, err := aMgr.clusterDataProxy.List(context.Background(), "/adapters", 0, false, "")
+ if err != nil {
+ log.Errorw("Failed-to-list-adapters-from-cluster-data-proxy", log.Fields{"error": err})
+ return err
+ }
+ if adaptersIf != nil {
for _, adapterIf := range adaptersIf.([]interface{}) {
if adapter, ok := adapterIf.(*voltha.Adapter); ok {
log.Debugw("found-existing-adapter", log.Fields{"adapterId": adapter.Id})
- aMgr.addAdapter(adapter, false)
+ // Return only on failure: an unconditional return here would stop after the
+ // first adapter and skip the device-type loading below.
+ if err := aMgr.addAdapter(adapter, false); err != nil {
+ return err
+ }
}
}
} else {
log.Debug("no-existing-adapter-found")
// No adapter data. In order to have a proxy setup for that path let's create a fake adapter
- aMgr.addAdapter(&voltha.Adapter{Id: SentinelAdapterID}, true)
+ // Return only on failure so the device types below are still loaded.
+ if err := aMgr.addAdapter(&voltha.Adapter{Id: SentinelAdapterID}, true); err != nil {
+ return err
+ }
}
// Load the device types
- if deviceTypesIf := aMgr.clusterDataProxy.List(context.Background(), "/device_types", 0, false, ""); deviceTypesIf != nil {
+ deviceTypesIf, err := aMgr.clusterDataProxy.List(context.Background(), "/device_types", 0, false, "")
+ if err != nil {
+ log.Errorw("Failed-to-list-device-types-from-cluster-data-proxy", log.Fields{"error": err})
+ return err
+ }
+ if deviceTypesIf != nil {
dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
@@ -153,12 +176,12 @@
dTypes.Items = append(dTypes.Items, dType)
}
}
- aMgr.addDeviceTypes(dTypes, false)
- } else {
- log.Debug("no-existing-device-type-found")
- // No device types data. In order to have a proxy setup for that path let's create a fake device type
- aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SentinelDevicetypeID, Adapter: SentinelAdapterID}}}, true)
+ return aMgr.addDeviceTypes(dTypes, false)
}
+
+ log.Debug("no-existing-device-type-found")
+ // No device types data. In order to have a proxy setup for that path let's create a fake device type
+ return aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SentinelDevicetypeID, Adapter: SentinelAdapterID}}}, true)
}
//updateAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
@@ -180,7 +203,12 @@
}
// Update the adapters
- if adaptersIf := aMgr.clusterDataProxy.List(context.Background(), "/adapters", 0, false, ""); adaptersIf != nil {
+ adaptersIf, err := aMgr.clusterDataProxy.List(context.Background(), "/adapters", 0, false, "")
+ if err != nil {
+ log.Errorw("failed-to-list-adapters-from-cluster-proxy", log.Fields{"error": err})
+ return
+ }
+ if adaptersIf != nil {
for _, adapterIf := range adaptersIf.([]interface{}) {
if adapter, ok := adapterIf.(*voltha.Adapter); ok {
log.Debugw("found-existing-adapter", log.Fields{"adapterId": adapter.Id})
@@ -191,7 +219,12 @@
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
// Update the device types
- if deviceTypesIf := aMgr.clusterDataProxy.List(context.Background(), "/device_types", 0, false, ""); deviceTypesIf != nil {
+ deviceTypesIf, err := aMgr.clusterDataProxy.List(context.Background(), "/device_types", 0, false, "")
+ if err != nil {
+ log.Errorw("Failed-to-list-device-types-in-cluster-data-proxy", log.Fields{"error": err})
+ return
+ }
+ if deviceTypesIf != nil {
dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
@@ -202,7 +235,7 @@
}
}
-func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) {
+func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) error {
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
log.Debugw("adding-adapter", log.Fields{"adapter": adapter})
@@ -211,8 +244,18 @@
aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter, nil)
if saveToDb {
// Save the adapter to the KV store - first check if it already exist
- if kvAdapter := aMgr.clusterDataProxy.Get(context.Background(), "/adapters/"+adapter.Id, 0, false, ""); kvAdapter == nil {
- if added := aMgr.clusterDataProxy.AddWithID(context.Background(), "/adapters", adapter.Id, clonedAdapter, ""); added == nil {
+ kvAdapter, err := aMgr.clusterDataProxy.Get(context.Background(), "/adapters/"+adapter.Id, 0, false, "")
+ if err != nil {
+ log.Errorw("failed-to-get-adapters-from-cluster-proxy", log.Fields{"error": err})
+ return err
+ }
+ if kvAdapter == nil {
+ added, err := aMgr.clusterDataProxy.AddWithID(context.Background(), "/adapters", adapter.Id, clonedAdapter, "")
+ if err != nil {
+ log.Errorw("failed-to-save-adapter-to-cluster-proxy", log.Fields{"error": err})
+ return err
+ }
+ if added == nil {
//TODO: Errors when saving to KV would require a separate go routine to be launched and try the saving again
log.Errorw("failed-to-save-adapter", log.Fields{"adapter": adapter})
} else {
@@ -221,11 +264,12 @@
}
}
}
+ return nil
}
-func (aMgr *AdapterManager) addDeviceTypes(deviceTypes *voltha.DeviceTypes, saveToDb bool) {
+func (aMgr *AdapterManager) addDeviceTypes(deviceTypes *voltha.DeviceTypes, saveToDb bool) error {
if deviceTypes == nil {
- return
+ return fmt.Errorf("no-device-type")
}
log.Debugw("adding-device-types", log.Fields{"deviceTypes": deviceTypes})
aMgr.lockAdaptersMap.Lock()
@@ -245,10 +289,20 @@
if saveToDb {
// Save the device types to the KV store as well
for _, deviceType := range deviceTypes.Items {
- if dType := aMgr.clusterDataProxy.Get(context.Background(), "/device_types/"+deviceType.Id, 0, false, ""); dType == nil {
+ dType, err := aMgr.clusterDataProxy.Get(context.Background(), "/device_types/"+deviceType.Id, 0, false, "")
+ if err != nil {
+ log.Errorw("Failed-to--device-types-from-cluster-data-proxy", log.Fields{"error": err})
+ return err
+ }
+ if dType == nil {
// Does not exist - save it
clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
- if added := aMgr.clusterDataProxy.AddWithID(context.Background(), "/device_types", deviceType.Id, clonedDType, ""); added == nil {
+ added, err := aMgr.clusterDataProxy.AddWithID(context.Background(), "/device_types", deviceType.Id, clonedDType, "")
+ if err != nil {
+ log.Errorw("Failed-to-add-device-types-to-cluster-data-proxy", log.Fields{"error": err})
+ return err
+ }
+ if added == nil {
log.Errorw("failed-to-save-deviceType", log.Fields{"deviceType": deviceType})
} else {
log.Debugw("device-type-saved-to-KV-Store", log.Fields{"deviceType": deviceType})
@@ -256,6 +310,7 @@
}
}
}
+ return nil
}
func (aMgr *AdapterManager) listAdapters(ctx context.Context) (*voltha.Adapters, error) {
@@ -315,7 +370,7 @@
aMgr.deviceTypeToAdapterMap[deviceType.Id] = deviceType.Adapter
}
-func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) *voltha.CoreInstance {
+func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
log.Debugw("registerAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
if aMgr.getAdapter(adapter.Id) != nil {
@@ -326,15 +381,21 @@
log.Errorw("unable-to-restart-adapter", log.Fields{"error": err})
}
}()
- return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}
+ return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
}
// Save the adapter and the device types
- aMgr.addAdapter(adapter, true)
- aMgr.addDeviceTypes(deviceTypes, true)
+ if err := aMgr.addAdapter(adapter, true); err != nil {
+ log.Errorw("failed-to-add-adapter", log.Fields{"error": err})
+ return nil, err
+ }
+ if err := aMgr.addDeviceTypes(deviceTypes, true); err != nil {
+ log.Errorw("failed-to-add-device-types", log.Fields{"error": err})
+ return nil, err
+ }
log.Debugw("adapter-registered", log.Fields{"adapter": adapter.Id})
- return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}
+ return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
}
//getAdapterName returns the name of the device adapter that service this device type
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 8f43643..0ffe8e1 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -150,7 +150,7 @@
if rhp.TestMode { // Execute only for test cases
return &voltha.CoreInstance{InstanceId: "CoreInstance"}, nil
}
- return rhp.adapterMgr.registerAdapter(adapter, deviceTypes), nil
+ return rhp.adapterMgr.registerAdapter(adapter, deviceTypes)
}
// GetDevice returns device info
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index ef08402..cb23586 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -18,6 +18,7 @@
import (
"context"
+ "fmt"
"time"
"github.com/opencord/voltha-go/db/model"
@@ -63,7 +64,7 @@
}
// NewCore creates instance of rw core
-func NewCore(id string, cf *config.RWCoreFlags, kvClient kvstore.Client, kafkaClient kafka.Client) *Core {
+func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags, kvClient kvstore.Client, kafkaClient kafka.Client) *Core {
var core Core
core.instanceID = id
core.exitChannel = make(chan int, 1)
@@ -86,13 +87,11 @@
PathPrefix: cf.KVStoreDataPrefix}
core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &core.backend)
core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &core.backend)
- core.clusterDataProxy = core.clusterDataRoot.CreateProxy(context.Background(), "/", false)
- core.localDataProxy = core.localDataRoot.CreateProxy(context.Background(), "/", false)
return &core
}
// Start brings up core services
-func (core *Core) Start(ctx context.Context) {
+func (core *Core) Start(ctx context.Context) error {
// If the context has a probe then fetch it and register our services
var p *probe.Probe
@@ -119,6 +118,18 @@
if p != nil {
p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
}
+ var err error
+
+ core.clusterDataProxy, err = core.clusterDataRoot.CreateProxy(context.Background(), "/", false)
+ if err != nil {
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+ return fmt.Errorf("Failed to create cluster data proxy: %v", err) // keep the cause for the caller
+ }
+ core.localDataProxy, err = core.localDataRoot.CreateProxy(context.Background(), "/", false)
+ if err != nil {
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+ return fmt.Errorf("Failed to create local data proxy: %v", err) // keep the cause for the caller
+ }
// core.kmp must be created before deviceMgr and adapterMgr, as they will make
// private copies of the poiner to core.kmp.
@@ -152,6 +163,7 @@
"service/voltha/owns_device", 10)
log.Info("core-services-started")
+ return nil
}
// Stop brings down core services
@@ -415,7 +427,10 @@
func (core *Core) startAdapterManager(ctx context.Context) {
log.Info("Adapter-Manager-Starting...")
- core.adapterMgr.start(ctx)
+ err := core.adapterMgr.start(ctx)
+ if err != nil {
+ log.Fatalf("failed-to-start-adapter-manager: error %v ", err)
+ }
log.Info("Adapter-Manager-Started")
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index ca3ffbb..8cbc63a 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -88,7 +88,12 @@
log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceID})
if deviceToCreate == nil {
// Load the existing device
- if loadedDevice := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, ""); loadedDevice != nil {
+ loadedDevice, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
+ if err != nil {
+ log.Errorw("failed-to-get-from-cluster-data-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+ if loadedDevice != nil {
var ok bool
if device, ok = loadedDevice.(*voltha.Device); ok {
agent.deviceType = device.Adapter
@@ -119,14 +124,22 @@
}
// Add the initial device to the local model
- if added := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceID, device, ""); added == nil {
+ added, err := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceID, device, "")
+ if err != nil {
+ log.Errorw("failed-to-save-devices-to-cluster-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+ if added == nil {
log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceID})
return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s", agent.deviceID)
}
agent.device = proto.Clone(device).(*voltha.Device)
}
-
- agent.deviceProxy = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceID, false)
+ var err error
+ if agent.deviceProxy, err = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceID, false); err != nil {
+ log.Errorw("failed-to-add-devices-to-cluster-proxy", log.Fields{"error": err})
+ return nil, err
+ }
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
log.Debugw("device-agent-started", log.Fields{"deviceId": agent.deviceID})
@@ -143,7 +156,12 @@
agent.deviceProxy.UnregisterCallback(model.POST_UPDATE, agent.processUpdate)
// Remove the device from the KV store
- if removed := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceID, ""); removed == nil {
+ removed, err := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceID, "")
+ if err != nil {
+ log.Errorw("Failed-to-remove-device-from-cluster-data-proxy", log.Fields{"error": err})
+ return
+ }
+ if removed == nil {
log.Debugw("device-already-removed", log.Fields{"id": agent.deviceID})
}
agent.exitChannel <- 1
@@ -157,7 +175,12 @@
defer agent.lockDevice.Unlock()
log.Debug("reconciling-device-agent-devicetype")
// TODO: context timeout
- if device := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceID, 1, true, ""); device != nil {
+ device, err := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceID, 1, true, "")
+ if err != nil {
+ log.Errorw("Failed to get device info from cluster data proxy", log.Fields{"error": err})
+ return
+ }
+ if device != nil {
if d, ok := device.(*voltha.Device); ok {
agent.deviceType = d.Adapter
agent.device = proto.Clone(d).(*voltha.Device)
@@ -743,7 +766,10 @@
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
// Store the device
updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, cloned, false, "")
+ afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, cloned, false, "")
+ if err != nil {
+ return status.Errorf(codes.Internal, "%s", agent.deviceID)
+ }
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceID)
}
@@ -1331,7 +1357,11 @@
// It is an internal helper function.
func (agent *DeviceAgent) updateDeviceInStoreWithoutLock(device *voltha.Device, strict bool, txid string) error {
updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, device, strict, txid); afterUpdate == nil {
+ afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, device, strict, txid)
+ if err != nil {
+ return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceID)
+ }
+ if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceID)
}
log.Debugw("updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})
diff --git a/rw_core/core/device_agent_test.go b/rw_core/core/device_agent_test.go
index 8200ae7..b00c42c 100755
--- a/rw_core/core/device_agent_test.go
+++ b/rw_core/core/device_agent_test.go
@@ -112,8 +112,11 @@
cfg.GrpcHost = "127.0.0.1"
setCoreCompeteMode(inCompeteMode)
client := setupKVClient(cfg, dat.coreInstanceID)
- dat.core = NewCore(dat.coreInstanceID, cfg, client, dat.kClient)
- dat.core.Start(context.Background())
+ dat.core = NewCore(context.Background(), dat.coreInstanceID, cfg, client, dat.kClient)
+ err = dat.core.Start(context.Background())
+ if err != nil {
+ log.Fatal("Cannot start core")
+ }
}
func (dat *DATest) stopAll() {
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index a876bbe..7bb869f 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -155,7 +155,13 @@
}
func (dMgr *DeviceManager) createDevice(ctx context.Context, device *voltha.Device, ch chan interface{}) {
- if dMgr.isParentDeviceExist(device) {
+ deviceExist, err := dMgr.isParentDeviceExist(device)
+ if err != nil {
+ log.Errorf("Failed to fetch parent device info")
+ sendResponse(ctx, ch, err)
+ return
+ }
+ if deviceExist {
log.Errorf("Device is Pre-provisioned already with same IP-Port or MAC Address")
sendResponse(ctx, ch, errors.New("Device is already pre-provisioned"))
return
@@ -167,10 +173,10 @@
// Create and start a device agent for that device
agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
dMgr.addDeviceAgentToMap(agent)
- device, err := agent.start(ctx, device)
+ device, err = agent.start(ctx, device)
if err != nil {
log.Errorf("Failed to start device")
- sendResponse(ctx, ch, errors.New("Failed to start device"))
+ sendResponse(ctx, ch, err)
return
}
@@ -394,7 +400,12 @@
func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
log.Debug("ListDevices")
result := &voltha.Devices{}
- if devices := dMgr.clusterDataProxy.List(context.Background(), "/devices", 0, false, ""); devices != nil {
+ devices, err := dMgr.clusterDataProxy.List(context.Background(), "/devices", 0, false, "")
+ if err != nil {
+ log.Errorw("failed-to-list-devices-from-cluster-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+ if devices != nil {
for _, device := range devices.([]interface{}) {
// If device is not in memory then set it up
if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
@@ -415,27 +426,37 @@
}
//isParentDeviceExist checks whether device is already preprovisioned.
-func (dMgr *DeviceManager) isParentDeviceExist(newDevice *voltha.Device) bool {
+func (dMgr *DeviceManager) isParentDeviceExist(newDevice *voltha.Device) (bool, error) {
hostPort := newDevice.GetHostAndPort()
- if devices := dMgr.clusterDataProxy.List(context.Background(), "/devices", 0, false, ""); devices != nil {
+ devices, err := dMgr.clusterDataProxy.List(context.Background(), "/devices", 0, false, "")
+ if err != nil {
+ log.Errorw("Failed to list devices from cluster data proxy", log.Fields{"error": err})
+ return false, err
+ }
+ if devices != nil {
for _, device := range devices.([]interface{}) {
if !device.(*voltha.Device).Root {
continue
}
if hostPort != "" && hostPort == device.(*voltha.Device).GetHostAndPort() {
- return true
+ return true, nil
}
if newDevice.MacAddress != "" && newDevice.MacAddress == device.(*voltha.Device).MacAddress {
- return true
+ return true, nil
}
}
}
- return false
+ return false, nil
}
//getDeviceFromModelretrieves the device data from the model.
func (dMgr *DeviceManager) getDeviceFromModel(deviceID string) (*voltha.Device, error) {
- if device := dMgr.clusterDataProxy.Get(context.Background(), "/devices/"+deviceID, 0, false, ""); device != nil {
+ device, err := dMgr.clusterDataProxy.Get(context.Background(), "/devices/"+deviceID, 0, false, "")
+ if err != nil {
+ log.Errorw("failed-to-get-device-info-from-cluster-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+ if device != nil {
if d, ok := device.(*voltha.Device); ok {
return d, nil
}
@@ -938,7 +959,12 @@
if deviceType == "" && vendorID != "" {
log.Debug("device-type-is-nil-fetching-device-type")
- if deviceTypesIf := dMgr.adapterMgr.clusterDataProxy.List(context.Background(), "/device_types", 0, false, ""); deviceTypesIf != nil {
+ deviceTypesIf, err := dMgr.adapterMgr.clusterDataProxy.List(context.Background(), "/device_types", 0, false, "")
+ if err != nil {
+ log.Errorw("failed-to-get-device-type-info", log.Fields{"error": err})
+ return nil, err
+ }
+ if deviceTypesIf != nil {
OLoop:
for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 36cb647..52f8c3b 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -352,7 +352,12 @@
// ListDevices retrieves the latest devices from the data model
func (handler *APIHandler) ListDevices(ctx context.Context, empty *empty.Empty) (*voltha.Devices, error) {
log.Debug("ListDevices")
- return handler.deviceMgr.ListDevices()
+ devices, err := handler.deviceMgr.ListDevices()
+ if err != nil {
+ log.Errorw("Failed to list devices", log.Fields{"error": err})
+ return nil, err
+ }
+ return devices, nil
}
// ListDeviceIds returns the list of device ids managed by a voltha core
@@ -459,7 +464,7 @@
func (handler *APIHandler) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
if device.MacAddress == "" && device.GetHostAndPort() == "" {
log.Errorf("No Device Info Present")
- return &voltha.Device{}, errors.New("No Device Info Present; MAC or HOSTIP&PORT")
+ return &voltha.Device{}, errors.New("no-device-info-present; MAC or HOSTIP&PORT")
}
log.Debugw("create-device", log.Fields{"device": *device})
if isTestMode(ctx) {
@@ -482,7 +487,8 @@
case res := <-ch:
if res != nil {
if err, ok := res.(error); ok {
- return &voltha.Device{}, err
+ log.Errorw("create-device-failed", log.Fields{"error": err})
+ return nil, err
}
if d, ok := res.(*voltha.Device); ok {
_, err := handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{ID: d.Id})
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index e9499e3..4acb8aa 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -69,6 +69,8 @@
}
func (nb *NBTest) startCore(inCompeteMode bool) {
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
cfg := config.NewRWCoreFlags()
cfg.CorePairTopic = "rw_core"
cfg.DefaultRequestTimeout = nb.defaultTimeout.Nanoseconds() / 1000000 //TODO: change when Core changes to Duration
@@ -82,11 +84,14 @@
cfg.GrpcHost = "127.0.0.1"
setCoreCompeteMode(inCompeteMode)
client := setupKVClient(cfg, nb.coreInstanceID)
- nb.core = NewCore(nb.coreInstanceID, cfg, client, nb.kClient)
- nb.core.Start(context.Background())
+ nb.core = NewCore(ctx, nb.coreInstanceID, cfg, client, nb.kClient)
+ err = nb.core.Start(context.Background())
+ if err != nil {
+ log.Fatal("Cannot start core")
+ }
}
-func (nb *NBTest) createAndregisterAdapters() {
+func (nb *NBTest) createAndregisterAdapters(t *testing.T) {
// Setup the mock OLT adapter
oltAdapter, err := createMockAdapter(OltAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.oltAdapterName)
if err != nil {
@@ -103,7 +108,10 @@
}
types := []*voltha.DeviceType{{Id: nb.oltAdapterName, Adapter: nb.oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
deviceTypes := &voltha.DeviceTypes{Items: types}
- nb.core.adapterMgr.registerAdapter(registrationData, deviceTypes)
+ if _, err := nb.core.adapterMgr.registerAdapter(registrationData, deviceTypes); err != nil {
+ log.Errorw("failed-to-register-adapter", log.Fields{"error": err})
+ assert.Nil(t, err) // err is non-nil here, so this fails the test on a registration error
+ }
// Setup the mock ONU adapter
if _, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName); err != nil {
@@ -117,7 +125,10 @@
}
types = []*voltha.DeviceType{{Id: nb.onuAdapterName, Adapter: nb.onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
deviceTypes = &voltha.DeviceTypes{Items: types}
- nb.core.adapterMgr.registerAdapter(registrationData, deviceTypes)
+ if _, err := nb.core.adapterMgr.registerAdapter(registrationData, deviceTypes); err != nil {
+ log.Errorw("failed-to-register-adapter", log.Fields{"error": err})
+ assert.Nil(t, err) // err is non-nil here, so this fails the test on a registration error
+ }
}
func (nb *NBTest) stopAll() {
@@ -314,7 +325,7 @@
// Try to create a device with invalid data
_, err = nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName})
assert.NotNil(t, err)
- assert.Equal(t, "No Device Info Present; MAC or HOSTIP&PORT", err.Error())
+ assert.Equal(t, "no-device-info-present; MAC or HOSTIP&PORT", err.Error())
// Ensure we only have 1 device in the Core
devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
@@ -477,7 +488,7 @@
nb.testCoreWithoutData(t, nbi)
// Create/register the adapters
- nb.createAndregisterAdapters()
+ nb.createAndregisterAdapters(t)
// 2. Test adapter registration
nb.testAdapterRegistration(t, nbi)
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index e0aac9d..b85d572 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -88,6 +88,7 @@
func (agent *LogicalDeviceAgent) start(ctx context.Context, loadFromdB bool) error {
log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceID, "loadFromdB": loadFromdB})
var ld *voltha.LogicalDevice
+ var err error
if !loadFromdB {
//Build the logical device based on information retrieved from the device adapter
var switchCap *ic.SwitchCapability
@@ -113,7 +114,13 @@
agent.lockLogicalDevice.Lock()
// Save the logical device
- if added := agent.clusterDataProxy.AddWithID(ctx, "/logical_devices", ld.Id, ld, ""); added == nil {
+ added, err := agent.clusterDataProxy.AddWithID(ctx, "/logical_devices", ld.Id, ld, "")
+ if err != nil {
+ log.Errorw("failed-to-save-logical-devices-to-cluster-proxy", log.Fields{"error": err})
+ agent.lockLogicalDevice.Unlock()
+ return err
+ }
+ if added == nil {
log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
} else {
log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
@@ -134,7 +141,12 @@
// load from dB - the logical may not exist at this time. On error, just return and the calling function
// will destroy this agent.
agent.lockLogicalDevice.Lock()
- logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceID, 0, true, "")
+ logicalDevice, err := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceID, 0, true, "")
+ if err != nil {
+ // Release the lock taken above before returning, as the !ok path below does.
+ agent.lockLogicalDevice.Unlock()
+ return status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceID)
+ }
ld, ok := logicalDevice.(*voltha.LogicalDevice)
if !ok {
agent.lockLogicalDevice.Unlock()
@@ -154,23 +164,38 @@
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
- agent.flowProxy = agent.clusterDataProxy.CreateProxy(
+ agent.flowProxy, err = agent.clusterDataProxy.CreateProxy(
ctx,
fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceID),
false)
- agent.meterProxy = agent.clusterDataProxy.CreateProxy(
+ if err != nil {
+ log.Errorw("failed-to-create-flow-proxy", log.Fields{"error": err})
+ return err
+ }
+ agent.meterProxy, err = agent.clusterDataProxy.CreateProxy(
ctx,
fmt.Sprintf("/logical_devices/%s/meters", agent.logicalDeviceID),
false)
- agent.groupProxy = agent.clusterDataProxy.CreateProxy(
+ if err != nil {
+ log.Errorw("failed-to-create-meter-proxy", log.Fields{"error": err})
+ return err
+ }
+ agent.groupProxy, err = agent.clusterDataProxy.CreateProxy(
ctx,
fmt.Sprintf("/logical_devices/%s/flow_groups", agent.logicalDeviceID),
false)
- agent.ldProxy = agent.clusterDataProxy.CreateProxy(
+ if err != nil {
+ log.Errorw("failed-to-create-group-proxy", log.Fields{"error": err})
+ return err
+ }
+ agent.ldProxy, err = agent.clusterDataProxy.CreateProxy(
ctx,
fmt.Sprintf("/logical_devices/%s", agent.logicalDeviceID),
false)
-
+ if err != nil {
+ log.Errorw("failed-to-create-logical-device-proxy", log.Fields{"error": err})
+ return err
+ }
// TODO: Use a port proxy once the POST_ADD is fixed
if agent.ldProxy != nil {
agent.ldProxy.RegisterCallback(model.POST_UPDATE, agent.portUpdated)
@@ -187,19 +212,23 @@
}
// stop stops the logical devuce agent. This removes the logical device from the data model.
-func (agent *LogicalDeviceAgent) stop(ctx context.Context) {
+func (agent *LogicalDeviceAgent) stop(ctx context.Context) error {
log.Info("stopping-logical_device-agent")
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
//Remove the logical device from the model
- if removed := agent.clusterDataProxy.Remove(ctx, "/logical_devices/"+agent.logicalDeviceID, ""); removed == nil {
+ if removed, err := agent.clusterDataProxy.Remove(ctx, "/logical_devices/"+agent.logicalDeviceID, ""); err != nil {
+ log.Errorw("failed-to-remove-device", log.Fields{"error": err})
+ return err
+ } else if removed == nil {
log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
} else {
log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
}
agent.exitChannel <- 1
log.Info("logical_device-agent-stopped")
+ return nil
}
// GetLogicalDevice returns the latest logical device data
@@ -246,7 +275,6 @@
// ListLogicalDevicePorts returns logical device ports
func (agent *LogicalDeviceAgent) ListLogicalDevicePorts() *voltha.LogicalPorts {
log.Debug("ListLogicalDevicePorts")
-
logicalDevice := agent.GetLogicalDevice()
lPorts := make([]*voltha.LogicalPort, 0)
lPorts = append(lPorts, logicalDevice.Ports...)
@@ -491,7 +519,11 @@
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(logicalDevice *voltha.LogicalDevice) error {
updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/logical_devices/"+agent.logicalDeviceID, logicalDevice, false, "")
+ afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/logical_devices/"+agent.logicalDeviceID, logicalDevice, false, "")
+ if err != nil {
+ log.Errorw("failed-to-update-logical-devices-to-cluster-proxy", log.Fields{"error": err})
+ return err
+ }
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceID)
}
@@ -945,7 +977,7 @@
}
if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
- log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
}
@@ -1068,7 +1100,7 @@
if changedFlow {
var flowMetadata voltha.FlowMetadata
if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
- log.Error("Meter-referred-in-flows-not-present")
+ log.Error("meter-referred-in-flows-not-present")
return err
}
deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{})
@@ -1080,7 +1112,7 @@
}
if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
- log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
}
@@ -1124,7 +1156,7 @@
}
if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
- log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
} else {
@@ -1173,13 +1205,13 @@
if groupsChanged {
if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
- log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
}
if flowsChanged {
if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
- log.Errorw("Cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ log.Errorw("cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
return err
}
}
diff --git a/rw_core/core/logical_device_agent_test.go b/rw_core/core/logical_device_agent_test.go
index b5789cd..f8f9348 100644
--- a/rw_core/core/logical_device_agent_test.go
+++ b/rw_core/core/logical_device_agent_test.go
@@ -446,6 +446,7 @@
}
func (lda *LDATest) startCore(inCompeteMode bool) {
+ ctx := context.Background()
cfg := config.NewRWCoreFlags()
cfg.CorePairTopic = "rw_core"
cfg.DefaultRequestTimeout = lda.defaultTimeout.Nanoseconds() / 1000000 //TODO: change when Core changes to Duration
@@ -459,8 +460,11 @@
cfg.GrpcHost = "127.0.0.1"
setCoreCompeteMode(inCompeteMode)
client := setupKVClient(cfg, lda.coreInstanceID)
- lda.core = NewCore(lda.coreInstanceID, cfg, client, lda.kClient)
- lda.core.Start(context.Background())
+ lda.core = NewCore(ctx, lda.coreInstanceID, cfg, client, lda.kClient)
+ err = lda.core.Start(ctx)
+ if err != nil {
+ log.Fatal("Cannot start core")
+ }
}
func (lda *LDATest) stopAll() {
@@ -483,7 +487,8 @@
clonedLD.DatapathId = rand.Uint64()
lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
lDeviceAgent.logicalDevice = clonedLD
- added := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "/logical_devices", clonedLD.Id, clonedLD, "")
+ added, err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "/logical_devices", clonedLD.Id, clonedLD, "")
+ assert.Nil(t, err)
assert.NotNil(t, added)
lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
return lDeviceAgent
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 53711d7..6f8b2fa 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -142,7 +142,12 @@
func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
log.Debug("ListAllLogicalDevices")
result := &voltha.LogicalDevices{}
- if logicalDevices := ldMgr.clusterDataProxy.List(context.Background(), "/logical_devices", 0, true, ""); logicalDevices != nil {
+ logicalDevices, err := ldMgr.clusterDataProxy.List(context.Background(), "/logical_devices", 0, true, "")
+ if err != nil {
+ log.Errorw("failed-to-list-logical-devices-from-cluster-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+ if logicalDevices != nil {
for _, logicalDevice := range logicalDevices.([]interface{}) {
result.Items = append(result.Items, logicalDevice.(*voltha.LogicalDevice))
}
@@ -200,7 +205,10 @@
ldAgent := value.(*LogicalDeviceAgent)
if ldAgent.rootDeviceID == id {
log.Infow("stopping-logical-device-agent", log.Fields{"lDeviceId": key})
- ldAgent.stop(context.TODO())
+ if err := ldAgent.stop(context.TODO()); err != nil {
+ log.Errorw("failed-to-stop-LDAgent", log.Fields{"error": err})
+ return false
+ }
ldID = key.(string)
ldMgr.logicalDeviceAgents.Delete(ldID)
}
@@ -211,7 +219,12 @@
//getLogicalDeviceFromModel retrieves the logical device data from the model.
func (ldMgr *LogicalDeviceManager) getLogicalDeviceFromModel(lDeviceID string) (*voltha.LogicalDevice, error) {
- if logicalDevice := ldMgr.clusterDataProxy.Get(context.Background(), "/logical_devices/"+lDeviceID, 0, false, ""); logicalDevice != nil {
+ logicalDevice, err := ldMgr.clusterDataProxy.Get(context.Background(), "/logical_devices/"+lDeviceID, 0, false, "")
+ if err != nil {
+ log.Errorw("failed-to-get-logical-devices-from-cluster-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+ if logicalDevice != nil {
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice, nil
}
@@ -234,7 +247,10 @@
log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceID})
agent := newLogicalDeviceAgent(lDeviceID, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
if err := agent.start(context.TODO(), true); err != nil {
- agent.stop(context.TODO())
+ if err := agent.stop(context.TODO()); err != nil {
+ log.Errorw("failed-to-stop-agent", log.Fields{"error": err})
+ return err
+ }
} else {
ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
}
@@ -275,7 +291,10 @@
logDeviceID := device.ParentId
if agent := ldMgr.getLogicalDeviceAgent(logDeviceID); agent != nil {
// Stop the logical device agent
- agent.stop(ctx)
+ if err := agent.stop(ctx); err != nil {
+ log.Errorw("failed-to-stop-agent", log.Fields{"error": err})
+ return err
+ }
//Remove the logical device agent from the Map
ldMgr.deleteLogicalDeviceAgent(logDeviceID)
err := ldMgr.core.deviceOwnership.AbandonDevice(logDeviceID)