VOL-1775 VOL-1779 VOL-1780 : Fix several issues with overall stability
- Applied fixes for the issues reported by the Go race detector
- Added a version attribute to the KV object
- Added context object to db/model api
- Carried timestamp info through the context to help with
decision-making when applying a revision change
- Replaced proxy access control mechanism with etcd reservation mechanism
Change-Id: If3d142a73b1da0d64fa6a819530f297dbfada2d3
diff --git a/db/model/proxy_test.go b/db/model/proxy_test.go
index f583b99..3f65997 100644
--- a/db/model/proxy_test.go
+++ b/db/model/proxy_test.go
@@ -16,6 +16,7 @@
package model
import (
+ "context"
"encoding/hex"
"encoding/json"
"github.com/golang/protobuf/proto"
@@ -53,9 +54,9 @@
//log.AddPackage(log.JSON, log.InfoLevel, log.Fields{"instanceId": "DB_MODEL"})
//log.UpdateAllLoggers(log.Fields{"instanceId": "PROXY_LOAD_TEST"})
TestProxy_Root = NewRoot(&voltha.Voltha{}, nil)
- TestProxy_Root_LogicalDevice = TestProxy_Root.CreateProxy("/", false)
- TestProxy_Root_Device = TestProxy_Root.CreateProxy("/", false)
- TestProxy_Root_Adapter = TestProxy_Root.CreateProxy("/", false)
+ TestProxy_Root_LogicalDevice = TestProxy_Root.CreateProxy(context.Background(), "/", false)
+ TestProxy_Root_Device = TestProxy_Root.CreateProxy(context.Background(), "/", false)
+ TestProxy_Root_Adapter = TestProxy_Root.CreateProxy(context.Background(), "/", false)
TestProxy_LogicalPorts = []*voltha.LogicalPort{
{
@@ -115,7 +116,7 @@
postAddExecuted := make(chan struct{})
preAddExecutedPtr, postAddExecutedPtr := preAddExecuted, postAddExecuted
- devicesProxy := TestProxy_Root.node.CreateProxy("/devices", false)
+ devicesProxy := TestProxy_Root.node.CreateProxy(context.Background(), "/devices", false)
devicesProxy.RegisterCallback(PRE_ADD, commonCallback2, "PRE_ADD Device container changes")
devicesProxy.RegisterCallback(POST_ADD, commonCallback2, "POST_ADD Device container changes")
@@ -123,7 +124,7 @@
TestProxy_Root_Device.RegisterCallback(PRE_ADD, commonChanCallback, "PRE_ADD instructions", &preAddExecutedPtr)
TestProxy_Root_Device.RegisterCallback(POST_ADD, commonChanCallback, "POST_ADD instructions", &postAddExecutedPtr)
- if added := TestProxy_Root_Device.Add("/devices", TestProxy_Device, ""); added == nil {
+ if added := TestProxy_Root_Device.Add(context.Background(), "/devices", TestProxy_Device, ""); added == nil {
t.Error("Failed to add device")
} else {
t.Logf("Added device : %+v", added)
@@ -137,7 +138,7 @@
}
// Verify that the added device can now be retrieved
- if d := TestProxy_Root_Device.Get("/devices/"+TestProxy_DeviceId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_DeviceId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find added device")
} else {
djson, _ := json.Marshal(d)
@@ -148,7 +149,7 @@
func TestProxy_1_1_2_Add_ExistingDevice(t *testing.T) {
TestProxy_Device.Id = TestProxy_DeviceId
- added := TestProxy_Root_Device.Add("/devices", TestProxy_Device, "")
+ added := TestProxy_Root_Device.Add(context.Background(), "/devices", TestProxy_Device, "")
if added.(proto.Message).String() != reflect.ValueOf(TestProxy_Device).Interface().(proto.Message).String() {
t.Errorf("Devices don't match - existing: %+v returned: %+v", TestProxy_LogicalDevice, added)
}
@@ -180,7 +181,7 @@
TestProxy_Root_Adapter.RegisterCallback(POST_ADD, commonChanCallback, "POST_ADD instructions for adapters", &postAddExecutedPtr)
// Add the adapter
- if added := TestProxy_Root_Adapter.Add("/adapters", TestProxy_Adapter, ""); added == nil {
+ if added := TestProxy_Root_Adapter.Add(context.Background(), "/adapters", TestProxy_Adapter, ""); added == nil {
t.Error("Failed to add adapter")
} else {
t.Logf("Added adapter : %+v", added)
@@ -189,7 +190,7 @@
verifyGotResponse(postAddExecuted)
// Verify that the added device can now be retrieved
- if d := TestProxy_Root_Adapter.Get("/adapters/"+TestProxy_AdapterId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ if d := TestProxy_Root_Adapter.Get(context.Background(), "/adapters/"+TestProxy_AdapterId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find added adapter")
} else {
djson, _ := json.Marshal(d)
@@ -205,7 +206,7 @@
}
func TestProxy_1_2_1_Get_AllDevices(t *testing.T) {
- devices := TestProxy_Root_Device.Get("/devices", 1, false, "")
+ devices := TestProxy_Root_Device.Get(context.Background(), "/devices", 1, false, "")
if len(devices.([]interface{})) == 0 {
t.Error("there are no available devices to retrieve")
@@ -217,7 +218,7 @@
}
func TestProxy_1_2_2_Get_SingleDevice(t *testing.T) {
- if d := TestProxy_Root_Device.Get("/devices/"+TestProxy_TargetDeviceId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
+ if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_TargetDeviceId, 0, false, ""); !reflect.ValueOf(d).IsValid() {
t.Errorf("Failed to find device : %s", TestProxy_TargetDeviceId)
} else {
djson, _ := json.Marshal(d)
@@ -232,7 +233,7 @@
postUpdateExecuted := make(chan struct{})
preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
- if retrieved := TestProxy_Root_Device.Get("/devices/"+TestProxy_TargetDeviceId, 1, false, ""); retrieved == nil {
+ if retrieved := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_TargetDeviceId, 1, false, ""); retrieved == nil {
t.Error("Failed to get device")
} else {
t.Logf("Found raw device (root proxy): %+v", retrieved)
@@ -257,7 +258,7 @@
"POST_UPDATE instructions (root proxy)", &postUpdateExecutedPtr,
)
- if afterUpdate := TestProxy_Root_Device.Update("/devices/"+TestProxy_TargetDeviceId, retrieved, false, ""); afterUpdate == nil {
+ if afterUpdate := TestProxy_Root_Device.Update(context.Background(), "/devices/"+TestProxy_TargetDeviceId, retrieved, false, ""); afterUpdate == nil {
t.Error("Failed to update device")
} else {
t.Logf("Updated device : %+v", afterUpdate)
@@ -270,7 +271,7 @@
t.Error("POST_UPDATE callback was not executed")
}
- if d := TestProxy_Root_Device.Get("/devices/"+TestProxy_TargetDeviceId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_TargetDeviceId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated device (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -281,8 +282,8 @@
func TestProxy_1_3_2_Update_DeviceFlows(t *testing.T) {
// Get a device proxy and update a specific port
- devFlowsProxy := TestProxy_Root.node.CreateProxy("/devices/"+TestProxy_DeviceId+"/flows", false)
- flows := devFlowsProxy.Get("/", 0, false, "")
+ devFlowsProxy := TestProxy_Root.node.CreateProxy(context.Background(), "/devices/"+TestProxy_DeviceId+"/flows", false)
+ flows := devFlowsProxy.Get(context.Background(), "/", 0, false, "")
flows.(*openflow_13.Flows).Items[0].TableId = 2244
preUpdateExecuted := make(chan struct{})
@@ -300,13 +301,13 @@
"POST_UPDATE instructions (flows proxy)", &postUpdateExecutedPtr,
)
- kvFlows := devFlowsProxy.Get("/", 0, false, "")
+ kvFlows := devFlowsProxy.Get(context.Background(), "/", 0, false, "")
if reflect.DeepEqual(flows, kvFlows) {
t.Errorf("Local changes have changed the KV store contents - local:%+v, kv: %+v", flows, kvFlows)
}
- if updated := devFlowsProxy.Update("/", flows.(*openflow_13.Flows), false, ""); updated == nil {
+ if updated := devFlowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, ""); updated == nil {
t.Error("Failed to update flow")
} else {
t.Logf("Updated flows : %+v", updated)
@@ -319,14 +320,14 @@
t.Error("POST_UPDATE callback was not executed")
}
- if d := devFlowsProxy.Get("/", 0, false, ""); d == nil {
+ if d := devFlowsProxy.Get(context.Background(), "/", 0, false, ""); d == nil {
t.Error("Failed to find updated flows (flows proxy)")
} else {
djson, _ := json.Marshal(d)
t.Logf("Found flows (flows proxy): %s", string(djson))
}
- if d := TestProxy_Root_Device.Get("/devices/"+TestProxy_DeviceId+"/flows", 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_DeviceId+"/flows", 1, false, ""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated flows (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -339,9 +340,9 @@
postUpdateExecuted := make(chan struct{})
preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
- adaptersProxy := TestProxy_Root.node.CreateProxy("/adapters", false)
+ adaptersProxy := TestProxy_Root.node.CreateProxy(context.Background(), "/adapters", false)
- if retrieved := TestProxy_Root_Adapter.Get("/adapters/"+TestProxy_AdapterId, 1, false, ""); retrieved == nil {
+ if retrieved := TestProxy_Root_Adapter.Get(context.Background(), "/adapters/"+TestProxy_AdapterId, 1, false, ""); retrieved == nil {
t.Error("Failed to get adapter")
} else {
t.Logf("Found raw adapter (root proxy): %+v", retrieved)
@@ -359,7 +360,7 @@
"POST_UPDATE instructions for adapters", &postUpdateExecutedPtr,
)
- if afterUpdate := adaptersProxy.Update("/"+TestProxy_AdapterId, retrieved, false, ""); afterUpdate == nil {
+ if afterUpdate := adaptersProxy.Update(context.Background(), "/"+TestProxy_AdapterId, retrieved, false, ""); afterUpdate == nil {
t.Error("Failed to update adapter")
} else {
t.Logf("Updated adapter : %+v", afterUpdate)
@@ -372,7 +373,7 @@
t.Error("POST_UPDATE callback for adapter was not executed")
}
- if d := TestProxy_Root_Adapter.Get("/adapters/"+TestProxy_AdapterId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ if d := TestProxy_Root_Adapter.Get(context.Background(), "/adapters/"+TestProxy_AdapterId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated adapter (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -397,7 +398,7 @@
"POST_REMOVE instructions (root proxy)", &postRemoveExecutedPtr,
)
- if removed := TestProxy_Root_Device.Remove("/devices/"+TestProxy_DeviceId, ""); removed == nil {
+ if removed := TestProxy_Root_Device.Remove(context.Background(), "/devices/"+TestProxy_DeviceId, ""); removed == nil {
t.Error("Failed to remove device")
} else {
t.Logf("Removed device : %+v", removed)
@@ -410,7 +411,7 @@
t.Error("POST_REMOVE callback was not executed")
}
- if d := TestProxy_Root_Device.Get("/devices/"+TestProxy_DeviceId, 0, false, ""); reflect.ValueOf(d).IsValid() {
+ if d := TestProxy_Root_Device.Get(context.Background(), "/devices/"+TestProxy_DeviceId, 0, false, ""); reflect.ValueOf(d).IsValid() {
djson, _ := json.Marshal(d)
t.Errorf("Device was not removed - %s", djson)
} else {
@@ -432,7 +433,7 @@
TestProxy_Root_LogicalDevice.RegisterCallback(PRE_ADD, commonChanCallback, "PRE_ADD instructions", &preAddExecutedPtr)
TestProxy_Root_LogicalDevice.RegisterCallback(POST_ADD, commonChanCallback, "POST_ADD instructions", &postAddExecutedPtr)
- if added := TestProxy_Root_LogicalDevice.Add("/logical_devices", TestProxy_LogicalDevice, ""); added == nil {
+ if added := TestProxy_Root_LogicalDevice.Add(context.Background(), "/logical_devices", TestProxy_LogicalDevice, ""); added == nil {
t.Error("Failed to add logical device")
} else {
t.Logf("Added logical device : %+v", added)
@@ -440,7 +441,7 @@
verifyGotResponse(postAddExecuted)
- if ld := TestProxy_Root_LogicalDevice.Get("/logical_devices/"+TestProxy_LogicalDeviceId, 0, false, ""); !reflect.ValueOf(ld).IsValid() {
+ if ld := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId, 0, false, ""); !reflect.ValueOf(ld).IsValid() {
t.Error("Failed to find added logical device")
} else {
ldJSON, _ := json.Marshal(ld)
@@ -458,14 +459,14 @@
func TestProxy_2_1_2_Add_ExistingLogicalDevice(t *testing.T) {
TestProxy_LogicalDevice.Id = TestProxy_LogicalDeviceId
- added := TestProxy_Root_LogicalDevice.Add("/logical_devices", TestProxy_LogicalDevice, "")
+ added := TestProxy_Root_LogicalDevice.Add(context.Background(), "/logical_devices", TestProxy_LogicalDevice, "")
if added.(proto.Message).String() != reflect.ValueOf(TestProxy_LogicalDevice).Interface().(proto.Message).String() {
t.Errorf("Logical devices don't match - existing: %+v returned: %+v", TestProxy_LogicalDevice, added)
}
}
func TestProxy_2_2_1_Get_AllLogicalDevices(t *testing.T) {
- logicalDevices := TestProxy_Root_LogicalDevice.Get("/logical_devices", 1, false, "")
+ logicalDevices := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices", 1, false, "")
if len(logicalDevices.([]interface{})) == 0 {
t.Error("there are no available logical devices to retrieve")
@@ -477,7 +478,7 @@
}
func TestProxy_2_2_2_Get_SingleLogicalDevice(t *testing.T) {
- if ld := TestProxy_Root_LogicalDevice.Get("/logical_devices/"+TestProxy_TargetLogicalDeviceId, 0, false, ""); !reflect.ValueOf(ld).IsValid() {
+ if ld := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, 0, false, ""); !reflect.ValueOf(ld).IsValid() {
t.Errorf("Failed to find logical device : %s", TestProxy_TargetLogicalDeviceId)
} else {
ldJSON, _ := json.Marshal(ld)
@@ -492,7 +493,7 @@
postUpdateExecuted := make(chan struct{})
preUpdateExecutedPtr, postUpdateExecutedPtr := preUpdateExecuted, postUpdateExecuted
- if retrieved := TestProxy_Root_LogicalDevice.Get("/logical_devices/"+TestProxy_TargetLogicalDeviceId, 1, false, ""); retrieved == nil {
+ if retrieved := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, 1, false, ""); retrieved == nil {
t.Error("Failed to get logical device")
} else {
t.Logf("Found raw logical device (root proxy): %+v", retrieved)
@@ -517,7 +518,7 @@
retrieved.(*voltha.LogicalDevice).RootDeviceId = strconv.Itoa(fwVersion)
- if afterUpdate := TestProxy_Root_LogicalDevice.Update("/logical_devices/"+TestProxy_TargetLogicalDeviceId, retrieved, false,
+ if afterUpdate := TestProxy_Root_LogicalDevice.Update(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, retrieved, false,
""); afterUpdate == nil {
t.Error("Failed to update logical device")
} else {
@@ -531,7 +532,7 @@
t.Error("POST_UPDATE callback was not executed")
}
- if d := TestProxy_Root_LogicalDevice.Get("/logical_devices/"+TestProxy_TargetLogicalDeviceId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
+ if d := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_TargetLogicalDeviceId, 1, false, ""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated logical device (root proxy)")
} else {
djson, _ := json.Marshal(d)
@@ -543,8 +544,8 @@
func TestProxy_2_3_2_Update_LogicalDeviceFlows(t *testing.T) {
// Get a device proxy and update a specific port
- ldFlowsProxy := TestProxy_Root.node.CreateProxy("/logical_devices/"+TestProxy_LogicalDeviceId+"/flows", false)
- flows := ldFlowsProxy.Get("/", 0, false, "")
+ ldFlowsProxy := TestProxy_Root.node.CreateProxy(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId+"/flows", false)
+ flows := ldFlowsProxy.Get(context.Background(), "/", 0, false, "")
flows.(*openflow_13.Flows).Items[0].TableId = rand.Uint32()
t.Logf("before updated flows: %+v", flows)
@@ -557,26 +558,26 @@
commonCallback2,
)
- kvFlows := ldFlowsProxy.Get("/", 0, false, "")
+ kvFlows := ldFlowsProxy.Get(context.Background(), "/", 0, false, "")
if reflect.DeepEqual(flows, kvFlows) {
t.Errorf("Local changes have changed the KV store contents - local:%+v, kv: %+v", flows, kvFlows)
}
- if updated := ldFlowsProxy.Update("/", flows.(*openflow_13.Flows), false, ""); updated == nil {
+ if updated := ldFlowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, ""); updated == nil {
t.Error("Failed to update logical device flows")
} else {
t.Logf("Updated logical device flows : %+v", updated)
}
- if d := ldFlowsProxy.Get("/", 0, false, ""); d == nil {
+ if d := ldFlowsProxy.Get(context.Background(), "/", 0, false, ""); d == nil {
t.Error("Failed to find updated logical device flows (flows proxy)")
} else {
djson, _ := json.Marshal(d)
t.Logf("Found flows (flows proxy): %s", string(djson))
}
- if d := TestProxy_Root_LogicalDevice.Get("/logical_devices/"+TestProxy_LogicalDeviceId+"/flows", 0, false,
+ if d := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId+"/flows", 0, false,
""); !reflect.ValueOf(d).IsValid() {
t.Error("Failed to find updated logical device flows (root proxy)")
} else {
@@ -601,7 +602,7 @@
"POST_REMOVE instructions (root proxy)", &postRemoveExecutedPtr,
)
- if removed := TestProxy_Root_LogicalDevice.Remove("/logical_devices/"+TestProxy_LogicalDeviceId, ""); removed == nil {
+ if removed := TestProxy_Root_LogicalDevice.Remove(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId, ""); removed == nil {
t.Error("Failed to remove logical device")
} else {
t.Logf("Removed device : %+v", removed)
@@ -614,7 +615,7 @@
t.Error("POST_REMOVE callback was not executed")
}
- if d := TestProxy_Root_LogicalDevice.Get("/logical_devices/"+TestProxy_LogicalDeviceId, 0, false, ""); reflect.ValueOf(d).IsValid() {
+ if d := TestProxy_Root_LogicalDevice.Get(context.Background(), "/logical_devices/"+TestProxy_LogicalDeviceId, 0, false, ""); reflect.ValueOf(d).IsValid() {
djson, _ := json.Marshal(d)
t.Errorf("Device was not removed - %s", djson)
} else {