VOL-1775 VOL-1779 VOL-1780 : Fix several issues with overall stability
- Applied fixes for the data races reported by the Go race detector
- Added a version attribute to the KV object
- Added a context object to the db/model API
- Carried timestamp information through the context to inform
decision making when applying a revision change
- Replaced proxy access control mechanism with etcd reservation mechanism
Change-Id: If3d142a73b1da0d64fa6a819530f297dbfada2d3
diff --git a/db/model/proxy_load_test.go b/db/model/proxy_load_test.go
index f44a6ae..f4fd325 100644
--- a/db/model/proxy_load_test.go
+++ b/db/model/proxy_load_test.go
@@ -16,6 +16,7 @@
package model
import (
+ "context"
"encoding/hex"
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
@@ -42,10 +43,16 @@
After interface{}
}
type proxyLoadTest struct {
- sync.RWMutex
- addedDevices []string
- updatedFirmwares []proxyLoadChanges
- updatedFlows []proxyLoadChanges
+ mutex sync.RWMutex
+
+ addMutex sync.RWMutex
+ addedDevices []string
+
+ firmwareMutex sync.RWMutex
+ updatedFirmwares []proxyLoadChanges
+ flowMutex sync.RWMutex
+ updatedFlows []proxyLoadChanges
+
preAddExecuted bool
postAddExecuted bool
preUpdateExecuted bool
@@ -53,33 +60,43 @@
}
func (plt *proxyLoadTest) SetPreAddExecuted(status bool) {
- plt.Lock()
- defer plt.Unlock()
+ plt.mutex.Lock()
+ defer plt.mutex.Unlock()
plt.preAddExecuted = status
}
func (plt *proxyLoadTest) SetPostAddExecuted(status bool) {
- plt.Lock()
- defer plt.Unlock()
+ plt.mutex.Lock()
+ defer plt.mutex.Unlock()
plt.postAddExecuted = status
}
func (plt *proxyLoadTest) SetPreUpdateExecuted(status bool) {
- plt.Lock()
- defer plt.Unlock()
+ plt.mutex.Lock()
+ defer plt.mutex.Unlock()
plt.preUpdateExecuted = status
}
func (plt *proxyLoadTest) SetPostUpdateExecuted(status bool) {
- plt.Lock()
- defer plt.Unlock()
+ plt.mutex.Lock()
+ defer plt.mutex.Unlock()
plt.postUpdateExecuted = status
}
func init() {
BenchmarkProxy_Root = NewRoot(&voltha.Voltha{}, nil)
- BenchmarkProxy_Logger, _ = log.AddPackage(log.JSON, log.InfoLevel, nil)
+ BenchmarkProxy_Logger, _ = log.AddPackage(log.JSON, log.DebugLevel, log.Fields{"instanceId": "PLT"})
//log.UpdateAllLoggers(log.Fields{"instanceId": "PROXY_LOAD_TEST"})
+ //Setup default logger - applies for packages that do not have specific logger set
+ if _, err := log.SetDefaultLogger(log.JSON, log.DebugLevel, log.Fields{"instanceId": "PLT"}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
- BenchmarkProxy_DeviceProxy = BenchmarkProxy_Root.node.CreateProxy("/", false)
+ // Update all loggers (provisioned via init) with a common field
+ if err := log.UpdateAllLoggers(log.Fields{"instanceId": "PLT"}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/db/model", log.DebugLevel)
+
+ BenchmarkProxy_DeviceProxy = BenchmarkProxy_Root.node.CreateProxy(context.Background(), "/", false)
// Register ADD instructions callbacks
BenchmarkProxy_PLT = &proxyLoadTest{}
@@ -133,16 +150,16 @@
var added interface{}
// Add the device
- if added = BenchmarkProxy_DeviceProxy.AddWithID("/devices", ltDevID, ltDevice, ""); added == nil {
+ if added = BenchmarkProxy_DeviceProxy.AddWithID(context.Background(), "/devices", ltDevID, ltDevice, ""); added == nil {
BenchmarkProxy_Logger.Errorf("Failed to add device: %+v", ltDevice)
continue
} else {
BenchmarkProxy_Logger.Infof("Device was added 1: %+v", added)
}
- BenchmarkProxy_PLT.Lock()
+ BenchmarkProxy_PLT.addMutex.Lock()
BenchmarkProxy_PLT.addedDevices = append(BenchmarkProxy_PLT.addedDevices, added.(*voltha.Device).Id)
- BenchmarkProxy_PLT.Unlock()
+ BenchmarkProxy_PLT.addMutex.Unlock()
}
})
@@ -157,8 +174,8 @@
if len(BenchmarkProxy_PLT.addedDevices) > 0 {
var target interface{}
randomID := BenchmarkProxy_PLT.addedDevices[rand.Intn(len(BenchmarkProxy_PLT.addedDevices))]
- firmProxy := BenchmarkProxy_Root.node.CreateProxy("/", false)
- if target = firmProxy.Get("/devices/"+randomID, 0, false,
+ firmProxy := BenchmarkProxy_Root.node.CreateProxy(context.Background(), "/", false)
+ if target = firmProxy.Get(context.Background(), "/devices/"+randomID, 0, false,
""); !reflect.ValueOf(target).IsValid() {
BenchmarkProxy_Logger.Errorf("Failed to find device: %s %+v", randomID, target)
continue
@@ -183,7 +200,7 @@
after := target.(*voltha.Device).FirmwareVersion
var updated interface{}
- if updated = firmProxy.Update("/devices/"+randomID, target.(*voltha.Device), false,
+ if updated = firmProxy.Update(context.Background(), "/devices/"+randomID, target.(*voltha.Device), false,
""); updated == nil {
BenchmarkProxy_Logger.Errorf("Failed to update device: %+v", target)
continue
@@ -192,7 +209,7 @@
}
- if d := firmProxy.Get("/devices/"+randomID, 0, false,
+ if d := firmProxy.Get(context.Background(), "/devices/"+randomID, 0, false,
""); !reflect.ValueOf(d).IsValid() {
BenchmarkProxy_Logger.Errorf("Failed to get device: %s", randomID)
continue
@@ -204,12 +221,13 @@
BenchmarkProxy_Logger.Errorf("Imm Device has unknown value: %s %+v %+v", randomID, d, target)
}
- BenchmarkProxy_PLT.Lock()
+ BenchmarkProxy_PLT.firmwareMutex.Lock()
+
BenchmarkProxy_PLT.updatedFirmwares = append(
BenchmarkProxy_PLT.updatedFirmwares,
proxyLoadChanges{ID: randomID, Before: before, After: after},
)
- BenchmarkProxy_PLT.Unlock()
+ BenchmarkProxy_PLT.firmwareMutex.Unlock()
}
}
})
@@ -246,8 +264,8 @@
if len(BenchmarkProxy_PLT.addedDevices) > 0 {
randomID := BenchmarkProxy_PLT.addedDevices[rand.Intn(len(BenchmarkProxy_PLT.addedDevices))]
- flowsProxy := BenchmarkProxy_Root.node.CreateProxy("/devices/"+randomID+"/flows", false)
- flows := flowsProxy.Get("/", 0, false, "")
+ flowsProxy := BenchmarkProxy_Root.node.CreateProxy(context.Background(), "/devices/"+randomID+"/flows", false)
+ flows := flowsProxy.Get(context.Background(), "/", 0, false, "")
before := flows.(*openflow_13.Flows).Items[0].TableId
flows.(*openflow_13.Flows).Items[0].TableId = uint32(rand.Intn(3000))
@@ -263,17 +281,17 @@
)
var updated interface{}
- if updated = flowsProxy.Update("/", flows.(*openflow_13.Flows), false, ""); updated == nil {
+ if updated = flowsProxy.Update(context.Background(), "/", flows.(*openflow_13.Flows), false, ""); updated == nil {
b.Errorf("Failed to update flows for device: %+v", flows)
} else {
BenchmarkProxy_Logger.Infof("Flows were updated : %+v", updated)
}
- BenchmarkProxy_PLT.Lock()
+ BenchmarkProxy_PLT.flowMutex.Lock()
BenchmarkProxy_PLT.updatedFlows = append(
BenchmarkProxy_PLT.updatedFlows,
proxyLoadChanges{ID: randomID, Before: before, After: after},
)
- BenchmarkProxy_PLT.Unlock()
+ BenchmarkProxy_PLT.flowMutex.Unlock()
}
}
})
@@ -285,7 +303,7 @@
for i := 0; i < len(BenchmarkProxy_PLT.addedDevices); i++ {
devToGet := BenchmarkProxy_PLT.addedDevices[i]
// Verify that the added device can now be retrieved
- if d := BenchmarkProxy_DeviceProxy.Get("/devices/"+devToGet, 0, false,
+ if d := BenchmarkProxy_DeviceProxy.Get(context.Background(), "/devices/"+devToGet, 0, false,
""); !reflect.ValueOf(d).IsValid() {
BenchmarkProxy_Logger.Errorf("Failed to get device: %s", devToGet)
continue
@@ -299,7 +317,7 @@
for i := 0; i < len(BenchmarkProxy_PLT.updatedFirmwares); i++ {
devToGet := BenchmarkProxy_PLT.updatedFirmwares[i].ID
// Verify that the updated device can be retrieved and that the updates were actually applied
- if d := BenchmarkProxy_DeviceProxy.Get("/devices/"+devToGet, 0, false,
+ if d := BenchmarkProxy_DeviceProxy.Get(context.Background(), "/devices/"+devToGet, 0, false,
""); !reflect.ValueOf(d).IsValid() {
BenchmarkProxy_Logger.Errorf("Failed to get device: %s", devToGet)
continue
@@ -312,23 +330,3 @@
}
}
}
-
-func BenchmarkProxy_GetUpdatedFlows(b *testing.B) {
- var d interface{}
- for i := 0; i < len(BenchmarkProxy_PLT.updatedFlows); i++ {
- devToGet := BenchmarkProxy_PLT.updatedFlows[i].ID
- // Verify that the updated device can be retrieved and that the updates were actually applied
- flowsProxy := BenchmarkProxy_Root.node.CreateProxy("/devices/"+devToGet+"/flows", false)
- if d = flowsProxy.Get("/", 0, false,
- ""); !reflect.ValueOf(d).IsValid() {
- BenchmarkProxy_Logger.Errorf("Failed to get device flows: %s", devToGet)
- continue
- } else if d.(*openflow_13.Flows).Items[0].TableId == BenchmarkProxy_PLT.updatedFlows[i].After.(uint32) {
- BenchmarkProxy_Logger.Infof("Device was updated with new flow value: %s %+v", devToGet, d)
- } else if d.(*openflow_13.Flows).Items[0].TableId == BenchmarkProxy_PLT.updatedFlows[i].Before.(uint32) {
- BenchmarkProxy_Logger.Errorf("Device kept old flow value: %s %+v %+v", devToGet, d, BenchmarkProxy_PLT.updatedFlows[i])
- } else {
- BenchmarkProxy_Logger.Errorf("Device has unknown flow value: %s %+v %+v", devToGet, d, BenchmarkProxy_PLT.updatedFlows[i])
- }
- }
-}