VOL-1775 VOL-1779 VOL-1780 : Fix several issues with overall stability
- Apply changes as reported by golang race utility
- Added version attribute in KV object
- Added context object to db/model api
- Carrying timestamp info through context to help in the
decision making when applying a revision change
- Replaced proxy access control mechanism with etcd reservation mechanism
Change-Id: If3d142a73b1da0d64fa6a819530f297dbfada2d3
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 53faec8..70349d8 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -32,6 +32,7 @@
"google.golang.org/grpc/status"
"reflect"
"sync"
+ "time"
)
type LogicalDeviceAgent struct {
@@ -103,7 +104,7 @@
agent.lockLogicalDevice.Lock()
// Save the logical device
- if added := agent.clusterDataProxy.AddWithID("/logical_devices", ld.Id, ld, ""); added == nil {
+ if added := agent.clusterDataProxy.AddWithID(ctx, "/logical_devices", ld.Id, ld, ""); added == nil {
log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
} else {
log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
@@ -135,12 +136,15 @@
defer agent.lockLogicalDevice.Unlock()
agent.flowProxy = agent.clusterDataProxy.CreateProxy(
+ ctx,
fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceId),
false)
agent.groupProxy = agent.clusterDataProxy.CreateProxy(
+ ctx,
fmt.Sprintf("/logical_devices/%s/flow_groups", agent.logicalDeviceId),
false)
agent.ldProxy = agent.clusterDataProxy.CreateProxy(
+ ctx,
fmt.Sprintf("/logical_devices/%s", agent.logicalDeviceId),
false)
@@ -162,7 +166,7 @@
defer agent.lockLogicalDevice.Unlock()
//Remove the logical device from the model
- if removed := agent.clusterDataProxy.Remove("/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
+ if removed := agent.clusterDataProxy.Remove(ctx, "/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
} else {
log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
@@ -176,7 +180,7 @@
log.Debug("GetLogicalDevice")
agent.lockLogicalDevice.RLock()
defer agent.lockLogicalDevice.RUnlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice, nil
}
@@ -187,7 +191,7 @@
log.Debug("ListLogicalDeviceFlows")
agent.lockLogicalDevice.RLock()
defer agent.lockLogicalDevice.RUnlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
cFlows := (proto.Clone(lDevice.Flows)).(*ofp.Flows)
return cFlows, nil
@@ -199,7 +203,7 @@
log.Debug("ListLogicalDeviceFlowGroups")
agent.lockLogicalDevice.RLock()
defer agent.lockLogicalDevice.RUnlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
cFlowGroups := (proto.Clone(lDevice.FlowGroups)).(*ofp.FlowGroups)
return cFlowGroups, nil
@@ -211,7 +215,7 @@
log.Debug("ListLogicalDevicePorts")
agent.lockLogicalDevice.RLock()
defer agent.lockLogicalDevice.RUnlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
lPorts := make([]*voltha.LogicalPort, 0)
for _, port := range lDevice.Ports {
@@ -227,7 +231,7 @@
log.Debug("listFlows")
agent.lockLogicalDevice.RLock()
defer agent.lockLogicalDevice.RUnlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice.Flows.Items
}
@@ -239,7 +243,7 @@
log.Debug("listFlowGroups")
agent.lockLogicalDevice.RLock()
defer agent.lockLogicalDevice.RUnlock()
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
return lDevice.FlowGroups.Items
}
@@ -248,7 +252,8 @@
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
- afterUpdate := agent.flowProxy.Update("/", flows, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.flowProxy.Update(updateCtx, "/", flows, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device-flows:%s", agent.logicalDeviceId)
}
@@ -257,7 +262,8 @@
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
- afterUpdate := agent.groupProxy.Update("/", flowGroups, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.groupProxy.Update(updateCtx, "/", flowGroups, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device-flow-groups:%s", agent.logicalDeviceId)
}
@@ -268,7 +274,7 @@
// functions that have already acquired the logical device lock to the model
func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
log.Debug("getLogicalDeviceWithoutLock")
- logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 0, false, "")
+ logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceId, 0, false, "")
if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
//log.Debug("getLogicalDeviceWithoutLock", log.Fields{"ldevice": lDevice})
return lDevice, nil
@@ -466,7 +472,8 @@
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(logicalDevice *voltha.LogicalDevice) error {
- afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, logicalDevice, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/logical_devices/"+agent.logicalDeviceId, logicalDevice, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceId)
}
@@ -1187,6 +1194,7 @@
// Set the proxy and callback for that port
agent.portProxiesLock.Lock()
agent.portProxies[port.Id] = agent.clusterDataProxy.CreateProxy(
+ context.Background(),
fmt.Sprintf("/logical_devices/%s/ports/%s", agent.logicalDeviceId, port.Id),
false)
agent.portProxies[port.Id].RegisterCallback(model.POST_UPDATE, agent.portUpdated)