[VOL-2164] Update rw-core to use the Async Kafka API
This commit consists of the following:
1. Process per-device requests in the Core in the order they are
received. If there are lots of requests on a given device then
there will be some latencies introduced due to ordering. With
recent changes in the model along with keeping the request lock
to a minimal then these latencies are reduced. Testing did not
show and noticeable latencies.
2) Keep the request lock from the moment a request started
processing to the moment that request is sent to kafka (when
applicable). Adapter responses are received and processed
asynchronously. Therefore, an adapter can takes all the time it
needs to process a transaction. The Core still has a context
with timeout (configurable) to cater for cases where the adapter
does not return a response.
3) Adapter requests are processed to completion before sending a
reponse back to the adapter. Previously, in some cases, a
separate go routine was created to process the request and a
successful response is sent to the adapter. Now if the request
fails then the adapter will receive an error. The adapter
requests for a given device are therefore processed in the
order they are received.
4) Some changes are made when retrieving a handler to execute
a device state transition. This was necessary as there was some
transition overlap found.
Update after multiple reviews.
Change-Id: I55a189efec1549a662f2d71e18e6eca9015a3a17
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 42d628a..837e884 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -20,6 +20,8 @@
"context"
"encoding/hex"
"fmt"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"reflect"
"sync"
"time"
@@ -36,6 +38,10 @@
"google.golang.org/grpc/status"
)
+const (
+ maxOrderedDeviceRequestQueueSize = 1000
+)
+
// DeviceAgent represents device agent attributes
type DeviceAgent struct {
deviceID string
@@ -48,13 +54,16 @@
clusterDataProxy *model.Proxy
deviceProxy *model.Proxy
exitChannel chan int
- lockDevice sync.RWMutex
device *voltha.Device
- defaultTimeout int64
+ requestQueue *coreutils.RequestQueue
+ defaultTimeout time.Duration
+ startOnce sync.Once
+ stopOnce sync.Once
+ stopped bool
}
//newDeviceAgent creates a new device agent. The device will be initialized when start() is called.
-func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout int64) *DeviceAgent {
+func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout time.Duration) *DeviceAgent {
var agent DeviceAgent
agent.adapterProxy = ap
if device.Id == "" {
@@ -70,27 +79,38 @@
agent.adapterMgr = deviceMgr.adapterMgr
agent.exitChannel = make(chan int, 1)
agent.clusterDataProxy = cdProxy
- agent.lockDevice = sync.RWMutex{}
agent.defaultTimeout = timeout
agent.device = proto.Clone(device).(*voltha.Device)
+ agent.requestQueue = coreutils.NewRequestQueue(agent.deviceID, maxOrderedDeviceRequestQueueSize)
return &agent
}
-// start()
-// save the device to the data model and registers for callbacks on that device if deviceToCreate!=nil. Otherwise,
-// it will load the data from the dB and setup the necessary callbacks and proxies. Returns the device that
+// start() saves the device to the data model and registers for callbacks on that device if deviceToCreate!=nil.
+// Otherwise, it will load the data from the dB and setup the necessary callbacks and proxies. Returns the device that
// was started.
func (agent *DeviceAgent) start(ctx context.Context, deviceToCreate *voltha.Device) (*voltha.Device, error) {
- var device *voltha.Device
+ needToStart := false
+ if agent.startOnce.Do(func() { needToStart = true }); !needToStart {
+ return agent.getDevice(ctx)
+ }
+ var startSucceeded bool
+ defer func() {
+ if !startSucceeded {
+ if err := agent.stop(ctx); err != nil {
+ log.Errorw("failed-to-cleanup-after-unsuccessful-start", log.Fields{"device-id": agent.deviceID, "error": err})
+ }
+ }
+ }()
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceID})
+ // Start the request queue. If this start fails then stop will be invoked and it requires
+ // that the request sequencer is present
+ agent.requestQueue.Start()
+
+ var device *voltha.Device
if deviceToCreate == nil {
// Load the existing device
loadedDevice, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
if err != nil {
- log.Errorw("failed-to-get-from-cluster-data-proxy", log.Fields{"error": err})
return nil, err
}
if loadedDevice != nil {
@@ -99,14 +119,12 @@
agent.deviceType = device.Adapter
agent.device = proto.Clone(device).(*voltha.Device)
} else {
- log.Errorw("failed-to-convert-device", log.Fields{"deviceId": agent.deviceID})
return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
}
} else {
- log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceID})
- return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
+ return nil, status.Errorf(codes.NotFound, "device-%s-loading-failed", agent.deviceID)
}
- log.Debugw("device-loaded-from-dB", log.Fields{"deviceId": agent.deviceID})
+ log.Infow("device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
} else {
// Create a new device
// Assumption is that AdminState, FlowGroups, and Flows are unitialized since this
@@ -126,74 +144,129 @@
// Add the initial device to the local model
added, err := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceID, device, "")
if err != nil {
- log.Errorw("failed-to-save-devices-to-cluster-proxy", log.Fields{"error": err})
return nil, err
}
if added == nil {
- log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceID})
return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s", agent.deviceID)
}
- agent.device = proto.Clone(device).(*voltha.Device)
+ agent.device = device
}
var err error
if agent.deviceProxy, err = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceID, false); err != nil {
- log.Errorw("failed-to-add-devices-to-cluster-proxy", log.Fields{"error": err})
return nil, err
}
agent.deviceProxy.RegisterCallback(model.PostUpdate, agent.processUpdate)
- log.Debugw("device-agent-started", log.Fields{"deviceId": agent.deviceID})
- return device, nil
+ startSucceeded = true
+ log.Debugw("device-agent-started", log.Fields{"device-id": agent.deviceID})
+
+ return agent.getDevice(ctx)
}
// stop stops the device agent. Not much to do for now
-func (agent *DeviceAgent) stop(ctx context.Context) {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+func (agent *DeviceAgent) stop(ctx context.Context) error {
+ needToStop := false
+ if agent.stopOnce.Do(func() { needToStop = true }); !needToStop {
+ return nil
+ }
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
- log.Debugw("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
+ log.Infow("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
// First unregister any callbacks
- agent.deviceProxy.UnregisterCallback(model.PostUpdate, agent.processUpdate)
+ if agent.deviceProxy != nil {
+ agent.deviceProxy.UnregisterCallback(model.PostUpdate, agent.processUpdate)
+ }
// Remove the device from the KV store
removed, err := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceID, "")
if err != nil {
- log.Errorw("Failed-to-remove-device-from-cluster-data-proxy", log.Fields{"error": err})
- return
+ return err
}
if removed == nil {
- log.Debugw("device-already-removed", log.Fields{"id": agent.deviceID})
+ log.Debugw("device-already-removed", log.Fields{"device-id": agent.deviceID})
}
- agent.exitChannel <- 1
- log.Debugw("device-agent-stopped", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
+
+ // Stop the request queue - no more requests can be processed
+ agent.requestQueue.Stop()
+
+ close(agent.exitChannel)
+
+ agent.stopped = true
+
+ log.Infow("device-agent-stopped", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
+
+ return nil
}
// Load the most recent state from the KVStore for the device.
func (agent *DeviceAgent) reconcileWithKVStore(ctx context.Context) {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ log.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "error": err})
+ return
+ }
+ defer agent.requestQueue.RequestComplete()
log.Debug("reconciling-device-agent-devicetype")
// TODO: context timeout
device, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
if err != nil {
- log.Errorw("Failed to get device info from cluster data proxy", log.Fields{"error": err})
+ log.Errorw("kv-get-failed", log.Fields{"device-id": agent.deviceID, "error": err})
return
}
if device != nil {
if d, ok := device.(*voltha.Device); ok {
agent.deviceType = d.Adapter
agent.device = proto.Clone(d).(*voltha.Device)
- log.Debugw("reconciled-device-agent-devicetype", log.Fields{"Id": agent.deviceID, "type": agent.deviceType})
+ log.Debugw("reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
}
}
}
+// onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
+// and the only action required is to publish a successful result on kafka
+func (agent *DeviceAgent) onSuccess(rpc string, response interface{}, reqArgs ...interface{}) {
+ log.Debugw("response successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
+ // TODO: Post success message onto kafka
+}
+
+// onFailure is a common callback for scenarios where we receive an error response following a request to an adapter
+// and the only action required is to publish the failed result on kafka
+func (agent *DeviceAgent) onFailure(rpc string, response interface{}, reqArgs ...interface{}) {
+ if res, ok := response.(error); ok {
+ log.Errorw("rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
+ } else {
+ log.Errorw("rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
+ }
+ // TODO: Post failure message onto kafka
+}
+
+func (agent *DeviceAgent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+ onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
+ defer cancel()
+ select {
+ case rpcResponse, ok := <-ch:
+ if !ok {
+ onFailure(rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
+ } else if rpcResponse.Err != nil {
+ onFailure(rpc, rpcResponse.Err, reqArgs)
+ } else {
+ onSuccess(rpc, rpcResponse.Reply, reqArgs)
+ }
+ case <-ctx.Done():
+ onFailure(rpc, ctx.Err(), reqArgs)
+ }
+}
+
// getDevice returns the device data from cache
-func (agent *DeviceAgent) getDevice() *voltha.Device {
- agent.lockDevice.RLock()
- defer agent.lockDevice.RUnlock()
- return proto.Clone(agent.device).(*voltha.Device)
+func (agent *DeviceAgent) getDevice(ctx context.Context) (*voltha.Device, error) {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+ return proto.Clone(agent.device).(*voltha.Device), nil
}
// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
@@ -203,31 +276,32 @@
// enableDevice activates a preprovisioned or a disable device
func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("enableDevice", log.Fields{"id": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+
+ log.Debugw("enableDevice", log.Fields{"device-id": agent.deviceID})
cloned := agent.getDeviceWithoutLock()
// First figure out which adapter will handle this device type. We do it at this stage as allow devices to be
- // pre-provisionned with the required adapter not registered. At this stage, since we need to communicate
+ // pre-provisioned with the required adapter not registered. At this stage, since we need to communicate
// with the adapter then we need to know the adapter that will handle this request
adapterName, err := agent.adapterMgr.getAdapterName(cloned.Type)
if err != nil {
- log.Warnw("no-adapter-registered-for-device-type", log.Fields{"deviceType": cloned.Type, "deviceAdapter": cloned.Adapter})
return err
}
cloned.Adapter = adapterName
if cloned.AdminState == voltha.AdminState_ENABLED {
- log.Debugw("device-already-enabled", log.Fields{"id": agent.deviceID})
+ log.Debugw("device-already-enabled", log.Fields{"device-id": agent.deviceID})
return nil
}
if cloned.AdminState == voltha.AdminState_DELETED {
// This is a temporary state when a device is deleted before it gets removed from the model.
err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-a-deleted-device: %s ", cloned.Id))
- log.Warnw("invalid-state", log.Fields{"id": agent.deviceID, "state": cloned.AdminState, "error": err})
return err
}
@@ -242,36 +316,38 @@
return err
}
- // Adopt the device if it was in preprovision state. In all other cases, try to reenable it.
+ // Adopt the device if it was in pre-provision state. In all other cases, try to re-enable it.
device := proto.Clone(cloned).(*voltha.Device)
+ var ch chan *kafka.RpcResponse
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
if previousAdminState == voltha.AdminState_PREPROVISIONED {
- if err := agent.adapterProxy.AdoptDevice(ctx, device); err != nil {
- log.Debugw("adoptDevice-error", log.Fields{"id": agent.deviceID, "error": err})
- return err
- }
+ ch, err = agent.adapterProxy.adoptDevice(subCtx, device)
} else {
- if err := agent.adapterProxy.ReEnableDevice(ctx, device); err != nil {
- log.Debugw("renableDevice-error", log.Fields{"id": agent.deviceID, "error": err})
- return err
- }
+ ch, err = agent.adapterProxy.reEnableDevice(subCtx, device)
}
+ if err != nil {
+ cancel()
+ return err
+ }
+ // Wait for response
+ go agent.waitForAdapterResponse(subCtx, cancel, "enableDevice", ch, agent.onSuccess, agent.onFailure)
return nil
}
-func (agent *DeviceAgent) sendBulkFlowsToAdapters(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
- if err := agent.adapterProxy.UpdateFlowsBulk(ctx, device, flows, groups, flowMetadata); err != nil {
- log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.deviceID, "error": err})
- response.Error(err)
+func (agent *DeviceAgent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, ch chan *kafka.RpcResponse, response coreutils.Response) {
+ defer cancel()
+ select {
+ case rpcResponse, ok := <-ch:
+ if !ok {
+ response.Error(status.Errorf(codes.Aborted, "channel-closed"))
+ } else if rpcResponse.Err != nil {
+ response.Error(rpcResponse.Err)
+ } else {
+ response.Done()
+ }
+ case <-ctx.Done():
+ response.Error(ctx.Err())
}
- response.Done()
-}
-
-func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(ctx context.Context, device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
- if err := agent.adapterProxy.UpdateFlowsIncremental(ctx, device, flows, groups, flowMetadata); err != nil {
- log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.deviceID, "error": err})
- response.Error(err)
- }
- response.Done()
}
//deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice. This function will
@@ -334,17 +410,24 @@
}
func (agent *DeviceAgent) addFlowsAndGroupsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups, "flowMetadata": flowMetadata})
+ log.Debugw("add-flows-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups, "flow-metadata": flowMetadata})
if (len(newFlows) | len(newGroups)) == 0 {
- log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
+ log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
return coreutils.DoneResponse(), nil
}
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return coreutils.DoneResponse(), err
+ }
+ defer agent.requestQueue.RequestComplete()
device := agent.getDeviceWithoutLock()
+ dType := agent.adapterMgr.getDeviceType(device.Type)
+ if dType == nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ }
+
existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
@@ -356,29 +439,32 @@
// Sanity check
if (len(updatedAllFlows) | len(flowsToDelete) | len(updatedAllGroups) | len(groupsToDelete)) == 0 {
- log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
+ log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
return coreutils.DoneResponse(), nil
}
- // Send update to adapters
- // Create two channels to receive responses from the dB and from the adapters.
- // Do not close these channels as this function may exit on timeout before the dB or adapters get a chance
- // to send their responses. These channels will be garbage collected once all the responses are
- // received
- response := coreutils.NewResponse()
- dType := agent.adapterMgr.getDeviceType(device.Type)
- if dType == nil {
- log.Errorw("non-existent device type", log.Fields{"deviceType": device.Type})
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent device type %s", device.Type)
+ // store the changed data
+ device.Flows = &voltha.Flows{Items: updatedAllFlows}
+ device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
+ if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-device-%s", agent.deviceID)
}
- if !dType.AcceptsAddRemoveFlowUpdates {
+ // Send update to adapters
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ response := coreutils.NewResponse()
+ if !dType.AcceptsAddRemoveFlowUpdates {
if len(updatedAllGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedAllGroups) && len(updatedAllFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedAllFlows) {
- log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
+ log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
+ cancel()
return coreutils.DoneResponse(), nil
}
- go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata, response)
-
+ rpcResponse, err := agent.adapterProxy.updateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: newFlows},
@@ -389,16 +475,13 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
+ rpcResponse, err := agent.adapterProxy.updateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
}
-
- // store the changed data
- device.Flows = &voltha.Flows{Items: updatedAllFlows}
- device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
- if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
- }
-
return response, nil
}
@@ -409,25 +492,31 @@
if err != nil {
return err
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
- log.Debugw("Failed to get response from adapter[or] DB", log.Fields{"result": res})
- return status.Errorf(codes.Aborted, "errors-%s", res)
+ if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); errs != nil {
+ log.Warnw("no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
+ return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
}
return nil
}
func (agent *DeviceAgent) deleteFlowsAndGroupsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
+ log.Debugw("delete-flows-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
if (len(flowsToDel) | len(groupsToDel)) == 0 {
- log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
+ log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
return coreutils.DoneResponse(), nil
}
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return coreutils.DoneResponse(), err
+ }
+ defer agent.requestQueue.RequestComplete()
device := agent.getDeviceWithoutLock()
+ dType := agent.adapterMgr.getDeviceType(device.Type)
+ if dType == nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ }
existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
@@ -451,32 +540,41 @@
log.Debugw("deleteFlowsAndGroups",
log.Fields{
- "deviceId": agent.deviceID,
- "flowsToDel": len(flowsToDel),
- "flowsToKeep": len(flowsToKeep),
- "groupsToDel": len(groupsToDel),
- "groupsToKeep": len(groupsToKeep),
+ "device-id": agent.deviceID,
+ "flows-to-del": len(flowsToDel),
+ "flows-to-keep": len(flowsToKeep),
+ "groups-to-del": len(groupsToDel),
+ "groups-to-keep": len(groupsToKeep),
})
// Sanity check
if (len(flowsToKeep) | len(flowsToDel) | len(groupsToKeep) | len(groupsToDel)) == 0 {
- log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
+ log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows-to-del": flowsToDel, "groups-to-del": groupsToDel})
return coreutils.DoneResponse(), nil
}
- // Send update to adapters
- response := coreutils.NewResponse()
- dType := agent.adapterMgr.getDeviceType(device.Type)
- if dType == nil {
- log.Errorw("non-existent device type", log.Fields{"deviceType": device.Type})
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent device type %s", device.Type)
+ // store the changed data
+ device.Flows = &voltha.Flows{Items: flowsToKeep}
+ device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
+ if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
}
+
+ // Send update to adapters
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
+ cancel()
return coreutils.DoneResponse(), nil
}
- go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, response)
+ rpcResponse, err := agent.adapterProxy.updateFlowsBulk(subCtx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -487,18 +585,14 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDel},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
+ rpcResponse, err := agent.adapterProxy.updateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
}
-
- // store the changed data
- device.Flows = &voltha.Flows{Items: flowsToKeep}
- device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
- if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
- }
-
return response, nil
-
}
//deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
@@ -515,43 +609,59 @@
}
func (agent *DeviceAgent) updateFlowsAndGroupsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- log.Debugw("updateFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+ log.Debugw("updateFlowsAndGroups", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
if (len(updatedFlows) | len(updatedGroups)) == 0 {
- log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+ log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
return coreutils.DoneResponse(), nil
}
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return coreutils.DoneResponse(), err
+ }
+ defer agent.requestQueue.RequestComplete()
device := agent.getDeviceWithoutLock()
+ if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
+ }
+ dType := agent.adapterMgr.getDeviceType(device.Type)
+ if dType == nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ }
existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
- log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+ log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
return coreutils.DoneResponse(), nil
}
log.Debugw("updating-flows-and-groups",
log.Fields{
- "deviceId": agent.deviceID,
- "updatedFlows": updatedFlows,
- "updatedGroups": updatedGroups,
+ "device-id": agent.deviceID,
+ "updated-flows": updatedFlows,
+ "updated-groups": updatedGroups,
})
- response := coreutils.NewResponse()
- dType := agent.adapterMgr.getDeviceType(device.Type)
- if dType == nil {
- log.Errorw("non-existent device type", log.Fields{"deviceType": device.Type})
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent device type %s", device.Type)
+ // store the updated data
+ device.Flows = &voltha.Flows{Items: updatedFlows}
+ device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
+ if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
}
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ response := coreutils.NewResponse()
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
- go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, response)
+ rpcResponse, err := agent.adapterProxy.updateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
} else {
var flowsToAdd []*ofp.OfpFlowStats
var flowsToDelete []*ofp.OfpFlowStats
@@ -584,16 +694,17 @@
log.Debugw("updating-flows-and-groups",
log.Fields{
- "deviceId": agent.deviceID,
- "flowsToAdd": flowsToAdd,
- "flowsToDelete": flowsToDelete,
- "groupsToAdd": groupsToAdd,
- "groupsToDelete": groupsToDelete,
+ "device-id": agent.deviceID,
+ "flows-to-add": flowsToAdd,
+ "flows-to-delete": flowsToDelete,
+ "groups-to-add": groupsToAdd,
+ "groups-to-delete": groupsToDelete,
})
// Sanity check
if (len(flowsToAdd) | len(flowsToDelete) | len(groupsToAdd) | len(groupsToDelete) | len(updatedGroups)) == 0 {
- log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+ log.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+ cancel()
return coreutils.DoneResponse(), nil
}
@@ -606,14 +717,12 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
}
- go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
- }
-
- // store the updated data
- device.Flows = &voltha.Flows{Items: updatedFlows}
- device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
- if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
+ rpcResponse, err := agent.adapterProxy.updateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
}
return response, nil
@@ -634,9 +743,11 @@
//disableDevice disable a device
func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("disableDevice", log.Fields{"id": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ log.Debugw("disableDevice", log.Fields{"device-id": agent.deviceID})
cloned := agent.getDeviceWithoutLock()
@@ -646,7 +757,6 @@
}
if cloned.AdminState == voltha.AdminState_PREPROVISIONED ||
cloned.AdminState == voltha.AdminState_DELETED {
- log.Debugw("device-not-enabled", log.Fields{"id": agent.deviceID})
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
}
@@ -656,75 +766,75 @@
if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return err
}
- if err := agent.adapterProxy.DisableDevice(ctx, proto.Clone(cloned).(*voltha.Device)); err != nil {
- log.Debugw("disableDevice-error", log.Fields{"id": agent.deviceID, "error": err})
+
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.disableDevice(subCtx, proto.Clone(cloned).(*voltha.Device))
+ if err != nil {
+ cancel()
return err
}
- return nil
-}
+ go agent.waitForAdapterResponse(subCtx, cancel, "disableDevice", ch, agent.onSuccess, agent.onFailure)
-func (agent *DeviceAgent) updateAdminState(ctx context.Context, adminState voltha.AdminState_Types) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("updateAdminState", log.Fields{"id": agent.deviceID})
-
- cloned := agent.getDeviceWithoutLock()
-
- if cloned.AdminState == adminState {
- log.Debugw("no-change-needed", log.Fields{"id": agent.deviceID, "state": adminState})
- return nil
- }
- // Received an Ack (no error found above). Now update the device in the model to the expected state
- cloned.AdminState = adminState
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
- return err
- }
return nil
}
func (agent *DeviceAgent) rebootDevice(ctx context.Context) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("rebootDevice", log.Fields{"id": agent.deviceID})
-
- device := agent.getDeviceWithoutLock()
- if err := agent.adapterProxy.RebootDevice(ctx, device); err != nil {
- log.Debugw("rebootDevice-error", log.Fields{"id": agent.deviceID, "error": err})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
+ defer agent.requestQueue.RequestComplete()
+ log.Debugw("rebootDevice", log.Fields{"device-id": agent.deviceID})
+
+ device := agent.getDeviceWithoutLock()
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.rebootDevice(subCtx, device)
+ if err != nil {
+ cancel()
+ return err
+ }
+ go agent.waitForAdapterResponse(subCtx, cancel, "rebootDevice", ch, agent.onSuccess, agent.onFailure)
return nil
}
func (agent *DeviceAgent) deleteDevice(ctx context.Context) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("deleteDevice", log.Fields{"id": agent.deviceID})
+ log.Debugw("deleteDevice", log.Fields{"device-id": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
cloned := agent.getDeviceWithoutLock()
- if cloned.AdminState == voltha.AdminState_DELETED {
- log.Debugw("device-already-in-deleted-state", log.Fields{"id": agent.deviceID})
- return nil
- }
- if cloned.AdminState != voltha.AdminState_PREPROVISIONED {
- if err := agent.adapterProxy.DeleteDevice(ctx, cloned); err != nil {
- log.Debugw("deleteDevice-error", log.Fields{"id": agent.deviceID, "error": err})
- return err
- }
- }
- //Set the state to deleted - this will trigger some background process to invoke parent adapter to delete child
- //device and clean up the device as well as its association with the logical device
- cloned.AdminState = voltha.AdminState_DELETED
+ previousState := cloned.AdminState
+
+ // No check is required when deleting a device. Changing the state to DELETE will trigger the removal of this
+ // device by the state machine
+ cloned.AdminState = voltha.AdminState_DELETED
if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return err
}
+
+ // If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
+ // adapter
+ if previousState != ic.AdminState_PREPROVISIONED {
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.deleteDevice(subCtx, cloned)
+ if err != nil {
+ cancel()
+ return err
+ }
+ go agent.waitForAdapterResponse(subCtx, cancel, "deleteDevice", ch, agent.onSuccess, agent.onFailure)
+ }
return nil
}
func (agent *DeviceAgent) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("setParentId", log.Fields{"deviceId": device.Id, "parentId": parentID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+
+ log.Debugw("setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
cloned := agent.getDeviceWithoutLock()
cloned.ParentId = parentID
@@ -732,13 +842,16 @@
if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return err
}
+
return nil
}
func (agent *DeviceAgent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("updatePmConfigs", log.Fields{"id": pmConfigs.Id})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ log.Debugw("updatePmConfigs", log.Fields{"device-id": pmConfigs.Id})
cloned := agent.getDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
@@ -747,50 +860,58 @@
return err
}
// Send the request to the adapter
- if err := agent.adapterProxy.UpdatePmConfigs(ctx, cloned, pmConfigs); err != nil {
- log.Errorw("update-pm-configs-error", log.Fields{"id": agent.deviceID, "error": err})
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.updatePmConfigs(subCtx, cloned, pmConfigs)
+ if err != nil {
+ cancel()
return err
}
+ go agent.waitForAdapterResponse(subCtx, cancel, "updatePmConfigs", ch, agent.onSuccess, agent.onFailure)
return nil
}
func (agent *DeviceAgent) initPmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("initPmConfigs", log.Fields{"id": pmConfigs.Id})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ log.Debugw("initPmConfigs", log.Fields{"device-id": pmConfigs.Id})
cloned := agent.getDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
- // Store the device
updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, cloned, false, "")
if err != nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceID)
+ return err
}
if afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceID)
+ return status.Errorf(codes.Internal, "pm-kv-update-failed-for-device-id-%s", agent.deviceID)
}
return nil
}
func (agent *DeviceAgent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
- agent.lockDevice.RLock()
- defer agent.lockDevice.RUnlock()
- log.Debugw("listPmConfigs", log.Fields{"id": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+ log.Debugw("listPmConfigs", log.Fields{"device-id": agent.deviceID})
return agent.getDeviceWithoutLock().PmConfigs, nil
}
func (agent *DeviceAgent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("downloadImage", log.Fields{"id": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+
+ log.Debugw("downloadImage", log.Fields{"device-id": agent.deviceID})
device := agent.getDeviceWithoutLock()
if device.AdminState != voltha.AdminState_ENABLED {
- log.Debugw("device-not-enabled", log.Fields{"id": agent.deviceID})
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
+ return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
}
// Save the image
clonedImg := proto.Clone(img).(*voltha.ImageDownload)
@@ -817,10 +938,13 @@
return nil, err
}
// Send the request to the adapter
- if err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg); err != nil {
- log.Debugw("downloadImage-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.downloadImage(ctx, cloned, clonedImg)
+ if err != nil {
+ cancel()
return nil, err
}
+ go agent.waitForAdapterResponse(subCtx, cancel, "downloadImage", ch, agent.onSuccess, agent.onFailure)
}
return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
}
@@ -836,15 +960,18 @@
}
func (agent *DeviceAgent) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("cancelImageDownload", log.Fields{"id": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+
+ log.Debugw("cancelImageDownload", log.Fields{"device-id": agent.deviceID})
device := agent.getDeviceWithoutLock()
// Verify whether the Image is in the list of image being downloaded
if !isImageRegistered(img, device) {
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
+ return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
}
// Update image download state
@@ -861,28 +988,32 @@
if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
return nil, err
}
- // Send the request to the adapter
- if err := agent.adapterProxy.CancelImageDownload(ctx, device, img); err != nil {
- log.Debugw("cancelImageDownload-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.cancelImageDownload(subCtx, device, img)
+ if err != nil {
+ cancel()
return nil, err
}
+ go agent.waitForAdapterResponse(subCtx, cancel, "cancelImageDownload", ch, agent.onSuccess, agent.onFailure)
}
return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
}
func (agent *DeviceAgent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("activateImage", log.Fields{"id": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+ log.Debugw("activateImage", log.Fields{"device-id": agent.deviceID})
cloned := agent.getDeviceWithoutLock()
// Verify whether the Image is in the list of image being downloaded
if !isImageRegistered(img, cloned) {
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
+ return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
}
if cloned.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
+ return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
}
// Update image download state
for _, image := range cloned.ImageDownloads {
@@ -896,19 +1027,25 @@
return nil, err
}
- if err := agent.adapterProxy.ActivateImageUpdate(ctx, proto.Clone(cloned).(*voltha.Device), img); err != nil {
- log.Debugw("activateImage-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.activateImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+ if err != nil {
+ cancel()
return nil, err
}
+ go agent.waitForAdapterResponse(subCtx, cancel, "activateImageUpdate", ch, agent.onSuccess, agent.onFailure)
+
// The status of the AdminState will be changed following the update_download_status response from the adapter
// The image name will also be removed from the device list
return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
}
func (agent *DeviceAgent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("revertImage", log.Fields{"id": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+ log.Debugw("revertImage", log.Fields{"device-id": agent.deviceID})
cloned := agent.getDeviceWithoutLock()
@@ -931,31 +1068,51 @@
return nil, err
}
- if err := agent.adapterProxy.RevertImageUpdate(ctx, proto.Clone(cloned).(*voltha.Device), img); err != nil {
- log.Debugw("revertImage-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.revertImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+ if err != nil {
+ cancel()
return nil, err
}
+ go agent.waitForAdapterResponse(subCtx, cancel, "revertImageUpdate", ch, agent.onSuccess, agent.onFailure)
+
return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
}
func (agent *DeviceAgent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("getImageDownloadStatus", log.Fields{"id": agent.deviceID})
+ log.Debugw("getImageDownloadStatus", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
- resp, err := agent.adapterProxy.GetImageDownloadStatus(ctx, cloned, img)
- if err != nil {
- log.Debugw("getImageDownloadStatus-error", log.Fields{"id": agent.deviceID, "error": err, "image": img.Name})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- return resp, nil
+ device := agent.getDeviceWithoutLock()
+ ch, err := agent.adapterProxy.getImageDownloadStatus(ctx, device, img)
+ agent.requestQueue.RequestComplete()
+ if err != nil {
+ return nil, err
+ }
+ // Wait for the adapter response
+ rpcResponse, ok := <-ch
+ if !ok {
+ return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+ }
+ if rpcResponse.Err != nil {
+ return nil, rpcResponse.Err
+ }
+ // Successful response
+ imgDownload := &voltha.ImageDownload{}
+ if err := ptypes.UnmarshalAny(rpcResponse.Reply, imgDownload); err != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ }
+ return imgDownload, nil
}
func (agent *DeviceAgent) updateImageDownload(ctx context.Context, img *voltha.ImageDownload) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("updateImageDownload", log.Fields{"id": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ log.Debugw("updating-image-download", log.Fields{"device-id": agent.deviceID, "img": img})
cloned := agent.getDeviceWithoutLock()
@@ -983,9 +1140,11 @@
}
func (agent *DeviceAgent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- agent.lockDevice.RLock()
- defer agent.lockDevice.RUnlock()
- log.Debugw("getImageDownload", log.Fields{"id": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+ log.Debugw("getImageDownload", log.Fields{"device-id": agent.deviceID})
cloned := agent.getDeviceWithoutLock()
for _, image := range cloned.ImageDownloads {
@@ -997,16 +1156,18 @@
}
func (agent *DeviceAgent) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
- agent.lockDevice.RLock()
- defer agent.lockDevice.RUnlock()
- log.Debugw("listImageDownloads", log.Fields{"id": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+ log.Debugw("listImageDownloads", log.Fields{"device-id": agent.deviceID})
return &voltha.ImageDownloads{Items: agent.getDeviceWithoutLock().ImageDownloads}, nil
}
// getPorts retrieves the ports information of the device based on the port type.
func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
- log.Debugw("getPorts", log.Fields{"id": agent.deviceID, "portType": portType})
+ log.Debugw("getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
ports := &voltha.Ports{}
if device, _ := agent.deviceMgr.GetDevice(ctx, agent.deviceID); device != nil {
for _, port := range device.Ports {
@@ -1018,38 +1179,81 @@
return ports
}
-// getSwitchCapability is a helper method that a logical device agent uses to retrieve the switch capability of a
-// parent device
+// getSwitchCapability retrieves the switch capability of a parent device
func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
- log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceID})
- device, err := agent.deviceMgr.GetDevice(ctx, agent.deviceID)
- if device == nil {
+ log.Debugw("getSwitchCapability", log.Fields{"device-id": agent.deviceID})
+
+ cloned, err := agent.getDevice(ctx)
+ if err != nil {
return nil, err
}
- var switchCap *ic.SwitchCapability
- if switchCap, err = agent.adapterProxy.GetOfpDeviceInfo(ctx, device); err != nil {
- log.Debugw("getSwitchCapability-error", log.Fields{"id": device.Id, "error": err})
+ ch, err := agent.adapterProxy.getOfpDeviceInfo(ctx, cloned)
+ if err != nil {
+ return nil, err
+ }
+
+ // Wait for adapter response
+ rpcResponse, ok := <-ch
+ if !ok {
+ return nil, status.Errorf(codes.Aborted, "channel-closed")
+ }
+ if rpcResponse.Err != nil {
+ return nil, rpcResponse.Err
+ }
+ // Successful response
+ switchCap := &ic.SwitchCapability{}
+ if err := ptypes.UnmarshalAny(rpcResponse.Reply, switchCap); err != nil {
return nil, err
}
return switchCap, nil
}
-// getPortCapability is a helper method that a logical device agent uses to retrieve the port capability of a
-// device
+// getPortCapability retrieves the port capability of a device
func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
- log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceID})
- device, err := agent.deviceMgr.GetDevice(ctx, agent.deviceID)
- if device == nil {
+ log.Debugw("getPortCapability", log.Fields{"device-id": agent.deviceID})
+ device, err := agent.getDevice(ctx)
+ if err != nil {
return nil, err
}
- var portCap *ic.PortCapability
- if portCap, err = agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo); err != nil {
- log.Debugw("getPortCapability-error", log.Fields{"id": device.Id, "error": err})
+ ch, err := agent.adapterProxy.getOfpPortInfo(ctx, device, portNo)
+ if err != nil {
return nil, err
}
+ // Wait for adapter response
+ rpcResponse, ok := <-ch
+ if !ok {
+ return nil, status.Errorf(codes.Aborted, "channel-closed")
+ }
+ if rpcResponse.Err != nil {
+ return nil, rpcResponse.Err
+ }
+ // Successful response
+ portCap := &ic.PortCapability{}
+ if err := ptypes.UnmarshalAny(rpcResponse.Reply, portCap); err != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ }
return portCap, nil
}
+func (agent *DeviceAgent) onPacketFailure(rpc string, response interface{}, args ...interface{}) {
+ // packet data is encoded in the args param as the first parameter
+ var packet []byte
+ if len(args) >= 1 {
+ if pkt, ok := args[0].([]byte); ok {
+ packet = pkt
+ }
+ }
+ var errResp error
+ if err, ok := response.(error); ok {
+ errResp = err
+ }
+ log.Warnw("packet-out-error", log.Fields{
+ "device-id": agent.deviceID,
+ "error": errResp,
+ "packet": hex.EncodeToString(packet),
+ })
+}
+
func (agent *DeviceAgent) packetOut(ctx context.Context, outPort uint32, packet *ofp.OfpPacketOut) error {
// If deviceType=="" then we must have taken ownership of this device.
// Fixes VOL-2226 where a core would take ownership and have stale data
@@ -1057,20 +1261,19 @@
agent.reconcileWithKVStore(ctx)
}
// Send packet to adapter
- if err := agent.adapterProxy.packetOut(ctx, agent.deviceType, agent.deviceID, outPort, packet); err != nil {
- log.Debugw("packet-out-error", log.Fields{
- "id": agent.deviceID,
- "error": err,
- "packet": hex.EncodeToString(packet.Data),
- })
- return err
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.packetOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
+ if err != nil {
+ cancel()
+ return nil
}
+ go agent.waitForAdapterResponse(subCtx, cancel, "packetOut", ch, agent.onSuccess, agent.onPacketFailure, packet.Data)
return nil
}
-// processUpdate is a callback invoked whenever there is a change on the device manages by this device agent
+// processUpdate is a respCallback invoked whenever there is a change on the device manages by this device agent
func (agent *DeviceAgent) processUpdate(ctx context.Context, args ...interface{}) interface{} {
- //// Run this callback in its own go routine
+ //// Run this respCallback in its own go routine
go func(args ...interface{}) interface{} {
var previous *voltha.Device
var current *voltha.Device
@@ -1088,10 +1291,10 @@
log.Errorw("too-many-args-in-callback", log.Fields{"len": len(args)})
return nil
}
- // Perform the state transition in it's own go routine (since the caller doesn't wait for this, use a background context)
+ // Perform the state transition in it's own go routine
if err := agent.deviceMgr.processTransition(context.Background(), previous, current); err != nil {
- log.Errorw("failed-process-transition", log.Fields{"deviceId": previous.Id,
- "previousAdminState": previous.AdminState, "currentAdminState": current.AdminState})
+ log.Errorw("failed-process-transition", log.Fields{"device-id": previous.Id,
+ "previous-admin-state": previous.AdminState, "current-admin-state": current.AdminState})
}
return nil
}(args...)
@@ -1112,13 +1315,16 @@
cloned.Reason = device.Reason
return cloned, nil
}
+
func (agent *DeviceAgent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("updateDeviceUsingAdapterData", log.Fields{"deviceId": device.Id})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ log.Debugw("updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
+
updatedDevice, err := agent.mergeDeviceInfoFromAdapter(device)
if err != nil {
- log.Errorw("failed to update device ", log.Fields{"deviceId": device.Id})
return status.Errorf(codes.Internal, "%s", err.Error())
}
cloned := proto.Clone(updatedDevice).(*voltha.Device)
@@ -1127,13 +1333,16 @@
func (agent *DeviceAgent) updateDeviceWithoutLock(ctx context.Context, device *voltha.Device) error {
log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
- cloned := proto.Clone(device).(*voltha.Device)
+ //cloned := proto.Clone(device).(*voltha.Device)
+ cloned := device
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
func (agent *DeviceAgent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
cloned := agent.getDeviceWithoutLock()
@@ -1153,8 +1362,10 @@
func (agent *DeviceAgent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
log.Debugw("updatePortsOperState", log.Fields{"device-id": agent.deviceID})
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
cloned := agent.getDeviceWithoutLock()
for _, port := range cloned.Ports {
port.OperStatus = operStatus
@@ -1164,8 +1375,10 @@
}
func (agent *DeviceAgent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
// Work only on latest data
// TODO: Get list of ports from device directly instead of the entire device
cloned := agent.getDeviceWithoutLock()
@@ -1186,8 +1399,10 @@
func (agent *DeviceAgent) deleteAllPorts(ctx context.Context) error {
log.Debugw("deleteAllPorts", log.Fields{"deviceId": agent.deviceID})
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
cloned := agent.getDeviceWithoutLock()
@@ -1208,9 +1423,11 @@
}
func (agent *DeviceAgent) addPort(ctx context.Context, port *voltha.Port) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("addPort", log.Fields{"deviceId": agent.deviceID, "port": port})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ log.Debugw("addPort", log.Fields{"deviceId": agent.deviceID})
cloned := agent.getDeviceWithoutLock()
updatePort := false
@@ -1245,8 +1462,10 @@
}
func (agent *DeviceAgent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
log.Debugw("adding-peer-peerPort", log.Fields{"device-id": agent.deviceID, "peer-peerPort": peerPort})
cloned := agent.getDeviceWithoutLock()
@@ -1279,30 +1498,13 @@
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) deletePeerPorts(ctx context.Context, deviceID string) error {
- log.Debug("deletePeerPorts")
-
- cloned := agent.getDeviceWithoutLock()
-
- var updatedPeers []*voltha.Port_PeerPort
- for _, port := range cloned.Ports {
- updatedPeers = make([]*voltha.Port_PeerPort, 0)
- for _, peerPort := range port.Peers {
- if peerPort.DeviceId != deviceID {
- updatedPeers = append(updatedPeers, peerPort)
- }
- }
- port.Peers = updatedPeers
- }
-
- // Store the device with updated peer ports
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
// TODO: A generic device update by attribute
func (agent *DeviceAgent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ log.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
+ return
+ }
+ defer agent.requestQueue.RequestComplete()
if value == nil {
return
}
@@ -1336,17 +1538,21 @@
}
func (agent *DeviceAgent) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
log.Debugw("simulateAlarm", log.Fields{"id": agent.deviceID})
cloned := agent.getDeviceWithoutLock()
- // First send the request to an Adapter and wait for a response
- if err := agent.adapterProxy.SimulateAlarm(ctx, cloned, simulatereq); err != nil {
- log.Debugw("simulateAlarm-error", log.Fields{"id": agent.deviceID, "error": err})
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.simulateAlarm(subCtx, cloned, simulatereq)
+ if err != nil {
+ cancel()
return err
}
+ go agent.waitForAdapterResponse(subCtx, cancel, "simulateAlarm", ch, agent.onSuccess, agent.onFailure)
return nil
}
@@ -1369,8 +1575,10 @@
}
func (agent *DeviceAgent) updateDeviceReason(ctx context.Context, reason string) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
cloned := agent.getDeviceWithoutLock()
cloned.Reason = reason
@@ -1380,10 +1588,12 @@
}
func (agent *DeviceAgent) disablePort(ctx context.Context, Port *voltha.Port) error {
- var cp *voltha.Port
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
log.Debugw("disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+ var cp *voltha.Port
// Get the most up to date the device info
device := agent.getDeviceWithoutLock()
for _, port := range device.Ports {
@@ -1406,19 +1616,26 @@
log.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
return err
}
+
//send request to adapter
- if err := agent.adapterProxy.disablePort(ctx, device, cp); err != nil {
- log.Debugw("DisablePort-error", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.disablePort(ctx, device, cp)
+ if err != nil {
+ cancel()
return err
}
+ go agent.waitForAdapterResponse(subCtx, cancel, "disablePort", ch, agent.onSuccess, agent.onFailure)
return nil
}
func (agent *DeviceAgent) enablePort(ctx context.Context, Port *voltha.Port) error {
- var cp *voltha.Port
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
log.Debugw("enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+
+ var cp *voltha.Port
// Get the most up to date the device info
device := agent.getDeviceWithoutLock()
for _, port := range device.Ports {
@@ -1442,28 +1659,47 @@
return err
}
//send request to adapter
- if err := agent.adapterProxy.enablePort(ctx, device, cp); err != nil {
- log.Debugw("EnablePort-error", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.enablePort(ctx, device, cp)
+ if err != nil {
+ cancel()
return err
}
+ go agent.waitForAdapterResponse(subCtx, cancel, "enablePort", ch, agent.onSuccess, agent.onFailure)
return nil
}
func (agent *DeviceAgent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("ChildDeviceLost", log.Fields{"id": device.Id})
+ log.Debugw("childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-ud": agent.deviceID})
//Remove the associated peer ports on the parent device
- if err := agent.deviceMgr.deletePeerPorts(ctx, device.ParentId, device.Id); err != nil {
- // At this stage, the parent device may also have been deleted. Just log and keep processing.
- log.Warnw("failure-deleting-peer-port", log.Fields{"error": err, "child-device-id": device.Id, "parent-device-id": device.ParentId})
+ parentDevice := agent.getDeviceWithoutLock()
+ var updatedPeers []*voltha.Port_PeerPort
+ for _, port := range parentDevice.Ports {
+ updatedPeers = make([]*voltha.Port_PeerPort, 0)
+ for _, peerPort := range port.Peers {
+ if peerPort.DeviceId != device.Id {
+ updatedPeers = append(updatedPeers, peerPort)
+ }
+ }
+ port.Peers = updatedPeers
+ }
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, parentDevice, false, ""); err != nil {
+ return err
}
- if err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId); err != nil {
- log.Warnw("ChildDeviceLost-error", log.Fields{"error": err})
+ //send request to adapter
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.childDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId)
+ if err != nil {
+ cancel()
+ return err
}
+ go agent.waitForAdapterResponse(subCtx, cancel, "childDeviceLost", ch, agent.onSuccess, agent.onFailure)
return nil
-
}