VOL-1334 : Fixed concurrency issues
- Semaphores were added at the different layers of the model
- Made the proxy interfaces more robust
- Eliminated problems while retrieving latest data in concurrent mode
Change-Id: I7854105d7effa10e5cb704f5d9917569ab184f84
diff --git a/db/model/base_test.go b/db/model/base_test.go
index 1be34f6..010dff9 100644
--- a/db/model/base_test.go
+++ b/db/model/base_test.go
@@ -20,6 +20,7 @@
"github.com/opencord/voltha-go/protos/common"
"github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
+ "sync"
)
type ModelTestConfig struct {
@@ -35,14 +36,22 @@
var (
modelTestConfig = &ModelTestConfig{
- DbPrefix: "service/voltha/data/core/0001",
- DbType: "etcd",
- DbHost: "localhost",
+ DbPrefix: "service/voltha/data/core/0001",
+ DbType: "etcd",
+ DbHost: "localhost",
//DbHost: "10.106.153.44",
DbPort: 2379,
DbTimeout: 5,
}
+ logports = []*voltha.LogicalPort{
+ {
+ Id: "123",
+ DeviceId: "logicalport-0-device-id",
+ DevicePortNo: 123,
+ RootPort: false,
+ },
+ }
ports = []*voltha.Port{
{
PortNo: 123,
@@ -62,15 +71,25 @@
Items: []*openflow_13.OfpFlowStats{stats},
}
device = &voltha.Device{
- Id: devId,
+ Id: devID,
Type: "simulated_olt",
Address: &voltha.Device_HostAndPort{HostAndPort: "1.2.3.4:5555"},
AdminState: voltha.AdminState_PREPROVISIONED,
Flows: flows,
Ports: ports,
}
- devId string
- targetDeviceId string
+
+ logicalDevice = &voltha.LogicalDevice{
+ Id: devID,
+ DatapathId: 0,
+ Ports: logports,
+ Flows: flows,
+ }
+
+ devID string
+ ldevID string
+ targetDevID string
+ targetLogDevID string
)
func init() {
@@ -89,6 +108,7 @@
msgClass := &voltha.Voltha{}
root := NewRoot(msgClass, modelTestConfig.Backend)
+ //root := NewRoot(msgClass, nil)
if modelTestConfig.Backend != nil {
modelTestConfig.Root = root.Load(msgClass)
@@ -98,7 +118,7 @@
GetProfiling().Report()
- modelTestConfig.RootProxy = modelTestConfig.Root.GetProxy("/", false)
+ modelTestConfig.RootProxy = modelTestConfig.Root.node.CreateProxy("/", false)
}
func commonCallback(args ...interface{}) interface{} {
@@ -107,10 +127,34 @@
for i := 0; i < len(args); i++ {
log.Infof("ARG %d : %+v", i, args[i])
}
+
+ mutex := sync.Mutex{}
execStatus := args[1].(*bool)
// Inform the caller that the callback was executed
+ mutex.Lock()
*execStatus = true
+ mutex.Unlock()
+
+ return nil
+}
+
+// commonCallback2 logs how many arguments it received and always returns nil.
+func commonCallback2(args ...interface{}) interface{} {
+	log.Infof("Running common callback - arg count: %d", len(args))
+	return nil
+}
+
+func commonCallbackFunc(args ...interface{}) interface{} {
+ log.Infof("Running common callback - arg count: %d", len(args))
+
+ for i := 0; i < len(args); i++ {
+ log.Infof("ARG %d : %+v", i, args[i])
+ }
+ execStatusFunc := args[1].(func(bool))
+
+ // Inform the caller that the callback was executed
+ execStatusFunc(true)
return nil
}
@@ -121,6 +165,7 @@
log.Infof("Running first callback - name: %s, id: %s\n", name, id)
return nil
}
+
func secondCallback(args ...interface{}) interface{} {
name := args[0].(map[string]string)
id := args[1]
@@ -129,6 +174,7 @@
//panic("Generating a panic in second callback")
return nil
}
+
func thirdCallback(args ...interface{}) interface{} {
name := args[0]
id := args[1].(*voltha.Device)