[VOL-2164] Update rw-core to use the Async Kafka API
This commit consists of the following:
1) Process per-device requests in the Core in the order they are
received. If there are lots of requests on a given device then
some latency will be introduced due to ordering. With recent
changes in the model, along with keeping the request lock to a
minimum, these latencies are reduced. Testing did not show any
noticeable latencies.
2) Keep the request lock from the moment a request starts
processing to the moment that request is sent to kafka (when
applicable). Adapter responses are received and processed
asynchronously. Therefore, an adapter can take all the time it
needs to process a transaction. The Core still has a context
with timeout (configurable) to cater for cases where the adapter
does not return a response.
3) Adapter requests are processed to completion before sending a
response back to the adapter. Previously, in some cases, a
separate go routine was created to process the request and a
successful response was sent to the adapter. Now if the request
fails then the adapter will receive an error. The adapter
requests for a given device are therefore processed in the
order they are received.
4) Some changes are made when retrieving a handler to execute
a device state transition. This was necessary as some
transition overlap was found.
Update after multiple reviews.
Change-Id: I55a189efec1549a662f2d71e18e6eca9015a3a17
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 98c45eb..0d77429 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -19,10 +19,6 @@
import (
"context"
"errors"
- "reflect"
- "runtime"
- "sync"
-
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
@@ -33,6 +29,10 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "reflect"
+ "runtime"
+ "sync"
+ "time"
)
// DeviceManager represent device manager attributes
@@ -49,7 +49,7 @@
clusterDataProxy *model.Proxy
coreInstanceID string
exitChannel chan int
- defaultTimeout int64
+ defaultTimeout time.Duration
devicesLoadingLock sync.RWMutex
deviceLoadingInProgress map[string][]chan int
}
@@ -65,7 +65,7 @@
deviceMgr.clusterDataProxy = core.clusterDataProxy
deviceMgr.adapterMgr = core.adapterMgr
deviceMgr.lockRootDeviceMap = sync.RWMutex{}
- deviceMgr.defaultTimeout = core.config.DefaultCoreTimeout
+ deviceMgr.defaultTimeout = time.Duration(core.config.DefaultCoreTimeout) * time.Millisecond
deviceMgr.devicesLoadingLock = sync.RWMutex{}
deviceMgr.deviceLoadingInProgress = make(map[string][]chan int)
return &deviceMgr
@@ -121,7 +121,7 @@
if ok {
return agent.(*DeviceAgent)
}
- // Try to load into memory - loading will also create the device agent and set the device ownership
+ // Try to load into memory - loading will also create the device agent and set the device ownership
err := dMgr.load(ctx, deviceID)
if err == nil {
agent, ok = dMgr.deviceAgents.Load(deviceID)
@@ -172,13 +172,13 @@
device.Root = true
// Create and start a device agent for that device
agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
- dMgr.addDeviceAgentToMap(agent)
device, err = agent.start(ctx, device)
if err != nil {
- log.Errorf("Failed to start device")
+ log.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
sendResponse(ctx, ch, err)
return
}
+ dMgr.addDeviceAgentToMap(agent)
sendResponse(ctx, ch, device)
}
@@ -251,19 +251,21 @@
// We do not need to stop the child devices as this is taken care by the state machine.
}
if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
- agent.stop(ctx)
+ if err := agent.stop(ctx); err != nil {
+ log.Warnw("unable-to-stop-device-agent", log.Fields{"device-id": agent.deviceID, "error": err})
+ }
dMgr.deleteDeviceAgentFromMap(agent)
// Abandon the device ownership
err := dMgr.core.deviceOwnership.AbandonDevice(id)
if err != nil {
- log.Errorw("unable-to-abandon-device", log.Fields{"error": err})
+ log.Warnw("unable-to-abandon-device", log.Fields{"error": err})
}
}
}
}
// RunPostDeviceDelete removes any reference of this device
-func (dMgr *DeviceManager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
log.Infow("RunPostDeviceDelete", log.Fields{"deviceId": cDevice.Id})
dMgr.stopManagingDevice(ctx, cDevice.Id)
return nil
@@ -273,7 +275,7 @@
func (dMgr *DeviceManager) GetDevice(ctx context.Context, id string) (*voltha.Device, error) {
log.Debugw("GetDevice", log.Fields{"deviceid": id})
if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
- return agent.getDevice(), nil
+ return agent.getDevice(ctx)
}
return nil, status.Errorf(codes.NotFound, "%s", id)
}
@@ -404,19 +406,19 @@
return nil, err
}
if devices != nil {
- for _, device := range devices.([]interface{}) {
+ for _, d := range devices.([]interface{}) {
+ device := d.(*voltha.Device)
// If device is not in memory then set it up
- if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
- log.Debugw("loading-device-from-Model", log.Fields{"id": device.(*voltha.Device).Id})
- agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ if !dMgr.IsDeviceInCache(device.Id) {
+ log.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
+ agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
if _, err := agent.start(ctx, nil); err != nil {
- log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
- agent.stop(ctx)
+ log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
} else {
dMgr.addDeviceAgentToMap(agent)
}
}
- result.Items = append(result.Items, device.(*voltha.Device))
+ result.Items = append(result.Items, device)
}
}
log.Debugw("ListDevices-end", log.Fields{"len": len(result.Items)})
@@ -480,7 +482,6 @@
agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
if _, err = agent.start(ctx, nil); err != nil {
log.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
- agent.stop(ctx)
} else {
dMgr.addDeviceAgentToMap(agent)
}
@@ -554,7 +555,10 @@
return err
}
// Get the loaded device details
- device := dAgent.getDevice()
+ device, err := dAgent.getDevice(ctx)
+ if err != nil {
+ return err
+ }
// If the device is in Pre-provisioning or deleted state stop here
if device.AdminState == voltha.AdminState_PREPROVISIONED || device.AdminState == voltha.AdminState_DELETED {
@@ -679,14 +683,20 @@
// to the adapter. We will therefore bypass the adapter adapter and send the request directly to the adapter via
// the adapter_proxy.
response := utils.NewResponse()
- go func(device *voltha.Device) {
- if err := dMgr.adapterProxy.ReconcileDevice(ctx, device); err != nil {
- log.Errorw("reconcile-request-failed", log.Fields{"deviceId": device.Id, "error": err})
- response.Error(status.Errorf(codes.Internal, "device: %s", device.Id))
+ ch, err := dMgr.adapterProxy.reconcileDevice(ctx, device)
+ if err != nil {
+ response.Error(err)
+ }
+ // Wait for adapter response in its own routine
+ go func() {
+ resp, ok := <-ch
+ if !ok {
+ response.Error(status.Errorf(codes.Aborted, "channel-closed-device: %s", device.Id))
+ } else if resp.Err != nil {
+ response.Error(resp.Err)
}
response.Done()
- }(device)
-
+ }()
return response
}
@@ -751,14 +761,6 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) deletePeerPorts(ctx context.Context, fromDeviceID string, deviceID string) error {
- log.Debugw("deletePeerPorts", log.Fields{"fromDeviceId": fromDeviceID, "deviceid": deviceID})
- if agent := dMgr.getDeviceAgent(ctx, fromDeviceID); agent != nil {
- return agent.deletePeerPorts(ctx, deviceID)
- }
- return status.Errorf(codes.NotFound, "%s", deviceID)
-}
-
func (dMgr *DeviceManager) addFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("addFlowsAndGroups", log.Fields{"deviceid": deviceID, "groups:": groups, "flowMetadata": flowMetadata})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
@@ -894,6 +896,7 @@
}
}()
}
+ return nil
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
@@ -997,28 +1000,30 @@
childDevice.SerialNumber = serialNumber
childDevice.Root = false
- //Get parent device type
- parent, err := dMgr.GetDevice(ctx, parentDeviceID)
- if err != nil {
- log.Error("no-parent-found", log.Fields{"parentId": parentDeviceID})
+ // Get parent device type
+ pAgent := dMgr.getDeviceAgent(ctx, parentDeviceID)
+ if pAgent == nil {
return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
}
+ if pAgent.deviceType == "" {
+ return nil, status.Errorf(codes.FailedPrecondition, "device Type not set %s", parentDeviceID)
+ }
if device, err := dMgr.GetChildDevice(ctx, parentDeviceID, serialNumber, onuID, parentPortNo); err == nil {
log.Warnw("child-device-exists", log.Fields{"parentId": parentDeviceID, "serialNumber": serialNumber})
return device, status.Errorf(codes.AlreadyExists, "%s", serialNumber)
}
- childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: parent.Type, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
+ childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
// Create and start a device agent for that device
agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
- dMgr.addDeviceAgentToMap(agent)
- childDevice, err = agent.start(ctx, childDevice)
+ childDevice, err := agent.start(ctx, childDevice)
if err != nil {
- log.Error("error-starting-child")
+ log.Errorw("error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})
return nil, err
}
+ dMgr.addDeviceAgentToMap(agent)
// Since this Core has handled this request then it therefore owns this child device. Set the
// ownership of this device to this Core
@@ -1055,10 +1060,10 @@
log.Debugw("no-op-transition", log.Fields{"deviceId": current.Id})
return nil
}
- log.Debugw("handler-found", log.Fields{"num-handlers": len(handlers), "isParent": current.Root, "current-data": current})
+ log.Debugw("handler-found", log.Fields{"num-expectedHandlers": len(handlers), "isParent": current.Root, "current-data": current})
for _, handler := range handlers {
log.Debugw("running-handler", log.Fields{"handler": funcName(handler)})
- if err := handler(ctx, current); err != nil {
+ if err := handler(ctx, current, previous); err != nil {
log.Warnw("handler-failed", log.Fields{"handler": funcName(handler), "error": err})
return err
}
@@ -1104,7 +1109,7 @@
}
// CreateLogicalDevice creates logical device in core
-func (dMgr *DeviceManager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
log.Info("CreateLogicalDevice")
// Verify whether the logical device has already been created
if cDevice.ParentId != "" {
@@ -1120,7 +1125,7 @@
}
// DeleteLogicalDevice deletes logical device from core
-func (dMgr *DeviceManager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
log.Info("DeleteLogicalDevice")
var err error
if err = dMgr.logicalDeviceMgr.deleteLogicalDevice(ctx, cDevice); err != nil {
@@ -1151,10 +1156,10 @@
}
// DeleteLogicalPorts removes the logical ports associated with that deviceId
-func (dMgr *DeviceManager) DeleteLogicalPorts(ctx context.Context, device *voltha.Device) error {
- log.Info("deleteLogicalPorts")
- if err := dMgr.logicalDeviceMgr.deleteLogicalPorts(ctx, device.Id); err != nil {
- log.Warnw("deleteLogical-ports-error", log.Fields{"deviceId": device.Id})
+func (dMgr *DeviceManager) DeleteLogicalPorts(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
+ log.Debugw("delete-all-logical-ports", log.Fields{"device-id": cDevice.Id})
+ if err := dMgr.logicalDeviceMgr.deleteLogicalPorts(ctx, cDevice.Id); err != nil {
+ log.Warnw("deleteLogical-ports-error", log.Fields{"deviceId": cDevice.Id})
return err
}
return nil
@@ -1180,7 +1185,7 @@
log.Warnw("failed-getting-device", log.Fields{"deviceId": parentDeviceID, "error": err})
return err
}
- return dMgr.DisableAllChildDevices(ctx, parentDevice)
+ return dMgr.DisableAllChildDevices(ctx, parentDevice, nil)
}
//childDevicesDetected is invoked by an adapter when child devices are found, typically after after a
@@ -1230,15 +1235,15 @@
*/
//DisableAllChildDevices is invoked as a callback when the parent device is disabled
-func (dMgr *DeviceManager) DisableAllChildDevices(ctx context.Context, parentDevice *voltha.Device) error {
+func (dMgr *DeviceManager) DisableAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device, parentPrevDevice *voltha.Device) error {
log.Debug("DisableAllChildDevices")
var childDeviceIds []string
var err error
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
- return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentCurrDevice); err != nil {
+ return status.Errorf(codes.NotFound, "%s", parentCurrDevice.Id)
}
if len(childDeviceIds) == 0 {
- log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
+ log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentCurrDevice.Id})
}
allChildDisable := true
for _, childDeviceID := range childDeviceIds {
@@ -1256,15 +1261,15 @@
}
//DeleteAllChildDevices is invoked as a callback when the parent device is deleted
-func (dMgr *DeviceManager) DeleteAllChildDevices(ctx context.Context, parentDevice *voltha.Device) error {
+func (dMgr *DeviceManager) DeleteAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device, parentPrevDevice *voltha.Device) error {
log.Debug("DeleteAllChildDevices")
var childDeviceIds []string
var err error
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
- return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentCurrDevice); err != nil {
+ return status.Errorf(codes.NotFound, "%s", parentCurrDevice.Id)
}
if len(childDeviceIds) == 0 {
- log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
+ log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentCurrDevice.Id})
}
allChildDeleted := true
for _, childDeviceID := range childDeviceIds {
@@ -1284,9 +1289,9 @@
}
//DeleteAllUNILogicalPorts is invoked as a callback when the parent device is deleted
-func (dMgr *DeviceManager) DeleteAllUNILogicalPorts(ctx context.Context, parentDevice *voltha.Device) error {
- log.Debugw("delete-all-uni-logical-ports", log.Fields{"parent-device-id": parentDevice.Id})
- if err := dMgr.logicalDeviceMgr.deleteAllUNILogicalPorts(ctx, parentDevice); err != nil {
+func (dMgr *DeviceManager) DeleteAllUNILogicalPorts(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error {
+ log.Debugw("delete-all-uni-logical-ports", log.Fields{"parent-device-id": curr.Id})
+ if err := dMgr.logicalDeviceMgr.deleteAllUNILogicalPorts(ctx, curr); err != nil {
return err
}
return nil
@@ -1325,7 +1330,7 @@
}
// SetupUNILogicalPorts creates UNI ports on the logical device that represents a child UNI interface
-func (dMgr *DeviceManager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
log.Info("addUNILogicalPort")
if err := dMgr.logicalDeviceMgr.setupUNILogicalPorts(ctx, cDevice); err != nil {
log.Warnw("addUNILogicalPort-error", log.Fields{"device": cDevice, "err": err})
@@ -1438,22 +1443,15 @@
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
}
-// SetAdminStateToEnable sets admin state of device to enabled
-func (dMgr *DeviceManager) SetAdminStateToEnable(ctx context.Context, cDevice *voltha.Device) error {
- log.Info("SetAdminStateToEnable")
- if agent := dMgr.getDeviceAgent(ctx, cDevice.Id); agent != nil {
- return agent.updateAdminState(ctx, voltha.AdminState_ENABLED)
- }
- return status.Errorf(codes.NotFound, "%s", cDevice.Id)
-}
-
-// NotifyInvalidTransition notifies about invalid transition
-func (dMgr *DeviceManager) NotifyInvalidTransition(ctx context.Context, pcDevice *voltha.Device) error {
+func (dMgr *DeviceManager) NotifyInvalidTransition(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
log.Errorw("NotifyInvalidTransition", log.Fields{
- "device": pcDevice.Id,
- "adminState": pcDevice.AdminState,
- "operState": pcDevice.OperStatus,
- "connState": pcDevice.ConnectStatus,
+ "device": cDevice.Id,
+ "prev-admin-state": pDevice.AdminState,
+ "prev-oper-state": pDevice.OperStatus,
+ "prev-conn-state": pDevice.ConnectStatus,
+ "curr-admin-state": cDevice.AdminState,
+ "curr-oper-state": cDevice.OperStatus,
+ "curr-conn-state": cDevice.ConnectStatus,
})
//TODO: notify over kafka?
return nil
@@ -1488,7 +1486,7 @@
var res interface{}
if agent := dMgr.getDeviceAgent(ctx, simulatereq.Id); agent != nil {
res = agent.simulateAlarm(ctx, simulatereq)
- log.Debugw("SimulateAlarm-result", log.Fields{"result": res})
+ log.Debugw("simulateAlarm-result", log.Fields{"result": res})
}
//TODO CLI always get successful response
sendResponse(ctx, ch, res)
@@ -1528,11 +1526,11 @@
sendResponse(ctx, ch, res)
}
-// ChildDeviceLost calls parent adapter to delete child device and all its references
-func (dMgr *DeviceManager) ChildDeviceLost(ctx context.Context, cDevice *voltha.Device) error {
- log.Debugw("ChildDeviceLost", log.Fields{"deviceid": cDevice.Id})
- if parentAgent := dMgr.getDeviceAgent(ctx, cDevice.ParentId); parentAgent != nil {
- return parentAgent.ChildDeviceLost(ctx, cDevice)
+// childDeviceLost calls parent adapter to delete child device and all its references
+func (dMgr *DeviceManager) ChildDeviceLost(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error {
+ log.Debugw("childDeviceLost", log.Fields{"device-id": curr.Id})
+ if parentAgent := dMgr.getDeviceAgent(ctx, curr.ParentId); parentAgent != nil {
+ return parentAgent.ChildDeviceLost(ctx, curr)
}
- return status.Errorf(codes.NotFound, "%s", cDevice.Id)
+ return status.Errorf(codes.NotFound, "%s", curr.Id)
}