[VOL-1462] Sync data between two voltha cores in the same pair
This commit consists of the following updates:
1) Background data syncing between two cores after a transaction
is completed by one core.
2) Add transaction management to southbound APIs (adapter facing).
This is enabled got adapter registration only for now.
3) Fix an issue with flow decomposition
4) Add the rough-in to allow a packet to be send to an OFAgent
with a transaction ID. Two cores can therefore send the same
packet and let the OFAgent discard the duplicate. The work in
OFAgent remains.
5) Cleanups
Change-Id: Ibe9d75edb66cfd6a0954bdfeb16a7e7c8a3c53b6
diff --git a/adapters/common/utils.go b/adapters/common/utils.go
index 98468b0..810a3d0 100644
--- a/adapters/common/utils.go
+++ b/adapters/common/utils.go
@@ -45,3 +45,29 @@
rand.Intn(128),
)
}
+
+const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+const (
+ letterIdxBits = 6 // 6 bits to represent a letter index
+ letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
+ letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
+)
+
+var src = rand.NewSource(time.Now().UnixNano())
+
+func GetRandomString(n int) string {
+ b := make([]byte, n)
+ // A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
+ for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
+ if remain == 0 {
+ cache, remain = src.Int63(), letterIdxMax
+ }
+ if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
+ b[i] = letterBytes[idx]
+ i--
+ }
+ cache >>= letterIdxBits
+ remain--
+ }
+ return string(b)
+}
\ No newline at end of file
diff --git a/compose/rw_core_concurrency_test.yml b/compose/rw_core_concurrency_test.yml
new file mode 100644
index 0000000..8f6f35e
--- /dev/null
+++ b/compose/rw_core_concurrency_test.yml
@@ -0,0 +1,71 @@
+---
+# Copyright 2019 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2'
+services:
+ rw_core1:
+ image: voltha-rw-core
+ entrypoint:
+ - /app/rw_core
+ - -kv_store_type=etcd
+ - -kv_store_host=${DOCKER_HOST_IP}
+ - -kv_store_port=2379
+ - -grpc_port=50057
+ - -banner=true
+ - -kafka_adapter_host=${DOCKER_HOST_IP}
+ - -kafka_adapter_port=9092
+ - -kafka_cluster_host=${DOCKER_HOST_IP}
+ - -kafka_cluster_port=9092
+ - -rw_core_topic=rwcore
+ - -kv_store_data_prefix=service/voltha
+ - -in_competing_mode=true
+ - -timeout_long_request=3000
+ - -timeout_request=300
+ - -log_level=0
+ ports:
+ - 50057:50057
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+ networks:
+ - default
+
+ rw_core2:
+ image: voltha-rw-core
+ entrypoint:
+ - /app/rw_core
+ - -kv_store_type=etcd
+ - -kv_store_host=${DOCKER_HOST_IP}
+ - -kv_store_port=2379
+ - -grpc_port=50057
+ - -banner=true
+ - -kafka_adapter_host=${DOCKER_HOST_IP}
+ - -kafka_adapter_port=9092
+ - -kafka_cluster_host=${DOCKER_HOST_IP}
+ - -kafka_cluster_port=9092
+ - -rw_core_topic=rwcore
+ - -kv_store_data_prefix=service/voltha
+ - -in_competing_mode=true
+ - -timeout_long_request=3000
+ - -timeout_request=300
+ - -log_level=0
+ ports:
+ - 50058:50057
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+ networks:
+ - default
+networks:
+ default:
+ driver: bridge
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 05d0af5..4359f7d 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -41,6 +41,10 @@
DefaultRequestTimeout = 3000 // 3000 milliseconds - to handle a wider latency range
)
+const (
+ TransactionKey = "transactionID"
+)
+
// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
// obtained from that channel, this interface is invoked. This is used to handle
// async requests into the Core via the kafka messaging bus
@@ -592,6 +596,25 @@
return
}
+func (kp *InterContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
+ arg := &KVArg{
+ Key: TransactionKey,
+ Value: &ic.StrType{Val: transactionId},
+ }
+
+ var marshalledArg *any.Any
+ var err error
+ if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
+ log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+ return currentArgs
+ }
+ protoArg := &ic.Argument{
+ Key: arg.Key,
+ Value: marshalledArg,
+ }
+ return append(currentArgs, protoArg)
+}
+
func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
// First extract the header to know whether this is a request - responses are handled by a different handler
@@ -607,6 +630,9 @@
} else {
log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
// let the callee unpack the arguments as its the only one that knows the real proto type
+ // Augment the requestBody with the message Id as it will be used in scenarios where cores
+ // are set in pairs and competing
+ requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
if err != nil {
log.Warn(err)
@@ -623,7 +649,6 @@
returnedValues = make([]interface{}, 1)
returnedValues[0] = returnError
} else {
- //log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
returnedValues = make([]interface{}, 0)
// Check for errors first
lastIndex := len(out) - 1
@@ -635,6 +660,8 @@
returnError = &ic.Error{Reason: "incorrect-error-returns"}
returnedValues = append(returnedValues, returnError)
}
+ } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
+ return // Ignore case - when core is in competing mode
} else { // Non-error case
success = true
for idx, val := range out {
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 55c68a3..698fefa 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -459,7 +459,7 @@
// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
select {
case ok := <-sc.producer.Successes():
- log.Debugw("message-sent", log.Fields{"status": ok})
+ log.Debugw("message-sent", log.Fields{"status": ok.Topic})
case notOk := <-sc.producer.Errors():
log.Debugw("error-sending", log.Fields{"status": notOk})
return notOk
@@ -786,7 +786,7 @@
// Channel closed
break startloop
}
- log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
+ log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
msgBody := msg.Value
icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index d4d3c69..0be4e12 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -139,7 +139,7 @@
//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
func (aMgr *AdapterManager) loadAdaptersAndDevicetypesInMemory() {
// Load the adapters
- if adaptersIf := aMgr.clusterDataProxy.Get("/adapters", 0, false, ""); adaptersIf != nil {
+ if adaptersIf := aMgr.clusterDataProxy.List("/adapters", 0, false, ""); adaptersIf != nil {
for _, adapterIf := range adaptersIf.([]interface{}) {
if adapter, ok := adapterIf.(*voltha.Adapter); ok {
log.Debugw("found-existing-adapter", log.Fields{"adapterId": adapter.Id})
@@ -153,7 +153,7 @@
}
// Load the device types
- if deviceTypesIf := aMgr.clusterDataProxy.Get("/device_types", 0, false, ""); deviceTypesIf != nil {
+ if deviceTypesIf := aMgr.clusterDataProxy.List("/device_types", 0, false, ""); deviceTypesIf != nil {
dTypes := &voltha.DeviceTypes{Items:[]*voltha.DeviceType{}}
for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
@@ -169,6 +169,31 @@
}
}
+
+//updateAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
+func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory() {
+ // Update the adapters
+ if adaptersIf := aMgr.clusterDataProxy.List("/adapters", 0, false, ""); adaptersIf != nil {
+ for _, adapterIf := range adaptersIf.([]interface{}) {
+ if adapter, ok := adapterIf.(*voltha.Adapter); ok {
+ log.Debugw("found-existing-adapter", log.Fields{"adapterId": adapter.Id})
+ aMgr.updateAdapter(adapter)
+ }
+ }
+ }
+ // Update the device types
+ if deviceTypesIf := aMgr.clusterDataProxy.List("/device_types", 0, false, ""); deviceTypesIf != nil {
+ dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
+ for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
+ if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
+ log.Debugw("found-existing-device-types", log.Fields{"deviceTypes": dTypes})
+ aMgr.updateDeviceType(dType)
+ }
+ }
+ }
+}
+
+
func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) {
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
@@ -232,7 +257,9 @@
defer aMgr.lockAdaptersMap.Unlock()
for _, adapterAgent := range aMgr.adapterAgents {
if a := adapterAgent.getAdapter(); a != nil {
- result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
+ if a.Id != SENTINEL_ADAPTER_ID { // don't report the sentinel
+ result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
+ }
}
}
return result, nil
@@ -270,7 +297,7 @@
defer aMgr.lockAdaptersMap.Unlock()
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
- if adapterAgent, exist := aMgr.adapterAgents[deviceType.Adapter]; !exist {
+ if adapterAgent, exist := aMgr.adapterAgents[deviceType.Adapter]; exist {
adapterAgent.updateDeviceType(deviceType)
} else {
aMgr.adapterAgents[deviceType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: deviceType.Adapter},
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 98cc688..5fede68 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -22,6 +22,7 @@
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/kafka"
ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
@@ -36,9 +37,14 @@
adapterMgr *AdapterManager
localDataProxy *model.Proxy
clusterDataProxy *model.Proxy
+ defaultRequestTimeout int64
+ longRunningRequestTimeout int64
+ coreInCompetingMode bool
}
-func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy) *AdapterRequestHandlerProxy {
+func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
+ aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout int64,
+ defaultRequestTimeout int64) *AdapterRequestHandlerProxy {
var proxy AdapterRequestHandlerProxy
proxy.coreInstanceId = coreInstanceId
proxy.deviceMgr = dMgr
@@ -46,17 +52,43 @@
proxy.clusterDataProxy = cdProxy
proxy.localDataProxy = ldProxy
proxy.adapterMgr = aMgr
+ proxy.coreInCompetingMode = incompetingMode
+ proxy.defaultRequestTimeout = defaultRequestTimeout
+ proxy.longRunningRequestTimeout = longRunningRequestTimeout
return &proxy
}
+func (rhp *AdapterRequestHandlerProxy) acquireTransaction(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
+ timeout := rhp.defaultRequestTimeout
+ if len(maxTimeout) > 0 {
+ timeout = maxTimeout[0]
+ }
+ log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
+ txn := NewKVTransaction(transactionId)
+ if txn == nil {
+ return nil, errors.New("fail-to-create-transaction")
+ } else if txn.Acquired(timeout) {
+ return txn, nil
+ } else {
+ return nil, errors.New("failed-to-seize-request")
+ }
+}
+
+// competeForTransaction is a helper function to determine whether every request needs to compete with another
+// Core to execute the request
+func (rhp *AdapterRequestHandlerProxy) competeForTransaction() bool {
+ return rhp.coreInCompetingMode
+}
+
func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
- if len(args) != 2 {
+ if len(args) != 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
adapter := &voltha.Adapter{}
deviceTypes := &voltha.DeviceTypes{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "adapter":
@@ -69,9 +101,27 @@
log.Warnw("cannot-unmarshal-device-types", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "coreId": rhp.coreInstanceId})
+ log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionID": transactionID.Val, "coreId": rhp.coreInstanceId})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // Update our adapters in memory
+ go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory()
+ // returning nil, nil instructs the callee to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return &voltha.CoreInstance{InstanceId: "CoreInstance"}, nil
@@ -80,17 +130,40 @@
}
func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
- if len(args) != 1 {
+ if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
+
pID := &voltha.ID{}
- if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
- log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
- return nil, err
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
+ log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
- log.Debugw("GetDevice", log.Fields{"deviceId": pID.Id})
+ log.Debugw("GetDevice", log.Fields{"deviceId": pID.Id, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the callee to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return &voltha.Device{Id: pID.Id}, nil
@@ -125,17 +198,40 @@
}
func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) != 1 {
+ if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
+
device := &voltha.Device{}
- if err := ptypes.UnmarshalAny(args[0].Value, device); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
- return nil, err
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
- log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
+ log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the callee to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return new(empty.Empty), nil
@@ -154,32 +250,56 @@
}
func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ic.Argument) (*voltha.Device, error) {
- if len(args) < 1 {
+ if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
+
pID := &voltha.ID{}
- if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
- log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
- return nil, err
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
+ log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
- log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Id})
+ log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Id, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the callee to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return &voltha.Device{Id: pID.Id}, nil
}
- return nil, nil
+ return rhp.deviceMgr.GetDevice(pID.Id)
}
func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ic.Argument) (*voltha.Ports, error) {
- if len(args) != 2 {
+ if len(args) != 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
pt := &ic.IntType{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -192,9 +312,14 @@
log.Warnw("cannot-unmarshal-porttype", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("GetPorts", log.Fields{"deviceID": deviceId.Id, "portype": pt.Val})
+ log.Debugw("GetPorts", log.Fields{"deviceID": deviceId.Id, "portype": pt.Val, "transactionID": transactionID.Val})
if rhp.TestMode { // Execute only for test cases
aPort := &voltha.Port{Label: "test_port"}
allPorts := &voltha.Ports{}
@@ -204,31 +329,54 @@
return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
}
-func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Device, error) {
- if len(args) != 1 {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Devices, error) {
+ if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
+
pID := &voltha.ID{}
- if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
- log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
- return nil, err
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
+ log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
- log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Id})
+ log.Debugw("GetChildDevices", log.Fields{"deviceId": pID.Id, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the callee to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
- return &voltha.Device{Id: pID.Id}, nil
+ return &voltha.Devices{Items:nil}, nil
}
- //TODO: Complete
- return nil, nil
+
+ return rhp.deviceMgr.getAllChildDevices(pID.Id)
}
// ChildDeviceDetected is invoked when a child device is detected. The following
// parameters are expected:
// {parent_device_id, parent_port_no, child_device_type, proxy_address, admin_state, **kw)
func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) < 4 {
+ if len(args) < 5 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -238,6 +386,7 @@
portNo := &ic.IntType{}
dt := &ic.StrType{}
chnlId := &ic.IntType{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "parent_device_id":
@@ -260,10 +409,26 @@
log.Warnw("cannot-unmarshal-channel-id", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
log.Debugw("ChildDeviceDetected", log.Fields{"parentDeviceId": pID.Id, "parentPortNo": portNo.Val,
- "deviceType": dt.Val, "channelId": chnlId.Val})
+ "deviceType": dt.Val, "channelId": chnlId.Val, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the callee to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return nil, nil
@@ -275,7 +440,7 @@
}
func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) < 2 {
+ if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -283,6 +448,7 @@
deviceId := &voltha.ID{}
operStatus := &ic.IntType{}
connStatus := &ic.IntType{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -300,9 +466,27 @@
log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
+ log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus,
+ "conn-status": connStatus, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the callee to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
+
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
@@ -312,7 +496,7 @@
}
func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) < 2 {
+ if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -320,6 +504,7 @@
deviceId := &voltha.ID{}
operStatus := &ic.IntType{}
connStatus := &ic.IntType{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -337,9 +522,27 @@
log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("ChildrenStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
+ log.Debugw("ChildrenStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus,
+ "conn-status": connStatus, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the callee to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
+
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
@@ -350,7 +553,7 @@
}
func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) < 2 {
+ if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -359,6 +562,7 @@
portType := &ic.IntType{}
portNo := &ic.IntType{}
operStatus := &ic.IntType{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -381,10 +585,27 @@
log.Warnw("cannot-unmarshal-portno", log.Fields{"error": err})
return nil, err
}
-
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId.Id, "operStatus": operStatus, "portType": portType, "portNo": portNo})
+ log.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId.Id, "operStatus": operStatus,
+ "portType": portType, "portNo": portNo, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the callee to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
+
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
@@ -393,13 +614,14 @@
}
func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) != 2 {
+ if len(args) != 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
port := &voltha.Port{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -412,9 +634,25 @@
log.Warnw("cannot-unmarshal-port", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("PortCreated", log.Fields{"deviceId": deviceId.Id, "port": port})
+ log.Debugw("PortCreated", log.Fields{"deviceId": deviceId.Id, "port": port, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the callee to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return nil, nil
@@ -426,13 +664,14 @@
}
func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) != 2 {
+ if len(args) != 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
pmConfigs := &voltha.PmConfigs{}
init := &ic.BoolType{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_pm_config":
@@ -445,10 +684,26 @@
log.Warnw("cannot-unmarshal-boolean", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
log.Debugw("DevicePMConfigUpdate", log.Fields{"deviceId": pmConfigs.Id, "configs": pmConfigs,
- "init": init})
+ "init": init, "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the callee to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
if rhp.TestMode { // Execute only for test cases
return nil, nil
@@ -461,7 +716,7 @@
}
func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) < 3 {
+ if len(args) < 4 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -469,6 +724,7 @@
deviceId := &voltha.ID{}
portNo := &ic.IntType{}
packet := &ic.Packet{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -486,14 +742,23 @@
log.Warnw("cannot-unmarshal-packet", log.Fields{"error": err})
return nil, err
}
-
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val, "packet": packet})
+ log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val, "packet": packet,
+ "transactionID": transactionID.Val})
+
+ // For performance reason, we do not compete for packet-in. We process it and send the packet in. later in the
+ // processing flow the duplicate packet will be discarded
+
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), packet.Payload)
+ go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
return new(empty.Empty), nil
}
@@ -506,6 +771,7 @@
}
deviceId := &voltha.ID{}
img := &voltha.ImageDownload{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -518,9 +784,27 @@
log.Warnw("cannot-unmarshal-imgaeDownload", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
- log.Debugw("UpdateImageDownload", log.Fields{"deviceId": deviceId.Id, "image-download": img})
+ log.Debugw("UpdateImageDownload", log.Fields{"deviceId": deviceId.Id, "image-download": img,
+ "transactionID": transactionID.Val})
+
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+ // returning nil, nil instructs the callee to ignore this request
+ return nil, nil
+ } else {
+ defer txn.Close()
+ }
+ }
+
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index c81141b..838235d 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -154,9 +154,11 @@
return nil
}
-func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
- aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy) error {
- requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy)
+func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
+ ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
+ ) error {
+ requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+ core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
log.Info("request-handlers")
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 8bf8664..116e2bb 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -54,10 +54,10 @@
cloned := (proto.Clone(device)).(*voltha.Device)
if cloned.Id == "" {
cloned.Id = CreateDeviceId()
+ cloned.AdminState = voltha.AdminState_PREPROVISIONED
+ cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
+ cloned.Flows = &ofp.Flows{Items: nil}
}
- cloned.AdminState = voltha.AdminState_PREPROVISIONED
- cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
- cloned.Flows = &ofp.Flows{Items: nil}
if !device.GetRoot() && device.ProxyAddress != nil {
// Set the default vlan ID to the one specified by the parent adapter. It can be
// overwritten by the child adapter during a device update request
@@ -74,15 +74,29 @@
return &agent
}
-// start save the device to the data model and registers for callbacks on that device
-func (agent *DeviceAgent) start(ctx context.Context) {
+// start save the device to the data model and registers for callbacks on that device if loadFromdB is false. Otherwise,
+// it will load the data from the dB and setup teh necessary callbacks and proxies.
+func (agent *DeviceAgent) start(ctx context.Context, loadFromdB bool) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
- log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
- // Add the initial device to the local model
- if added := agent.clusterDataProxy.AddWithID("/devices", agent.deviceId, agent.lastData, ""); added == nil {
- log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
+ log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceId})
+ if loadFromdB {
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if d, ok := device.(*voltha.Device); ok {
+ agent.lastData = proto.Clone(d).(*voltha.Device)
+ }
+ } else {
+ log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceId})
+ return status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+ }
+ log.Debugw("device-loaded-from-dB", log.Fields{"device": agent.lastData})
+ } else {
+ // Add the initial device to the local model
+ if added := agent.clusterDataProxy.AddWithID("/devices", agent.deviceId, agent.lastData, ""); added == nil {
+ log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
+ }
}
+
agent.deviceProxy = agent.clusterDataProxy.Root.CreateProxy("/devices/"+agent.deviceId, false)
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
@@ -97,6 +111,7 @@
agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
log.Debug("device-agent-started")
+ return nil
}
// stop stops the device agent. Not much to do for now
@@ -112,7 +127,7 @@
func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
cloned := proto.Clone(d).(*voltha.Device)
return cloned, nil
@@ -124,7 +139,7 @@
// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
// This function is meant so that we do not have duplicate code all over the device agent functions
func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
cloned := proto.Clone(d).(*voltha.Device)
return cloned, nil
@@ -808,7 +823,7 @@
}
//flowTableUpdated is the callback after flows have been updated in the model to push them
-//to the adapterAgents
+//to the adapters
func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
@@ -842,7 +857,7 @@
}
groups := device.FlowGroups
- // Send update to adapterAgents
+ // Send update to adapters
dType := agent.adapterMgr.getDeviceType(device.Type)
if !dType.AcceptsAddRemoveFlowUpdates {
if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
@@ -882,7 +897,7 @@
}
//groupTableUpdated is the callback after group table has been updated in the model to push them
-//to the adapterAgents
+//to the adapters
func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
@@ -916,11 +931,9 @@
}
flows := device.Flows
- // Send update to adapterAgents
- // TODO: Check whether the device supports incremental flow changes
- // Assume false for test
- acceptsAddRemoveFlowUpdates := false
- if !acceptsAddRemoveFlowUpdates {
+ // Send update to adapters
+ dType := agent.adapterMgr.getDeviceType(device.Type)
+ if !dType.AcceptsAddRemoveFlowUpdates {
if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, latestData); err != nil {
log.Debugw("update-flows-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
return err
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index cd662ac..88b0c7d 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -34,7 +34,7 @@
type DeviceManager struct {
deviceAgents map[string]*DeviceAgent
adapterProxy *AdapterProxy
- adapterMgr *AdapterManager
+ adapterMgr *AdapterManager
logicalDeviceMgr *LogicalDeviceManager
kafkaICProxy *kafka.InterContainerProxy
stateTransitions *TransitionMap
@@ -52,7 +52,7 @@
deviceMgr.kafkaICProxy = kafkaICProxy
deviceMgr.coreInstanceId = coreInstanceId
deviceMgr.clusterDataProxy = cdProxy
- deviceMgr.adapterMgr= adapterMgr
+ deviceMgr.adapterMgr = adapterMgr
deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
return &deviceMgr
}
@@ -96,16 +96,27 @@
delete(dMgr.deviceAgents, agent.deviceId)
}
+// getDeviceAgent returns the agent managing the device. If the device is not in memory, it will loads it, if it exists
func (dMgr *DeviceManager) getDeviceAgent(deviceId string) *DeviceAgent {
- // TODO If the device is not in memory it needs to be loaded first
dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
if agent, ok := dMgr.deviceAgents[deviceId]; ok {
+ dMgr.lockDeviceAgentsMap.Unlock()
return agent
+ } else {
+ // Try to load into memory - loading will also create the device agent
+ dMgr.lockDeviceAgentsMap.Unlock()
+ if err := dMgr.load(deviceId); err == nil {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ if agent, ok = dMgr.deviceAgents[deviceId]; ok {
+ return agent
+ }
+ }
}
return nil
}
+// listDeviceIdsFromMap returns the list of device IDs that are in memory
func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
dMgr.lockDeviceAgentsMap.Lock()
defer dMgr.lockDeviceAgentsMap.Unlock()
@@ -122,7 +133,7 @@
// Create and start a device agent for that device
agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy)
dMgr.addDeviceAgentToMap(agent)
- agent.start(ctx)
+ agent.start(ctx, false)
sendResponse(ctx, ch, agent.lastData)
}
@@ -133,8 +144,6 @@
if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
res = agent.enableDevice(ctx)
log.Debugw("EnableDevice-result", log.Fields{"result": res})
- } else {
- res = status.Errorf(codes.NotFound, "%s", id.Id)
}
sendResponse(ctx, ch, res)
@@ -181,6 +190,7 @@
sendResponse(ctx, ch, res)
}
+// GetDevice will returns a device, either from memory or from the dB, if present
func (dMgr *DeviceManager) GetDevice(id string) (*voltha.Device, error) {
log.Debugw("GetDevice", log.Fields{"deviceid": id})
if agent := dMgr.getDeviceAgent(id); agent != nil {
@@ -189,6 +199,13 @@
return nil, status.Errorf(codes.NotFound, "%s", id)
}
+func (dMgr *DeviceManager) IsDeviceInCache(id string) bool {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ _, exist := dMgr.deviceAgents[id]
+ return exist
+}
+
func (dMgr *DeviceManager) IsRootDevice(id string) (bool, error) {
device, err := dMgr.GetDevice(id)
if err != nil {
@@ -203,10 +220,15 @@
result := &voltha.Devices{}
if devices := dMgr.clusterDataProxy.List("/devices", 0, false, ""); devices != nil {
for _, device := range devices.([]interface{}) {
- if agent := dMgr.getDeviceAgent(device.(*voltha.Device).Id); agent == nil {
- agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
- dMgr.addDeviceAgentToMap(agent)
- agent.start(nil)
+ // If device is not in memory then set it up
+ if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
+ agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+ if err := agent.start(nil, true); err != nil {
+ log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
+ agent.stop(nil)
+ } else {
+ dMgr.addDeviceAgentToMap(agent)
+ }
}
result.Items = append(result.Items, device.(*voltha.Device))
}
@@ -214,6 +236,97 @@
return result, nil
}
+// loadDevice loads the deviceId in memory, if not present
+func (dMgr *DeviceManager) loadDevice(deviceId string) (*DeviceAgent, error) {
+ log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
+ // Sanity check
+ if deviceId == "" {
+ return nil, status.Error(codes.InvalidArgument, "deviceId empty")
+ }
+ if !dMgr.IsDeviceInCache(deviceId) {
+ agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: deviceId}, dMgr, dMgr.clusterDataProxy)
+ if err := agent.start(nil, true); err != nil {
+ agent.stop(nil)
+ return nil, err
+ }
+ dMgr.addDeviceAgentToMap(agent)
+ }
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent, nil
+ }
+ return nil, status.Error(codes.NotFound, deviceId) // This should nto happen
+}
+
+// loadRootDeviceParentAndChildren loads the children and parents of a root device in memory
+func (dMgr *DeviceManager) loadRootDeviceParentAndChildren(device *voltha.Device) error {
+ log.Debugw("loading-parent-and-children", log.Fields{"deviceId": device.Id})
+ if device.Root {
+ // Scenario A
+ if device.ParentId != "" {
+ // Load logical device if needed.
+ if err := dMgr.logicalDeviceMgr.load(device.ParentId); err != nil {
+ log.Warnw("failure-loading-logical-device", log.Fields{"lDeviceId": device.ParentId})
+ }
+ } else {
+ log.Debugw("no-parent-to-load", log.Fields{"deviceId": device.Id})
+ }
+ // Load all child devices, if needed
+ if childDeviceIds, err := dMgr.getAllChildDeviceIds(device); err == nil {
+ for _, childDeviceId := range childDeviceIds {
+ if _, err := dMgr.loadDevice(childDeviceId); err != nil {
+ log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceId})
+ return err
+ }
+ }
+ log.Debugw("loaded-children", log.Fields{"deviceId": device.Id, "numChildren": len(childDeviceIds)})
+ } else {
+ log.Debugw("no-child-to-load", log.Fields{"deviceId": device.Id})
+ }
+ }
+ return nil
+}
+
+// load loads the deviceId in memory, if not present, and also loads its accompanying parents and children. Loading
+// in memory is for improved performance. It is not imperative that a device needs to be in memory when a request
+// acting on the device is received by the core. In such a scenario, the Core will load the device in memory first
+// and the proceed with the request.
+func (dMgr *DeviceManager) load(deviceId string) error {
+ log.Debug("load...")
+ // First load the device - this may fail in case the device was deleted intentionally by the other core
+ var dAgent *DeviceAgent
+ var err error
+ if dAgent, err = dMgr.loadDevice(deviceId); err != nil {
+ log.Warnw("failure-loading-device", log.Fields{"deviceId": deviceId})
+ return err
+ }
+ // Get the loaded device details
+ var device *voltha.Device
+ if device, err = dAgent.getDevice(); err != nil {
+ return err
+ }
+
+ // If the device is in Pre-provisioning or deleted state stop here
+ if device.AdminState == voltha.AdminState_PREPROVISIONED || device.AdminState == voltha.AdminState_DELETED {
+ return nil
+ }
+
+ // Now we face two scenarios
+ if device.Root {
+ // Load all children as well as the parent of this device (logical_device)
+ if err := dMgr.loadRootDeviceParentAndChildren(device); err != nil {
+ log.Warnw("failure-loading-device-parent-and-children", log.Fields{"deviceId": deviceId})
+ return err
+ }
+ log.Debugw("successfully-loaded-parent-and-children", log.Fields{"deviceId": deviceId})
+ } else {
+ // Scenario B - use the parentId of that device (root device) to trigger the loading
+ if device.ParentId != "" {
+ return dMgr.load(device.ParentId)
+ }
+ }
+ return nil
+}
+
// ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
func (dMgr *DeviceManager) ListDeviceIds() (*voltha.IDs, error) {
log.Debug("ListDeviceIDs")
@@ -230,17 +343,17 @@
reconciled := 0
for _, id := range ids.Items {
// Act on the device only if its not present in the agent map
- if agent := dMgr.getDeviceAgent(id.Id); agent == nil {
+ if !dMgr.IsDeviceInCache(id.Id) {
// Device Id not in memory
log.Debugw("reconciling-device", log.Fields{"id": id.Id})
- // Load device from model
- if device := dMgr.clusterDataProxy.Get("/devices/"+id.Id, 0, false, ""); device != nil {
- agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
- dMgr.addDeviceAgentToMap(agent)
- agent.start(nil)
- reconciled += 1
+ // Load device from dB
+ agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id:id.Id}, dMgr, dMgr.clusterDataProxy)
+ if err := agent.start(nil, true); err != nil {
+ log.Warnw("failure-loading-device", log.Fields{"deviceId": id.Id})
+ agent.stop(nil)
} else {
- log.Warnw("device-inexistent", log.Fields{"id": id.Id})
+ dMgr.addDeviceAgentToMap(agent)
+ reconciled += 1
}
} else {
reconciled += 1
@@ -393,7 +506,7 @@
// Create and start a device agent for that device
agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy)
dMgr.addDeviceAgentToMap(agent)
- agent.start(nil)
+ agent.start(nil, false)
// Activate the child device
if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
@@ -430,7 +543,7 @@
return status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) PacketIn(deviceId string, port uint32, packet []byte) error {
+func (dMgr *DeviceManager) PacketIn(deviceId string, port uint32, transactionId string, packet []byte) error {
log.Debugw("PacketIn", log.Fields{"deviceId": deviceId, "port": port})
// Get the logical device Id based on the deviceId
var device *voltha.Device
@@ -444,7 +557,7 @@
return status.Errorf(codes.FailedPrecondition, "%s", deviceId)
}
- if err := dMgr.logicalDeviceMgr.packetIn(device.ParentId, port, packet); err != nil {
+ if err := dMgr.logicalDeviceMgr.packetIn(device.ParentId, port, transactionId, packet); err != nil {
return err
}
return nil
@@ -576,6 +689,23 @@
return childDeviceIds, nil
}
+//getAllChildDevices is a helper method to get all the child device IDs from the device passed as parameter
+func (dMgr *DeviceManager) getAllChildDevices(parentDeviceId string) (*voltha.Devices, error) {
+ log.Debugw("getAllChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
+ if parentDevice, err := dMgr.GetDevice(parentDeviceId); err == nil {
+ childDevices := make([]*voltha.Device, 0)
+ if childDeviceIds, er := dMgr.getAllChildDeviceIds(parentDevice); er == nil {
+ for _, deviceId := range childDeviceIds {
+ if d, e := dMgr.GetDevice(deviceId); e == nil && d != nil {
+ childDevices = append(childDevices, d)
+ }
+ }
+ }
+ return &voltha.Devices{Items: childDevices}, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", parentDeviceId)
+}
+
func (dMgr *DeviceManager) addUNILogicalPort(cDevice *voltha.Device) error {
log.Info("addUNILogicalPort")
if err := dMgr.logicalDeviceMgr.addUNILogicalPort(nil, cDevice); err != nil {
@@ -585,7 +715,6 @@
return nil
}
-
func (dMgr *DeviceManager) downloadImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
log.Debugw("downloadImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
var res interface{}
@@ -661,7 +790,6 @@
sendResponse(ctx, ch, res)
}
-
func (dMgr *DeviceManager) updateImageDownload(deviceId string, img *voltha.ImageDownload) error {
log.Debugw("updateImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
@@ -691,7 +819,6 @@
return nil, status.Errorf(codes.NotFound, "%s", deviceId)
}
-
func (dMgr *DeviceManager) activateDevice(cDevice *voltha.Device) error {
log.Info("activateDevice")
return nil
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 877fcb4..f72d615 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -35,8 +35,6 @@
//TODO: Move this Tag into the proto file
const OF_CONTROLLER_TAG= "voltha_backend_name"
-//const MAX_RESPONSE_TIME = int64(500) // milliseconds
-
const (
IMAGE_DOWNLOAD = iota
CANCEL_IMAGE_DOWNLOAD = iota
@@ -44,6 +42,15 @@
REVERT_IMAGE = iota
)
+
+type deviceID struct {
+ id string
+}
+
+type logicalDeviceID struct {
+ id string
+}
+
type APIHandler struct {
deviceMgr *DeviceManager
logicalDeviceMgr *LogicalDeviceManager
@@ -118,7 +125,7 @@
return handler.coreInCompetingMode
}
-func (handler *APIHandler) acquireTransaction(ctx context.Context, maxTimeout ...int64) (*KVTransaction, error) {
+func (handler *APIHandler) acquireTransaction(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
timeout := handler.defaultRequestTimeout
if len(maxTimeout) > 0 {
timeout = maxTimeout[0]
@@ -130,6 +137,20 @@
} else if txn.Acquired(timeout) {
return txn, nil
} else {
+ if id != nil {
+ // The id can either be a device Id or a logical device id.
+ if dId, ok := id.(*deviceID); ok {
+ // Since this core has not processed this request, let's load the device, along with its extended
+ // family (parents and children) in memory. This will keep this core in-sync with its paired core as
+ // much as possible. The watch feature in the core model will ensure that the contents of those objects in
+ // memory are in sync.
+ time.Sleep(2 * time.Second)
+ go handler.deviceMgr.load(dId.id)
+ } else if ldId, ok := id.(*logicalDeviceID); ok {
+ // This will load the logical device along with its children and grandchildren
+ go handler.logicalDeviceMgr.load(ldId.id)
+ }
+ }
return nil, errors.New("failed-to-seize-request")
}
}
@@ -173,7 +194,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -194,7 +215,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -216,7 +237,7 @@
if handler.competeForTransaction() {
if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -239,7 +260,7 @@
if handler.competeForTransaction() {
if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -324,7 +345,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
return &voltha.Device{}, err
} else {
defer txn.Close()
@@ -361,7 +382,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, handler.longRunningRequestTimeout); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}, handler.longRunningRequestTimeout); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -382,7 +403,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -403,7 +424,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -424,7 +445,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -446,7 +467,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
return &common.OperationResp{}, err
} else {
defer txn.Close()
@@ -537,7 +558,7 @@
failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx); err != nil {
+ if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
return failedresponse, err
} else {
defer txn.Close()
@@ -670,7 +691,8 @@
return nil
}
-func (handler *APIHandler) sendPacketIn(deviceId string, packet *openflow_13.OfpPacketIn) {
+func (handler *APIHandler) sendPacketIn(deviceId string, transationId string, packet *openflow_13.OfpPacketIn) {
+ // TODO: Augment the OF PacketIn to include the transactionId
packetIn := openflow_13.PacketIn{Id: deviceId, PacketIn: packet}
log.Debugw("sendPacketIn", log.Fields{"packetIn": packetIn})
// Enqueue the packet
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 7875da7..bda249f 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -66,63 +66,75 @@
}
// start creates the logical device and add it to the data model
-func (agent *LogicalDeviceAgent) start(ctx context.Context) error {
- log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
- //Build the logical device based on information retrieved from the device adapter
- var switchCap *ic.SwitchCapability
- var err error
- if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceId); err != nil {
- log.Errorw("error-creating-logical-device", log.Fields{"error": err})
- return err
- }
-
- ld := &voltha.LogicalDevice{Id: agent.logicalDeviceId, RootDeviceId: agent.rootDeviceId}
-
- // Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
- var datapathID uint64
- if datapathID, err = CreateDataPathId(agent.logicalDeviceId); err != nil {
- log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
- return err
- }
- ld.DatapathId = datapathID
- ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
- ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
- ld.Flows = &ofp.Flows{Items: nil}
- ld.FlowGroups = &ofp.FlowGroups{Items: nil}
-
- //Add logical ports to the logical device based on the number of NNI ports discovered
- //First get the default port capability - TODO: each NNI port may have different capabilities,
- //hence. may need to extract the port by the NNI port id defined by the adapter during device
- //creation
- var nniPorts *voltha.Ports
- if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
- log.Errorw("error-creating-logical-port", log.Fields{"error": err})
- }
- var portCap *ic.PortCapability
- for _, port := range nniPorts.Items {
- log.Infow("!!!!!!!NNI PORTS", log.Fields{"NNI": port})
- if portCap, err = agent.deviceMgr.getPortCapability(ctx, agent.rootDeviceId, port.PortNo); err != nil {
+func (agent *LogicalDeviceAgent) start(ctx context.Context, loadFromdB bool) error {
+ log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId, "loadFromdB": loadFromdB})
+ var ld *voltha.LogicalDevice
+ if !loadFromdB {
+ //Build the logical device based on information retrieved from the device adapter
+ var switchCap *ic.SwitchCapability
+ var err error
+ if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceId); err != nil {
log.Errorw("error-creating-logical-device", log.Fields{"error": err})
return err
}
- portCap.Port.RootPort = true
- lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
- lp.DeviceId = agent.rootDeviceId
- lp.Id = fmt.Sprintf("nni-%d", port.PortNo)
- lp.OfpPort.PortNo = port.PortNo
- lp.OfpPort.Name = lp.Id
- lp.DevicePortNo = port.PortNo
- ld.Ports = append(ld.Ports, lp)
+
+ ld = &voltha.LogicalDevice{Id: agent.logicalDeviceId, RootDeviceId: agent.rootDeviceId}
+
+ // Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
+ var datapathID uint64
+ if datapathID, err = CreateDataPathId(agent.logicalDeviceId); err != nil {
+ log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
+ return err
+ }
+ ld.DatapathId = datapathID
+ ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
+ ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
+ ld.Flows = &ofp.Flows{Items: nil}
+ ld.FlowGroups = &ofp.FlowGroups{Items: nil}
+
+ //Add logical ports to the logical device based on the number of NNI ports discovered
+ //First get the default port capability - TODO: each NNI port may have different capabilities,
+ //hence. may need to extract the port by the NNI port id defined by the adapter during device
+ //creation
+ var nniPorts *voltha.Ports
+ if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
+ log.Errorw("error-creating-logical-port", log.Fields{"error": err})
+ }
+ var portCap *ic.PortCapability
+ for _, port := range nniPorts.Items {
+ log.Infow("!!!!!!!NNI PORTS", log.Fields{"NNI": port})
+ if portCap, err = agent.deviceMgr.getPortCapability(ctx, agent.rootDeviceId, port.PortNo); err != nil {
+ log.Errorw("error-creating-logical-device", log.Fields{"error": err})
+ return err
+ }
+ portCap.Port.RootPort = true
+ lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
+ lp.DeviceId = agent.rootDeviceId
+ lp.Id = fmt.Sprintf("nni-%d", port.PortNo)
+ lp.OfpPort.PortNo = port.PortNo
+ lp.OfpPort.Name = lp.Id
+ lp.DevicePortNo = port.PortNo
+ ld.Ports = append(ld.Ports, lp)
+ }
+ agent.lockLogicalDevice.Lock()
+ //defer agent.lockLogicalDevice.Unlock()
+ // Save the logical device
+ if added := agent.clusterDataProxy.AddWithID("/logical_devices", ld.Id, ld, ""); added == nil {
+ log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ } else {
+ log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ }
+ agent.lockLogicalDevice.Unlock()
+ } else {
+ // load from dB - the logical may not exist at this time. On error, just return and the calling function
+ // will destroy this agent.
+ var err error
+ if ld, err = agent.GetLogicalDevice(); err != nil {
+ log.Warnw("failed-to-load-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ return err
+ }
}
agent.lockLogicalDevice.Lock()
- //defer agent.lockLogicalDevice.Unlock()
- // Save the logical device
- if added := agent.clusterDataProxy.AddWithID("/logical_devices", ld.Id, ld, ""); added == nil {
- log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
- } else {
- log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
- }
-
agent.flowProxy = agent.clusterDataProxy.Root.CreateProxy(
fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceId),
false)
@@ -1033,9 +1045,9 @@
}
}
-func (agent *LogicalDeviceAgent) packetIn(port uint32, packet []byte) {
- log.Debugw("packet-in", log.Fields{"port": port, "packet": packet})
+func (agent *LogicalDeviceAgent) packetIn(port uint32, transactionId string, packet []byte) {
+ log.Debugw("packet-in", log.Fields{"port": port, "packet": packet, "transactionId": transactionId})
packetIn := fd.MkPacketIn(port, packet)
- agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, packetIn)
+ agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, transactionId, packetIn)
log.Debugw("sending-packet-in", log.Fields{"packet-in": packetIn})
}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 18bc30a..b4dc7ea 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -113,7 +113,7 @@
func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
log.Debug("ListAllLogicalDevices")
result := &voltha.LogicalDevices{}
- if logicalDevices := ldMgr.clusterDataProxy.Get("/logical_devices", 0, false, ""); logicalDevices != nil {
+ if logicalDevices := ldMgr.clusterDataProxy.List("/logical_devices", 0, false, ""); logicalDevices != nil {
for _, logicalDevice := range logicalDevices.([]interface{}) {
if agent := ldMgr.getLogicalDeviceAgent(logicalDevice.(*voltha.LogicalDevice).Id); agent == nil {
agent = newLogicalDeviceAgent(
@@ -124,7 +124,7 @@
ldMgr.clusterDataProxy,
)
ldMgr.addLogicalDeviceAgentToMap(agent)
- go agent.start(nil)
+ go agent.start(nil, true)
}
result.Items = append(result.Items, logicalDevice.(*voltha.LogicalDevice))
}
@@ -166,12 +166,32 @@
agent := newLogicalDeviceAgent(id, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
ldMgr.addLogicalDeviceAgentToMap(agent)
- go agent.start(ctx)
+ go agent.start(ctx, false)
log.Debug("creating-logical-device-ends")
return &id, nil
}
+// load loads a logical device manager in memory
+func (ldMgr *LogicalDeviceManager) load(lDeviceId string) error {
+ log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
+ // To prevent a race condition, let's hold the logical device agent map lock. This will prevent a loading and
+ // a create logical device callback from occurring at the same time.
+ ldMgr.lockLogicalDeviceAgentsMap.Lock()
+ defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+ if ldAgent, _ := ldMgr.logicalDeviceAgents[lDeviceId]; ldAgent == nil {
+ // Logical device not in memory - create a temp logical device Agent and let it load from memory
+ agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
+ if err := agent.start(nil, true); err != nil {
+ agent.stop(nil)
+ return err
+ }
+ ldMgr.logicalDeviceAgents[agent.logicalDeviceId] = agent
+ }
+ // TODO: load the child device
+ return nil
+}
+
func (ldMgr *LogicalDeviceManager) deleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("deleting-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
@@ -365,10 +385,10 @@
}
}
-func (ldMgr *LogicalDeviceManager) packetIn(logicalDeviceId string, port uint32, packet []byte) error {
+func (ldMgr *LogicalDeviceManager) packetIn(logicalDeviceId string, port uint32, transactionId string, packet []byte) error {
log.Debugw("packetIn", log.Fields{"logicalDeviceId": logicalDeviceId, "port": port})
if agent := ldMgr.getLogicalDeviceAgent(logicalDeviceId); agent != nil {
- agent.packetIn(port, packet)
+ agent.packetIn(port, transactionId, packet)
} else {
log.Error("logical-device-not-exist", log.Fields{"logicalDeviceId": logicalDeviceId})
}
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index a6b90aa..f702633 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -497,7 +497,7 @@
return 0
}
if md <= 0xffffffff {
- log.Warnw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
+ log.Debugw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
return md
}
return md & 0xffffffff
@@ -512,7 +512,7 @@
return 0
}
if md <= 0xffffffff {
- log.Warnw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
+ log.Debugw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
return md
}
return (md >> 32) & 0xffffffff
@@ -1256,20 +1256,17 @@
return deviceRules
}
- var ingressDevice *voltha.Device
- var err error
- if ingressDevice, err = fd.deviceMgr.GetDevice(route[0].DeviceID); err != nil {
- log.Errorw("ingress-device-not-found", log.Fields{"deviceId": route[0].DeviceID})
- return deviceRules
- }
-
- isDownstream := ingressDevice.Root
- isUpstream := !isDownstream
-
// Process controller bound flow
if outPortNo != 0 && (outPortNo&0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER) {
deviceRules = fd.processControllerBoundFlow(agent, route, inPortNo, outPortNo, flow)
} else {
+ var ingressDevice *voltha.Device
+ var err error
+ if ingressDevice, err = fd.deviceMgr.GetDevice(route[0].DeviceID); err != nil {
+ log.Errorw("ingress-device-not-found", log.Fields{"deviceId": route[0].DeviceID, "flow": flow})
+ return deviceRules
+ }
+ isUpstream := !ingressDevice.Root
if isUpstream {
deviceRules = fd.processUpstreamNonControllerBoundFlow(agent, route, inPortNo, outPortNo, flow)
} else if HasNextTable(flow) {
diff --git a/tests/core/grpc_nbi_api_handler_client_test.go b/tests/core/api/grpc_nbi_api_handler_client_test.go
similarity index 99%
rename from tests/core/grpc_nbi_api_handler_client_test.go
rename to tests/core/api/grpc_nbi_api_handler_client_test.go
index 420bf39..ef0defa 100644
--- a/tests/core/grpc_nbi_api_handler_client_test.go
+++ b/tests/core/api/grpc_nbi_api_handler_client_test.go
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package core
+package api
import (
"context"
diff --git a/tests/core/concurrency/core_concurrency_test.go b/tests/core/concurrency/core_concurrency_test.go
new file mode 100644
index 0000000..d781ffc
--- /dev/null
+++ b/tests/core/concurrency/core_concurrency_test.go
@@ -0,0 +1,379 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package concurrency
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/google/uuid"
+ com "github.com/opencord/voltha-go/adapters/common"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/protos/common"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+ "os"
+ "os/exec"
+ "strings"
+ "testing"
+)
+
+var conns []*grpc.ClientConn
+var stubs []voltha.VolthaServiceClient
+var volthaSerialNumberKey string
+var grpcPorts []int
+
+/*
+ This series of tests are executed with two RW_Cores
+*/
+
+var devices map[string]*voltha.Device
+
+func setup() {
+ var err error
+
+ if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+ log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
+ log.SetAllLogLevel(log.ErrorLevel)
+
+ grpcPorts = []int{50057, 50058}
+ stubs = make([]voltha.VolthaServiceClient, 0)
+ conns = make([]*grpc.ClientConn, 0)
+
+ volthaSerialNumberKey = "voltha_serial_number"
+ devices = make(map[string]*voltha.Device)
+}
+
+func connectToCore(port int) (voltha.VolthaServiceClient, error) {
+ grpcHostIP := os.Getenv("DOCKER_HOST_IP")
+ grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, port)
+ conn, err := grpc.Dial(grpcHost, grpc.WithInsecure())
+ if err != nil {
+ log.Fatalf("did not connect: %s", err)
+ return nil, errors.New("failure-to-connect")
+ }
+ conns = append(conns, conn)
+ return voltha.NewVolthaServiceClient(conn), nil
+}
+
+func setupGrpcConnection() []voltha.VolthaServiceClient {
+ // We have 2 concurrent cores. Connect to them
+ for _, port := range grpcPorts {
+ if client, err := connectToCore(port); err == nil {
+ stubs = append(stubs, client)
+ log.Infow("connected", log.Fields{"port": port})
+ }
+ }
+ return stubs
+}
+
+func clearAllDevices(clearMap bool) {
+ for key, _ := range devices {
+ ctx := context.Background()
+ response, err := stubs[1].DeleteDevice(ctx, &voltha.ID{Id: key})
+ log.Infow("response", log.Fields{"res": response, "error": err})
+ if clearMap {
+ delete(devices, key)
+ }
+ }
+}
+
+// Verify if all ids are present in the global list of devices
+func hasAllIds(ids *voltha.IDs) bool {
+ if ids == nil && len(devices) == 0 {
+ return true
+ }
+ if ids == nil {
+ return false
+ }
+ for _, id := range ids.Items {
+ if _, exist := devices[id.Id]; !exist {
+ return false
+ }
+ }
+ return true
+}
+
+func startKafka() {
+ fmt.Println("Starting Kafka and Etcd ...")
+ command := "docker-compose"
+ cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "up", "-d")
+ if err := cmd.Run(); err != nil {
+ log.Fatal(err)
+ }
+}
+
+func startEtcd() {
+ fmt.Println("Starting Etcd ...")
+ command := "docker-compose"
+ cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "up", "-d")
+ if err := cmd.Run(); err != nil {
+ log.Fatal(err)
+ }
+}
+
+func stopKafka() {
+ fmt.Println("Stopping Kafka and Etcd ...")
+ command := "docker-compose"
+ cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "down")
+ if err := cmd.Run(); err != nil {
+ // ignore error - as this is mostly due network being left behind as its being used by other
+ // containers
+ log.Warn(err)
+ }
+}
+
+func stopEtcd() {
+ fmt.Println("Stopping Etcd ...")
+ command := "docker-compose"
+ cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "down")
+ if err := cmd.Run(); err != nil {
+ // ignore error - as this is mostly due network being left behind as its being used by other
+ // containers
+ log.Warn(err)
+ }
+}
+
+func startCores() {
+ fmt.Println("Starting voltha cores ...")
+ command := "docker-compose"
+ cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "up", "-d")
+ if err := cmd.Run(); err != nil {
+ log.Fatal(err)
+ }
+}
+
+func stopCores() {
+ fmt.Println("Stopping voltha cores ...")
+ command := "docker-compose"
+ cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "down")
+ if err := cmd.Run(); err != nil {
+ // ignore error - as this is mostly due network being left behind as its being used by other
+ // containers
+ log.Warn(err)
+ }
+}
+
+func startSimulatedOLTAndONUAdapters() {
+ fmt.Println("Starting simulated OLT and ONU adapters ...")
+ command := "docker-compose"
+ cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "up", "-d")
+ if err := cmd.Run(); err != nil {
+ log.Fatal(err)
+ }
+}
+
+func stopSimulatedOLTAndONUAdapters() {
+ fmt.Println("Stopping simulated OLT and ONU adapters ...")
+ command := "docker-compose"
+ cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "down")
+ if err := cmd.Run(); err != nil {
+ // ignore error - as this is mostly due network being left behind as its being used by other
+ // containers
+ log.Warn(err)
+ }
+}
+
+
+func sendCreateDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, device *voltha.Device, ch chan interface{} ) {
+ fmt.Println("Sending create device ...")
+ if response, err := stub.CreateDevice(ctx, device); err != nil {
+ ch <- err
+ } else {
+ ch <- response
+ }
+}
+
+func sendEnableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{} ) {
+ fmt.Println("Sending enable device ...")
+ if response, err := stub.EnableDevice(ctx, &common.ID{Id:deviceId}); err != nil {
+ ch <- err
+ } else {
+ ch <- response
+ }
+}
+
+//// createPonsimDevice sends two requests to each core and waits for both responses
+//func createPonsimDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
+// ui := uuid.New()
+// ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+// //preprovision_olt -t ponsim_olt -H 172.20.0.11:50060
+// device := &voltha.Device{Type: "ponsim_olt"}
+// device.Address = &voltha.Device_HostAndPort{HostAndPort:"172.20.0.11:50060"}
+// ch := make(chan interface{})
+// defer close(ch)
+// requestNum := 0
+// for _, stub := range stubs {
+// go sendCreateDeviceRequest(ctx, stub, device, ch)
+// requestNum += 1
+// }
+// fmt.Println("Waiting for create device response ...")
+// receivedResponse := 0
+// var err error
+// var returnedDevice *voltha.Device
+// select {
+// case res, ok := <-ch:
+// receivedResponse += 1
+// if !ok {
+// } else if er, ok := res.(error); ok {
+// err = er
+// } else if d, ok := res.(*voltha.Device); ok {
+// returnedDevice = d
+// }
+// if receivedResponse == requestNum {
+// break
+// }
+// }
+// if returnedDevice != nil {
+// return returnedDevice, nil
+// }
+// return nil, err
+//}
+
+// createDevice sends two requests to each core and waits for both responses
+func createDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+ randomMacAddress := strings.ToUpper(com.GetRandomMacAddress())
+ device := &voltha.Device{Type: "simulated_olt", MacAddress:randomMacAddress}
+ ch := make(chan interface{})
+ defer close(ch)
+ requestNum := 0
+ for _, stub := range stubs {
+ go sendCreateDeviceRequest(ctx, stub, device, ch)
+ requestNum += 1
+ }
+ fmt.Println("Waiting for create device response ...")
+ receivedResponse := 0
+ var err error
+ var returnedDevice *voltha.Device
+ select {
+ case res, ok := <-ch:
+ receivedResponse += 1
+ if !ok {
+ } else if er, ok := res.(error); ok {
+ err = er
+ } else if d, ok := res.(*voltha.Device); ok {
+ returnedDevice = d
+ }
+ if receivedResponse == requestNum {
+ break
+ }
+ }
+ if returnedDevice != nil {
+ return returnedDevice, nil
+ }
+ return nil, err
+}
+
+// enableDevices sends two requests to each core for each device and waits for both responses before sending another
+// enable request for a different device.
+func enableAllDevices(stubs []voltha.VolthaServiceClient) error {
+ for deviceId, val := range devices {
+ if val.AdminState == voltha.AdminState_PREPROVISIONED {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+ ch := make(chan interface{})
+ defer close(ch)
+ requestNum := 0
+ for _, stub := range stubs {
+ go sendEnableDeviceRequest(ctx, stub, deviceId, ch)
+ requestNum +=1
+ }
+ receivedResponse := 0
+ var err error
+ fmt.Println("Waiting for enable device response ...")
+ validResponseReceived := false
+ select {
+ case res, ok := <-ch:
+ receivedResponse += 1
+ if !ok {
+ } else if er, ok := res.(error); ok {
+ err = er
+ } else if _ , ok := res.(*empty.Empty); ok {
+ validResponseReceived = true
+ }
+ if receivedResponse == requestNum {
+ break
+ }
+ }
+ if validResponseReceived {
+ return nil
+ }
+ return err
+ }
+ }
+ return nil
+}
+
+
+func TestConcurrentRequests(t *testing.T) {
+ fmt.Println("Testing Concurrent requests ...")
+
+ ////0. Start kafka and Ectd
+ //startKafka()
+ //startEtcd()
+ //
+ ////1. Start the core
+ //startCores()
+ //
+ ////2. Start the simulated adapters
+ //startSimulatedOLTAndONUAdapters()
+ //
+ //// Wait until the core and adapters sync up
+ //time.Sleep(10 * time.Second)
+
+ stubs = setupGrpcConnection()
+
+ //3. Create the devices
+ response, err := createDevice(stubs)
+ log.Infow("response", log.Fields{"res": response, "error": err})
+ assert.Nil(t, err)
+ devices[response.Id] = response
+
+ //4. Enable all the devices
+ err = enableAllDevices(stubs)
+ assert.Nil(t, err)
+
+ ////5. Store simulated adapters
+ //stopSimulatedOLTAndONUAdapters()
+ //
+ ////6. Store the core
+ //stopCores()
+ //
+ ////7. Stop Kafka and Etcd
+ //stopKafka()
+ //stopEtcd()
+}
+
+
+func shutdown() {
+ for _, conn := range conns {
+ conn.Close()
+ }
+}
+
+func TestMain(m *testing.M) {
+ setup()
+ code := m.Run()
+ shutdown()
+ os.Exit(code)
+}