[VOL-1588] Improve Flow Add performance
This update consists of the following:
1) Update the performance when adding a flow to a logical device,
decomposing the flow into parent and child device and sending the
flow to the adapters.
2) Format a number of files as per GO fmt.
3) Ensure the device graph cache gets updated when a new port is
added to the graph that belongs to an existing device in cache.
The flow update/deletion performance will be addressed in a separate
commit.
Change-Id: I2eb663cc73eef9fc6172203ed88a35726f5fe008
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index 738a77a..11a22ab 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -34,7 +34,6 @@
coreTopic string
deviceIdCoreMap map[string]string
lockDeviceIdCoreMap sync.RWMutex
-
}
func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
@@ -185,7 +184,7 @@
}
func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
- childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64 ) error {
+ childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) error {
log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
rpc := "ChildDeviceDetected"
// Use a device specific topic to send the request. The adapter handling the device creates a device
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index d16ad95..dd8df8c 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -23,8 +23,8 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/kafka"
ic "github.com/opencord/voltha-protos/go/inter_container"
+ "github.com/opencord/voltha-protos/go/openflow_13"
"github.com/opencord/voltha-protos/go/voltha"
- "github.com/opencord/voltha-protos/go/openflow_13"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -33,7 +33,7 @@
TestMode bool
coreInstanceId string
adapter adapters.IAdapter
- coreProxy *CoreProxy
+ coreProxy *CoreProxy
}
func NewRequestHandlerProxy(coreInstanceId string, iadapter adapters.IAdapter, cProxy *CoreProxy) *RequestHandlerProxy {
@@ -135,7 +135,7 @@
}
func (rhp *RequestHandlerProxy) Update_flows_incrementally(args []*ic.Argument) (*empty.Empty, error) {
- log.Debug("Update_flows_incrementally")
+ log.Debug("Update_flows_incrementally")
if len(args) < 3 {
log.Warn("Update_flows_incrementally-invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -143,8 +143,8 @@
}
device := &voltha.Device{}
transactionID := &ic.StrType{}
- flows := &openflow_13.FlowChanges{}
- groups := &openflow_13.FlowGroupChanges{}
+ flows := &openflow_13.FlowChanges{}
+ groups := &openflow_13.FlowGroupChanges{}
for _, arg := range args {
switch arg.Key {
case "device":
@@ -169,11 +169,11 @@
}
}
}
- log.Debugw("Update_flows_incrementally",log.Fields{"flows":flows,"groups":groups})
- //Invoke the adopt device on the adapter
- if err := rhp.adapter.Update_flows_incrementally(device,flows,groups); err != nil {
- return nil, status.Errorf(codes.NotFound, "%s", err.Error())
- }
+ log.Debugw("Update_flows_incrementally", log.Fields{"flows": flows, "groups": groups})
+ //Invoke the adopt device on the adapter
+ if err := rhp.adapter.Update_flows_incrementally(device, flows, groups); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
return new(empty.Empty), nil
}
diff --git a/adapters/common/utils.go b/adapters/common/utils.go
index 810a3d0..d3c562a 100644
--- a/adapters/common/utils.go
+++ b/adapters/common/utils.go
@@ -70,4 +70,4 @@
remain--
}
return string(b)
-}
\ No newline at end of file
+}
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index bc70a82..029f032 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -212,22 +212,22 @@
return errors.New("UnImplemented")
}
-func (so *SimulatedOLT) Download_image(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+func (so *SimulatedOLT) Download_image(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
return nil, errors.New("UnImplemented")
}
-func (so *SimulatedOLT) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+func (so *SimulatedOLT) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
return nil, errors.New("UnImplemented")
}
-func (so *SimulatedOLT) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+func (so *SimulatedOLT) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
return nil, errors.New("UnImplemented")
}
-func (so *SimulatedOLT) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+func (so *SimulatedOLT) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
return nil, errors.New("UnImplemented")
}
-func (so *SimulatedOLT) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+func (so *SimulatedOLT) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
return nil, errors.New("UnImplemented")
}
diff --git a/adapters/simulated_onu/adaptercore/device_handler.go b/adapters/simulated_onu/adaptercore/device_handler.go
index 217614c..ec00426 100644
--- a/adapters/simulated_onu/adaptercore/device_handler.go
+++ b/adapters/simulated_onu/adaptercore/device_handler.go
@@ -139,7 +139,7 @@
AdminState: voltha.AdminState_ENABLED,
OperStatus: voltha.OperStatus_ACTIVE,
Peers: []*voltha.Port_PeerPort{{DeviceId: cloned.ParentId, // Peer device is OLT
- PortNo: uni_port}}, // Peer port is UNI port
+ PortNo: uni_port}}, // Peer port is UNI port
}
// Synchronous call to update device - this method is run in its own go routine
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index 738ca92..2d02342 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -498,11 +498,10 @@
}
}
-
-func (c *ConsulClient) AcquireLock(lockName string, timeout int) error {
+func (c *ConsulClient) AcquireLock(lockName string, timeout int) error {
return nil
}
-func (c *ConsulClient) ReleaseLock(lockName string) error {
+func (c *ConsulClient) ReleaseLock(lockName string) error {
return nil
-}
\ No newline at end of file
+}
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 480c651..0b97039 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -251,7 +251,7 @@
// ReleaseReservation releases reservation for a specific key.
func (c *EtcdClient) ReleaseReservation(key string) error {
// Get the leaseid using the key
- log.Debugw("Release-reservation", log.Fields{"key":key})
+ log.Debugw("Release-reservation", log.Fields{"key": key})
var ok bool
var leaseID *v3Client.LeaseID
c.writeLock.Lock()
@@ -458,6 +458,7 @@
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
mu := v3Concurrency.NewMutex(session, "/devicelock_"+lockName)
if err := mu.Lock(context.Background()); err != nil {
+ cancel()
return err
}
c.addLockName(lockName, mu, session)
diff --git a/kafka/client.go b/kafka/client.go
index a4c49ca..3d37f6e 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -32,7 +32,7 @@
const (
GroupIdKey = "groupId"
- Offset = "offset"
+ Offset = "offset"
)
const (
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index b9c03e6..afad2ac 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -549,7 +549,7 @@
Type: ic.MessageType_RESPONSE,
FromTopic: request.Header.ToTopic,
ToTopic: request.Header.FromTopic,
- KeyTopic: request.Header.KeyTopic,
+ KeyTopic: request.Header.KeyTopic,
Timestamp: time.Now().UnixNano(),
}
@@ -706,7 +706,7 @@
key := msg.Header.KeyTopic
log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
// TODO: handle error response.
- go kp.kafkaClient.Send(icm, replyTopic, key)
+ go kp.kafkaClient.Send(icm, replyTopic, key)
}
} else if msg.Header.Type == ic.MessageType_RESPONSE {
log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
@@ -763,7 +763,7 @@
Type: ic.MessageType_REQUEST,
FromTopic: replyTopic.Name,
ToTopic: toTopic.Name,
- KeyTopic: key,
+ KeyTopic: key,
Timestamp: time.Now().UnixNano(),
}
requestBody := &ic.InterContainerRequestBody{
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index add1900..e920a83 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -52,7 +52,7 @@
producer sarama.AsyncProducer
consumer sarama.Consumer
groupConsumers map[string]*scc.Consumer
- lockOfGroupConsumers sync.RWMutex
+ lockOfGroupConsumers sync.RWMutex
consumerGroupPrefix string
consumerType int
consumerGroupName string
@@ -454,7 +454,6 @@
// Send message to kafka
sc.producer.Input() <- kafkaMsg
-
// Wait for result
// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
select {
@@ -920,7 +919,6 @@
return channels
}
-
func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
sc.lockOfGroupConsumers.Lock()
defer sc.lockOfGroupConsumers.Unlock()
@@ -935,7 +933,7 @@
if _, exist := sc.groupConsumers[topic]; exist {
consumer := sc.groupConsumers[topic]
delete(sc.groupConsumers, topic)
- if err := consumer.Close(); err!= nil {
+ if err := consumer.Close(); err != nil {
log.Errorw("failure-closing-consumer", log.Fields{"error": err})
return err
}
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index f87d8ce..a282b9b 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,63 +24,65 @@
// RW Core service default constants
const (
- ConsulStoreName = "consul"
- EtcdStoreName = "etcd"
- default_InstanceID = "rwcore001"
- default_GrpcPort = 50057
- default_GrpcHost = ""
- default_KafkaAdapterHost = "127.0.0.1"
- default_KafkaAdapterPort = 9092
- default_KafkaClusterHost = "127.0.0.1"
- default_KafkaClusterPort = 9094
- default_KVStoreType = EtcdStoreName
- default_KVStoreTimeout = 5 //in seconds
- default_KVStoreHost = "127.0.0.1"
- default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
- default_KVTxnKeyDelTime = 60
- default_KVStoreDataPrefix = "service/voltha"
- default_LogLevel = 0
- default_Banner = false
- default_CoreTopic = "rwcore"
- default_RWCoreEndpoint = "rwcore"
- default_RWCoreKey = "pki/voltha.key"
- default_RWCoreCert = "pki/voltha.crt"
- default_RWCoreCA = "pki/voltha-CA.pem"
- default_AffinityRouterTopic = "affinityRouter"
- default_InCompetingMode = true
+ ConsulStoreName = "consul"
+ EtcdStoreName = "etcd"
+ default_InstanceID = "rwcore001"
+ default_GrpcPort = 50057
+ default_GrpcHost = ""
+ default_KafkaAdapterHost = "127.0.0.1"
+ default_KafkaAdapterPort = 9092
+ default_KafkaClusterHost = "127.0.0.1"
+ default_KafkaClusterPort = 9094
+ default_KVStoreType = EtcdStoreName
+ default_KVStoreTimeout = 5 //in seconds
+ default_KVStoreHost = "127.0.0.1"
+ default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
+ default_KVTxnKeyDelTime = 60
+ default_KVStoreDataPrefix = "service/voltha"
+ default_LogLevel = 0
+ default_Banner = false
+ default_CoreTopic = "rwcore"
+ default_RWCoreEndpoint = "rwcore"
+ default_RWCoreKey = "pki/voltha.key"
+ default_RWCoreCert = "pki/voltha.crt"
+ default_RWCoreCA = "pki/voltha-CA.pem"
+ default_AffinityRouterTopic = "affinityRouter"
+ default_InCompetingMode = true
default_LongRunningRequestTimeout = int64(2000)
- default_DefaultRequestTimeout = int64(500)
- default_CoreBindingKey = "voltha_backend_name"
+ default_DefaultRequestTimeout = int64(500)
+ default_CoreTimeout = int64(500)
+ default_CoreBindingKey = "voltha_backend_name"
)
// RWCoreFlags represents the set of configurations used by the read-write core service
type RWCoreFlags struct {
// Command line parameters
- InstanceID string
- RWCoreEndpoint string
- GrpcHost string
- GrpcPort int
- KafkaAdapterHost string
- KafkaAdapterPort int
- KafkaClusterHost string
- KafkaClusterPort int
- KVStoreType string
- KVStoreTimeout int // in seconds
- KVStoreHost string
- KVStorePort int
- KVTxnKeyDelTime int
- KVStoreDataPrefix string
- CoreTopic string
- LogLevel int
- Banner bool
- RWCoreKey string
- RWCoreCert string
- RWCoreCA string
- AffinityRouterTopic string
- InCompetingMode bool
+ InstanceID string
+ RWCoreEndpoint string
+ GrpcHost string
+ GrpcPort int
+ KafkaAdapterHost string
+ KafkaAdapterPort int
+ KafkaClusterHost string
+ KafkaClusterPort int
+ KVStoreType string
+ KVStoreTimeout int // in seconds
+ KVStoreHost string
+ KVStorePort int
+ KVTxnKeyDelTime int
+ KVStoreDataPrefix string
+ CoreTopic string
+ LogLevel int
+ Banner bool
+ RWCoreKey string
+ RWCoreCert string
+ RWCoreCA string
+ AffinityRouterTopic string
+ InCompetingMode bool
LongRunningRequestTimeout int64
- DefaultRequestTimeout int64
- CoreBindingKey string
+ DefaultRequestTimeout int64
+ DefaultCoreTimeout int64
+ CoreBindingKey string
}
func init() {
@@ -90,31 +92,32 @@
// NewRWCoreFlags returns a new RWCore config
func NewRWCoreFlags() *RWCoreFlags {
var rwCoreFlag = RWCoreFlags{ // Default values
- InstanceID: default_InstanceID,
- RWCoreEndpoint: default_RWCoreEndpoint,
- GrpcHost: default_GrpcHost,
- GrpcPort: default_GrpcPort,
- KafkaAdapterHost: default_KafkaAdapterHost,
- KafkaAdapterPort: default_KafkaAdapterPort,
- KafkaClusterHost: default_KafkaClusterHost,
- KafkaClusterPort: default_KafkaClusterPort,
- KVStoreType: default_KVStoreType,
- KVStoreTimeout: default_KVStoreTimeout,
- KVStoreHost: default_KVStoreHost,
- KVStorePort: default_KVStorePort,
- KVStoreDataPrefix: default_KVStoreDataPrefix,
- KVTxnKeyDelTime: default_KVTxnKeyDelTime,
- CoreTopic: default_CoreTopic,
- LogLevel: default_LogLevel,
- Banner: default_Banner,
- RWCoreKey: default_RWCoreKey,
- RWCoreCert: default_RWCoreCert,
- RWCoreCA: default_RWCoreCA,
- AffinityRouterTopic: default_AffinityRouterTopic,
- InCompetingMode: default_InCompetingMode,
- DefaultRequestTimeout:default_DefaultRequestTimeout,
- LongRunningRequestTimeout:default_LongRunningRequestTimeout,
- CoreBindingKey: default_CoreBindingKey,
+ InstanceID: default_InstanceID,
+ RWCoreEndpoint: default_RWCoreEndpoint,
+ GrpcHost: default_GrpcHost,
+ GrpcPort: default_GrpcPort,
+ KafkaAdapterHost: default_KafkaAdapterHost,
+ KafkaAdapterPort: default_KafkaAdapterPort,
+ KafkaClusterHost: default_KafkaClusterHost,
+ KafkaClusterPort: default_KafkaClusterPort,
+ KVStoreType: default_KVStoreType,
+ KVStoreTimeout: default_KVStoreTimeout,
+ KVStoreHost: default_KVStoreHost,
+ KVStorePort: default_KVStorePort,
+ KVStoreDataPrefix: default_KVStoreDataPrefix,
+ KVTxnKeyDelTime: default_KVTxnKeyDelTime,
+ CoreTopic: default_CoreTopic,
+ LogLevel: default_LogLevel,
+ Banner: default_Banner,
+ RWCoreKey: default_RWCoreKey,
+ RWCoreCert: default_RWCoreCert,
+ RWCoreCA: default_RWCoreCA,
+ AffinityRouterTopic: default_AffinityRouterTopic,
+ InCompetingMode: default_InCompetingMode,
+ DefaultRequestTimeout: default_DefaultRequestTimeout,
+ LongRunningRequestTimeout: default_LongRunningRequestTimeout,
+ DefaultCoreTimeout: default_CoreTimeout,
+ CoreBindingKey: default_CoreBindingKey,
}
return &rwCoreFlag
}
@@ -184,6 +187,9 @@
help = fmt.Sprintf("Default timeout for regular request")
flag.Int64Var(&(cf.DefaultRequestTimeout), "timeout_request", default_DefaultRequestTimeout, help)
+ help = fmt.Sprintf("Default Core timeout")
+ flag.Int64Var(&(cf.DefaultCoreTimeout), "core_timeout", default_CoreTimeout, help)
+
help = fmt.Sprintf("Show startup banner log lines")
flag.BoolVar(&cf.Banner, "banner", default_Banner, help)
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 07a4826..5d539aa 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -164,7 +164,7 @@
} else {
log.Debug("no-existing-device-type-found")
// No device types data. In order to have a proxy setup for that path let's create a fake device type
- aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{&voltha.DeviceType{Id: SENTINEL_DEVICETYPE_ID, Adapter: SENTINEL_ADAPTER_ID}}}, true)
+ aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SENTINEL_DEVICETYPE_ID, Adapter: SENTINEL_ADAPTER_ID}}}, true)
}
}
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index d933466..eeebd0d 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -88,7 +88,7 @@
return nil, errors.New("fail-to-create-transaction")
}
- if rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id:devId}) {
+ if rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: devId}) {
log.Debugw("owned-by-me", log.Fields{"Id": devId})
if txn.Acquired(timeout) {
log.Debugw("processing-request", log.Fields{"Id": devId})
@@ -789,9 +789,6 @@
return nil, nil
}
go rhp.deviceMgr.addPort(deviceId.Id, port)
- //if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
- // return nil, err
- //}
return new(empty.Empty), nil
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index a504f34..f03c7d2 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -24,8 +24,8 @@
"github.com/opencord/voltha-go/db/kvstore"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/kafka"
- "github.com/opencord/voltha-protos/go/voltha"
"github.com/opencord/voltha-go/rw_core/config"
+ "github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -38,7 +38,7 @@
logicalDeviceMgr *LogicalDeviceManager
grpcServer *grpcserver.GrpcServer
grpcNBIAPIHandler *APIHandler
- adapterMgr *AdapterManager
+ adapterMgr *AdapterManager
config *config.RWCoreFlags
kmp *kafka.InterContainerProxy
clusterDataRoot model.Root
@@ -48,9 +48,9 @@
exitChannel chan int
kvClient kvstore.Client
kafkaClient kafka.Client
- coreMembership *voltha.Membership
- membershipLock *sync.RWMutex
- deviceOwnership *DeviceOwnership
+ coreMembership *voltha.Membership
+ membershipLock *sync.RWMutex
+ deviceOwnership *DeviceOwnership
}
func init() {
@@ -92,7 +92,7 @@
log.Info("values", log.Fields{"kmp": core.kmp})
core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
core.deviceMgr = newDeviceManager(core)
- core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy)
+ core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy, core.config.DefaultCoreTimeout)
if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
log.Fatal("Failure-registering-adapterRequestHandler")
@@ -170,17 +170,6 @@
return nil
}
-//func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
-// ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
-// ) error {
-// requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
-// core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
-// core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
-//
-// log.Info("request-handlers")
-// return nil
-//}
-
func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
) error {
@@ -197,7 +186,6 @@
return nil
}
-
func (core *Core) isMembershipRegistrationComplete() bool {
core.membershipLock.RLock()
defer core.membershipLock.RUnlock()
@@ -219,7 +207,6 @@
core.coreMembership = membership
-
// Use the group name to register a specific kafka topic for this container
go func(groupName string) {
// Register the core-pair topic to handle core-bound requests destined to the core pair
@@ -242,7 +229,6 @@
return core.coreMembership
}
-
func (core *Core) startDeviceManager(ctx context.Context) {
// TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
// callbacks. For now, until the model is ready, devicemanager will keep a reference to the
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 9704fff..836269e 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -17,7 +17,6 @@
import (
"context"
- "fmt"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
@@ -34,6 +33,7 @@
type DeviceAgent struct {
deviceId string
deviceType string
+ isRootdevice bool
lastData *voltha.Device
adapterProxy *AdapterProxy
adapterMgr *AdapterManager
@@ -41,14 +41,13 @@
clusterDataProxy *model.Proxy
deviceProxy *model.Proxy
exitChannel chan int
- flowProxy *model.Proxy
- groupProxy *model.Proxy
lockDevice sync.RWMutex
+ defaultTimeout int64
}
//newDeviceAgent creates a new device agent along as creating a unique ID for the device and set the device state to
//preprovisioning
-func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
+func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout int64) *DeviceAgent {
var agent DeviceAgent
agent.adapterProxy = ap
cloned := (proto.Clone(device)).(*voltha.Device)
@@ -63,6 +62,7 @@
// overwritten by the child adapter during a device update request
cloned.Vlan = device.ProxyAddress.ChannelId
}
+ agent.isRootdevice = device.Root
agent.deviceId = cloned.Id
agent.deviceType = cloned.Type
agent.lastData = cloned
@@ -71,6 +71,7 @@
agent.exitChannel = make(chan int, 1)
agent.clusterDataProxy = cdProxy
agent.lockDevice = sync.RWMutex{}
+ agent.defaultTimeout = timeout
return &agent
}
@@ -101,16 +102,6 @@
agent.deviceProxy = agent.clusterDataProxy.CreateProxy("/devices/"+agent.deviceId, false)
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
- agent.flowProxy = agent.clusterDataProxy.CreateProxy(
- fmt.Sprintf("/devices/%s/flows", agent.deviceId),
- false)
- agent.groupProxy = agent.clusterDataProxy.CreateProxy(
- fmt.Sprintf("/devices/%s/flow_groups", agent.deviceId),
- false)
-
- agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
- agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
-
log.Debug("device-agent-started")
return nil
}
@@ -201,38 +192,117 @@
return nil
}
-func (agent *DeviceAgent) updateFlows(flows []*ofp.OfpFlowStats) error {
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
- log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows})
- var oldData *voltha.Flows
- if storedData, err := agent.getDeviceWithoutLock(); err != nil {
- return status.Errorf(codes.NotFound, "%s", agent.deviceId)
- } else {
- oldData = proto.Clone(storedData.Flows).(*voltha.Flows)
- log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows, "old": oldData})
-
- // store the changed data
- afterUpdate := agent.flowProxy.Update("/", &ofp.Flows{Items: flows}, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
- }
-
- return nil
+func (agent *DeviceAgent) updateDeviceWithoutLockAsync(device *voltha.Device, ch chan interface{}) {
+ if err := agent.updateDeviceWithoutLock(device); err != nil {
+ ch <- status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceId)
}
+ ch <- nil
}
-func (agent *DeviceAgent) updateGroups(groups []*ofp.OfpGroupEntry) error {
+func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, ch chan interface{}) {
+ if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups); err != nil {
+ log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ ch <- err
+ }
+ ch <- nil
+}
+
+func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, ch chan interface{}) {
+ if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups); err != nil {
+ log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ ch <- err
+ }
+ ch <- nil
+}
+
+func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry) error {
+ if (len(newFlows) | len(newGroups)) == 0 {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+ return nil
+ }
+
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
- log.Debugw("updateGroups", log.Fields{"deviceId": agent.deviceId, "groups": groups})
- if _, err := agent.getDeviceWithoutLock(); err != nil {
+ log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+ var existingFlows *voltha.Flows
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
+ existingFlows = proto.Clone(device.Flows).(*voltha.Flows)
+ existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
+ log.Debugw("addFlows", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "existingFlows": existingFlows, "groups": newGroups, "existingGroups": existingGroups})
+
+ var updatedFlows []*ofp.OfpFlowStats
+ var flowsToDelete []*ofp.OfpFlowStats
+ var groupsToDelete []*ofp.OfpGroupEntry
+ var updatedGroups []*ofp.OfpGroupEntry
+
+ // Process flows
+ for _, flow := range newFlows {
+ updatedFlows = append(updatedFlows, flow)
+ }
+
+ for _, flow := range existingFlows.Items {
+ if idx := fu.FindFlows(newFlows, flow); idx == -1 {
+ updatedFlows = append(updatedFlows, flow)
+ } else {
+ flowsToDelete = append(flowsToDelete, flow)
+ }
+ }
+
+ // Process groups
+ for _, g := range newGroups {
+ updatedGroups = append(updatedGroups, g)
+ }
+
+ for _, group := range existingGroups.Items {
+ if fu.FindGroup(newGroups, group.Desc.GroupId) == -1 { // does not exist now
+ updatedGroups = append(updatedGroups, group)
+ } else {
+ groupsToDelete = append(groupsToDelete, group)
+ }
+ }
+
+ // Sanity check
+ if (len(updatedFlows) | len(flowsToDelete) | len(updatedGroups) | len(groupsToDelete)) == 0 {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+ return nil
+
+ }
+ // Send update to adapters
+ chAdapters := make(chan interface{})
+ defer close(chAdapters)
+ chdB := make(chan interface{})
+ defer close(chdB)
+ dType := agent.adapterMgr.getDeviceType(device.Type)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+
+ if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+ return nil
+ }
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+
+ } else {
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: newFlows},
+ ToRemove: &voltha.Flows{Items: flowsToDelete},
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: newGroups},
+ ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
+ ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ }
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+ }
+
// store the changed data
- afterUpdate := agent.groupProxy.Update("/", &ofp.FlowGroups{Items: groups}, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ device.Flows = &voltha.Flows{Items: updatedFlows}
+ device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
+ go agent.updateDeviceWithoutLockAsync(device, chdB)
+
+ if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+ return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -677,7 +747,6 @@
func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
agent.lockDevice.Lock()
- //defer agent.lockDevice.Unlock()
// Work only on latest data
if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
agent.lockDevice.Unlock()
@@ -707,7 +776,6 @@
func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
agent.lockDevice.Lock()
- //defer agent.lockDevice.Unlock()
// Work only on latest data
// TODO: Get list of ports from device directly instead of the entire device
if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
@@ -766,7 +834,7 @@
func (agent *DeviceAgent) addPort(port *voltha.Port) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
- log.Debugw("addPort", log.Fields{"deviceId": agent.deviceId})
+ log.Debugw("addLogicalPortToMap", log.Fields{"deviceId": agent.deviceId})
// Work only on latest data
if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
@@ -775,7 +843,7 @@
cloned := proto.Clone(storeDevice).(*voltha.Device)
if cloned.Ports == nil {
// First port
- log.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
+ log.Debugw("addLogicalPortToMap-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
cloned.Ports = make([]*voltha.Port, 0)
}
cp := proto.Clone(port).(*voltha.Port)
@@ -822,160 +890,6 @@
}
}
-//flowTableUpdated is the callback after flows have been updated in the model to push them
-//to the adapters
-func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
- log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
-
- var previousData *voltha.Flows
- var latestData *voltha.Flows
-
- var ok bool
- if previousData, ok = args[0].(*ofp.Flows); !ok {
- log.Errorw("invalid-args", log.Fields{"args0": args[0]})
- return nil
- }
- if latestData, ok = args[1].(*ofp.Flows); !ok {
- log.Errorw("invalid-args", log.Fields{"args1": args[1]})
- return nil
- }
-
- // Sanity check - should not happen as this is already handled in logical device agent
- if reflect.DeepEqual(previousData.Items, latestData.Items) {
- log.Debugw("flow-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
- return nil
- }
-
- var device *voltha.Device
- var err error
- if device, err = agent.getDeviceWithoutLock(); err != nil {
- log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
- return nil
- }
- groups := device.FlowGroups
-
- // Send update to adapters
- dType := agent.adapterMgr.getDeviceType(device.Type)
- if !dType.AcceptsAddRemoveFlowUpdates {
- if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
- log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
- return err
- }
- return nil
- }
- // Incremental flow changes accepted
- var toAdd []*ofp.OfpFlowStats
- var toDelete []*ofp.OfpFlowStats
-
- for _, flow := range latestData.Items {
- if fu.FindFlowById(previousData.Items, flow) == -1 { // did not exist before
- toAdd = append(toAdd, flow)
- }
- }
- for _, flow := range previousData.Items {
- if fu.FindFlowById(latestData.Items, flow) == -1 { // does not exist now
- toDelete = append(toDelete, flow)
- }
- }
- flowChanges := &ofp.FlowChanges{
- ToAdd: &voltha.Flows{Items: toAdd},
- ToRemove: &voltha.Flows{Items: toDelete},
- }
- // Send an empty group changes as it would be dealt with a call to groupTableUpdated
- groupChanges := &ofp.FlowGroupChanges{}
-
- // Send changes only
- if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
- log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
- return err
- }
-
- return nil
-}
-
-//groupTableUpdated is the callback after group table has been updated in the model to push them
-//to the adapters
-func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
- log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
- agent.lockDevice.Lock()
- defer agent.lockDevice.Unlock()
-
- var previousData *voltha.FlowGroups
- var latestData *voltha.FlowGroups
-
- var ok bool
- if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
- log.Errorw("invalid-args", log.Fields{"args0": args[0]})
- return nil
- }
- if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
- log.Errorw("invalid-args", log.Fields{"args1": args[1]})
- return nil
- }
-
- // Sanity check - should not happen as this is already handled in logical device agent
- if reflect.DeepEqual(previousData.Items, latestData.Items) {
- log.Debugw("group-table-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
- return nil
- }
-
- var device *voltha.Device
- var err error
- if device, err = agent.getDeviceWithoutLock(); err != nil {
- log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
- return nil
- }
- flows := device.Flows
-
- // Send update to adapters
- dType := agent.adapterMgr.getDeviceType(device.Type)
- if !dType.AcceptsAddRemoveFlowUpdates {
- if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, latestData); err != nil {
- log.Debugw("update-flows-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
- return err
- }
- return nil
- }
-
- // Incremental group changes accepted
- var toAdd []*ofp.OfpGroupEntry
- var toDelete []*ofp.OfpGroupEntry
- var toUpdate []*ofp.OfpGroupEntry
-
- for _, group := range latestData.Items {
- if idx := fu.FindGroup(previousData.Items, group.Desc.GroupId); idx == -1 { // did not exist before
- toAdd = append(toAdd, group)
- } else { // existed before
- if previousData.Items[idx].String() != group.String() { // there is a change
- toUpdate = append(toUpdate, group)
- }
- }
- }
- for _, group := range previousData.Items {
- if fu.FindGroup(latestData.Items, group.Desc.GroupId) == -1 { // does not exist now
- toDelete = append(toDelete, group)
- }
- }
- groupChanges := &ofp.FlowGroupChanges{
- ToAdd: &voltha.FlowGroups{Items: toAdd},
- ToRemove: &voltha.FlowGroups{Items: toDelete},
- ToUpdate: &voltha.FlowGroups{Items: toUpdate},
- }
- // Send an empty flow changes as it should have been dealt with a call to flowTableUpdated
- flowChanges := &ofp.FlowChanges{}
-
- // Send changes only
- if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
- log.Debugw("update-incremental-group-error", log.Fields{"id": agent.lastData.Id, "error": err})
- return err
- }
- return nil
-}
-
// TODO: A generic device update by attribute
func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
agent.lockDevice.Lock()
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index f9da623..f6540e4 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -33,6 +33,8 @@
type DeviceManager struct {
deviceAgents map[string]*DeviceAgent
+ rootDevices map[string]bool
+ lockRootDeviceMap sync.RWMutex
core *Core
adapterProxy *AdapterProxy
adapterMgr *AdapterManager
@@ -42,6 +44,7 @@
clusterDataProxy *model.Proxy
coreInstanceId string
exitChannel chan int
+ defaultTimeout int64
lockDeviceAgentsMap sync.RWMutex
}
@@ -50,12 +53,15 @@
deviceMgr.core = core
deviceMgr.exitChannel = make(chan int, 1)
deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
+ deviceMgr.rootDevices = make(map[string]bool)
deviceMgr.kafkaICProxy = core.kmp
deviceMgr.adapterProxy = NewAdapterProxy(core.kmp)
deviceMgr.coreInstanceId = core.instanceId
deviceMgr.clusterDataProxy = core.clusterDataProxy
deviceMgr.adapterMgr = core.adapterMgr
deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
+ deviceMgr.lockRootDeviceMap = sync.RWMutex{}
+ deviceMgr.defaultTimeout = core.config.DefaultCoreTimeout
return &deviceMgr
}
@@ -86,16 +92,26 @@
func (dMgr *DeviceManager) addDeviceAgentToMap(agent *DeviceAgent) {
dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
+ //defer dMgr.lockDeviceAgentsMap.Unlock()
if _, exist := dMgr.deviceAgents[agent.deviceId]; !exist {
dMgr.deviceAgents[agent.deviceId] = agent
}
+ dMgr.lockDeviceAgentsMap.Unlock()
+ dMgr.lockRootDeviceMap.Lock()
+ defer dMgr.lockRootDeviceMap.Unlock()
+ dMgr.rootDevices[agent.deviceId] = agent.isRootdevice
+
}
func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
dMgr.lockDeviceAgentsMap.Lock()
- defer dMgr.lockDeviceAgentsMap.Unlock()
+ //defer dMgr.lockDeviceAgentsMap.Unlock()
delete(dMgr.deviceAgents, agent.deviceId)
+ dMgr.lockDeviceAgentsMap.Unlock()
+ dMgr.lockRootDeviceMap.Lock()
+ defer dMgr.lockRootDeviceMap.Unlock()
+ delete(dMgr.rootDevices, agent.deviceId)
+
}
// getDeviceAgent returns the agent managing the device. If the device is not in memory, it will loads it, if it exists
@@ -123,7 +139,7 @@
dMgr.lockDeviceAgentsMap.RLock()
defer dMgr.lockDeviceAgentsMap.RUnlock()
result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
- for key, _ := range dMgr.deviceAgents {
+ for key := range dMgr.deviceAgents {
result.Items = append(result.Items, &voltha.ID{Id: key})
}
return result
@@ -135,7 +151,7 @@
// Ensure this device is set as root
device.Root = true
// Create and start a device agent for that device
- agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy)
+ agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
dMgr.addDeviceAgentToMap(agent)
agent.start(ctx, false)
@@ -310,11 +326,12 @@
}
func (dMgr *DeviceManager) IsRootDevice(id string) (bool, error) {
- device, err := dMgr.GetDevice(id)
- if err != nil {
- return false, err
+ dMgr.lockRootDeviceMap.RLock()
+ defer dMgr.lockRootDeviceMap.RUnlock()
+ if exist := dMgr.rootDevices[id]; exist {
+ return dMgr.rootDevices[id], nil
}
- return device.Root, nil
+ return false, nil
}
// ListDevices retrieves the latest devices from the data model
@@ -325,7 +342,7 @@
for _, device := range devices.([]interface{}) {
// If device is not in memory then set it up
if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
- agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+ agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
if err := agent.start(nil, true); err != nil {
log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
agent.stop(nil)
@@ -347,7 +364,7 @@
return nil, status.Error(codes.InvalidArgument, "deviceId empty")
}
if !dMgr.IsDeviceInCache(deviceId) {
- agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: deviceId}, dMgr, dMgr.clusterDataProxy)
+ agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: deviceId}, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
if err := agent.start(nil, true); err != nil {
agent.stop(nil)
return nil, err
@@ -450,7 +467,7 @@
// Device Id not in memory
log.Debugw("reconciling-device", log.Fields{"id": id.Id})
// Load device from dB
- agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: id.Id}, dMgr, dMgr.clusterDataProxy)
+ agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: id.Id}, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
if err := agent.start(nil, true); err != nil {
log.Warnw("failure-loading-device", log.Fields{"deviceId": id.Id})
agent.stop(nil)
@@ -494,14 +511,14 @@
}
}
}
- // Notify the logical device manager to setup a logical port if needed
- if port.Type == voltha.Port_ETHERNET_NNI || port.Type == voltha.Port_ETHERNET_UNI {
- if device, err := dMgr.GetDevice(deviceId); err == nil {
- go dMgr.logicalDeviceMgr.addLogicalPort(device, port)
- } else {
- log.Errorw("failed-to-retrieve-device", log.Fields{"deviceId": deviceId})
- return err
- }
+ // Notify the logical device manager to setup a logical port, if needed. If the added port is an NNI or UNI
+ // then a logical port will be added to the logical device and the device graph generated. If the port is a
+ // PON port then only the device graph will be generated.
+ if device, err := dMgr.GetDevice(deviceId); err == nil {
+ go dMgr.logicalDeviceMgr.updateLogicalPort(device, port)
+ } else {
+ log.Errorw("failed-to-retrieve-device", log.Fields{"deviceId": deviceId})
+ return err
}
return nil
} else {
@@ -509,17 +526,12 @@
}
}
-func (dMgr *DeviceManager) updateFlows(deviceId string, flows []*ofp.OfpFlowStats) error {
- log.Debugw("updateFlows", log.Fields{"deviceid": deviceId})
+func (dMgr *DeviceManager) addFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) error {
+ log.Debugw("addFlowsAndGroups", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.updateFlows(flows)
- }
- return status.Errorf(codes.NotFound, "%s", deviceId)
-}
-
-func (dMgr *DeviceManager) updateGroups(deviceId string, groups []*ofp.OfpGroupEntry) error {
- if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.updateGroups(groups)
+ return agent.addFlowsAndGroups(flows, groups)
+ //go agent.addFlowsAndGroups(flows, groups)
+ //return nil
}
return status.Errorf(codes.NotFound, "%s", deviceId)
}
@@ -624,13 +636,10 @@
childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceId, DeviceType: parent.Type, ChannelId: uint32(channelId), OnuId: uint32(onuId)}
// Create and start a device agent for that device
- agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy)
+ agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
dMgr.addDeviceAgentToMap(agent)
agent.start(nil, false)
- //// Set device ownership
- //dMgr.core.deviceOwnership.OwnedByMe(agent.deviceId)
-
// Activate the child device
if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
go agent.enableDevice(nil)
@@ -808,8 +817,8 @@
childDeviceIds = append(childDeviceIds, peer.DeviceId)
}
}
+ log.Debugw("returning-getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id, "childDeviceIds": childDeviceIds})
}
- log.Debugw("returning-getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id, "childDeviceIds": childDeviceIds})
return childDeviceIds, nil
}
@@ -970,7 +979,7 @@
func (dMgr *DeviceManager) notAllowed(pcDevice *voltha.Device) error {
log.Info("notAllowed")
- return errors.New("Transition-not-allowed")
+ return errors.New("transition-not-allowed")
}
func funcName(f interface{}) string {
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index f52efea..97de41c 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -20,8 +20,8 @@
"fmt"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
- "github.com/opencord/voltha-protos/go/voltha"
"github.com/opencord/voltha-go/rw_core/utils"
+ "github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"sync"
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 6532b6e..8ff3f04 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -24,8 +24,8 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-protos/go/common"
- "github.com/opencord/voltha-protos/go/openflow_13"
"github.com/opencord/voltha-protos/go/omci"
+ "github.com/opencord/voltha-protos/go/openflow_13"
"github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
@@ -35,38 +35,37 @@
)
const (
- IMAGE_DOWNLOAD = iota
- CANCEL_IMAGE_DOWNLOAD = iota
- ACTIVATE_IMAGE = iota
- REVERT_IMAGE = iota
+ IMAGE_DOWNLOAD = iota
+ CANCEL_IMAGE_DOWNLOAD = iota
+ ACTIVATE_IMAGE = iota
+ REVERT_IMAGE = iota
)
-
type APIHandler struct {
- deviceMgr *DeviceManager
- logicalDeviceMgr *LogicalDeviceManager
- adapterMgr *AdapterManager
- packetInQueue *queue.Queue
- changeEventQueue *queue.Queue
- coreInCompetingMode bool
+ deviceMgr *DeviceManager
+ logicalDeviceMgr *LogicalDeviceManager
+ adapterMgr *AdapterManager
+ packetInQueue *queue.Queue
+ changeEventQueue *queue.Queue
+ coreInCompetingMode bool
longRunningRequestTimeout int64
- defaultRequestTimeout int64
+ defaultRequestTimeout int64
da.DefaultAPIHandler
core *Core
}
func NewAPIHandler(core *Core) *APIHandler {
handler := &APIHandler{
- deviceMgr: core.deviceMgr,
- logicalDeviceMgr: core.logicalDeviceMgr,
- adapterMgr: core.adapterMgr,
- coreInCompetingMode: core.config.InCompetingMode,
- longRunningRequestTimeout:core.config.LongRunningRequestTimeout,
- defaultRequestTimeout:core.config.DefaultRequestTimeout,
+ deviceMgr: core.deviceMgr,
+ logicalDeviceMgr: core.logicalDeviceMgr,
+ adapterMgr: core.adapterMgr,
+ coreInCompetingMode: core.config.InCompetingMode,
+ longRunningRequestTimeout: core.config.LongRunningRequestTimeout,
+ defaultRequestTimeout: core.config.DefaultRequestTimeout,
// TODO: Figure out what the 'hint' parameter to queue.New does
- packetInQueue: queue.New(10),
+ packetInQueue: queue.New(10),
changeEventQueue: queue.New(10),
- core: core,
+ core: core,
}
return handler
}
@@ -92,7 +91,7 @@
} else if serNum, ok = md["voltha_serial_number"]; !ok {
err = errors.New("serial-number-not-found")
}
- if !ok {
+ if !ok || serNum == nil {
log.Error(err)
return nil, err
}
@@ -129,7 +128,7 @@
log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
txn, err := handler.createKvTransaction(ctx)
if txn == nil {
- return nil, err
+ return nil, err
} else if txn.Acquired(timeout) {
return txn, nil
} else {
@@ -160,7 +159,7 @@
log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
txn, err := handler.createKvTransaction(ctx)
if txn == nil {
- return nil, err
+ return nil, err
}
owned := false
@@ -213,7 +212,6 @@
return out, nil
}
-
func (handler *APIHandler) UpdateMembership(ctx context.Context, membership *voltha.Membership) (*empty.Empty, error) {
log.Debugw("UpdateMembership-request", log.Fields{"membership": membership})
out := new(empty.Empty)
@@ -231,7 +229,6 @@
return &voltha.Membership{}, nil
}
-
func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
log.Debugw("EnableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
if isTestMode(ctx) {
@@ -240,7 +237,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -261,7 +258,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -283,7 +280,7 @@
if handler.competeForTransaction() {
if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:flow.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -306,7 +303,7 @@
if handler.competeForTransaction() {
if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:flow.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -370,7 +367,6 @@
return handler.logicalDeviceMgr.listLogicalDevices()
}
-
// ListAdapters returns the contents of all adapters known to the system
func (handler *APIHandler) ListAdapters(ctx context.Context, empty *empty.Empty) (*voltha.Adapters, error) {
log.Debug("ListDevices")
@@ -408,7 +404,7 @@
return &voltha.Device{}, err
}
if d, ok := res.(*voltha.Device); ok {
- handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id:d.Id})
+ handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: d.Id})
return d, nil
}
}
@@ -429,7 +425,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:id.Id}, handler.longRunningRequestTimeout); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}, handler.longRunningRequestTimeout); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -450,7 +446,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -471,7 +467,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -514,14 +510,14 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:img.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: img.Id}); err != nil {
return &common.OperationResp{}, err
} else {
defer txn.Close()
}
}
- failedresponse := &common.OperationResp{Code:voltha.OperationResp_OPERATION_FAILURE}
+ failedresponse := &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
ch := make(chan interface{})
defer close(ch)
@@ -605,7 +601,7 @@
failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:img.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: img.Id}); err != nil {
return failedresponse, err
} else {
defer txn.Close()
@@ -651,15 +647,15 @@
func (handler *APIHandler) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
log.Debugw("ListImageDownloads-request", log.Fields{"deviceId": id.Id})
if isTestMode(ctx) {
- resp := &voltha.ImageDownloads{Items:[]*voltha.ImageDownload{}}
+ resp := &voltha.ImageDownloads{Items: []*voltha.ImageDownload{}}
return resp, nil
}
if downloads, err := handler.deviceMgr.listImageDownloads(ctx, id.Id); err != nil {
failedResp := &voltha.ImageDownloads{
- Items:[]*voltha.ImageDownload{
- &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN},
- },
+ Items: []*voltha.ImageDownload{
+ {DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN},
+ },
}
return failedResp, err
} else {
@@ -667,7 +663,6 @@
}
}
-
func (handler *APIHandler) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
log.Debugw("UpdateDevicePmConfigs-request", log.Fields{"configs": *configs})
if isTestMode(ctx) {
@@ -819,7 +814,7 @@
//@TODO useless stub, what should this actually do?
func (handler *APIHandler) GetMeterStatsOfLogicalDevice(
- ctx context.Context,
+ ctx context.Context,
in *common.ID,
) (*openflow_13.MeterStatsReply, error) {
log.Debug("GetMeterStatsOfLogicalDevice-stub")
@@ -828,8 +823,8 @@
//@TODO useless stub, what should this actually do?
func (handler *APIHandler) GetMibDeviceData(
- ctx context.Context,
- in *common.ID,
+ ctx context.Context,
+ in *common.ID,
) (*omci.MibDeviceData, error) {
log.Debug("GetMibDeviceData-stub")
return nil, nil
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 9496aa1..a2b4494 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -37,25 +37,29 @@
type LogicalDeviceAgent struct {
logicalDeviceId string
//lastData *voltha.LogicalDevice
- rootDeviceId string
- deviceMgr *DeviceManager
- ldeviceMgr *LogicalDeviceManager
- clusterDataProxy *model.Proxy
- exitChannel chan int
- deviceGraph *graph.DeviceGraph
- DefaultFlowRules *fu.DeviceRules
- flowProxy *model.Proxy
- groupProxy *model.Proxy
- ldProxy *model.Proxy
- portProxies map[string]*model.Proxy
- portProxiesLock sync.RWMutex
- lockLogicalDevice sync.RWMutex
- flowDecomposer *fd.FlowDecomposer
+ rootDeviceId string
+ deviceMgr *DeviceManager
+ ldeviceMgr *LogicalDeviceManager
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
+ deviceGraph *graph.DeviceGraph
+ DefaultFlowRules *fu.DeviceRules
+ flowProxy *model.Proxy
+ groupProxy *model.Proxy
+ ldProxy *model.Proxy
+ portProxies map[string]*model.Proxy
+ portProxiesLock sync.RWMutex
+ lockLogicalDevice sync.RWMutex
+ logicalPortsNo map[uint32]bool //value is true for NNI port
+ lockLogicalPortsNo sync.RWMutex
+ flowDecomposer *fd.FlowDecomposer
+ includeDefaultFlows bool
+ defaultTimeout int64
}
func newLogicalDeviceAgent(id string, deviceId string, ldeviceMgr *LogicalDeviceManager,
deviceMgr *DeviceManager,
- cdProxy *model.Proxy) *LogicalDeviceAgent {
+ cdProxy *model.Proxy, timeout int64) *LogicalDeviceAgent {
var agent LogicalDeviceAgent
agent.exitChannel = make(chan int, 1)
agent.logicalDeviceId = id
@@ -67,6 +71,10 @@
agent.lockLogicalDevice = sync.RWMutex{}
agent.portProxies = make(map[string]*model.Proxy)
agent.portProxiesLock = sync.RWMutex{}
+ agent.lockLogicalPortsNo = sync.RWMutex{}
+ agent.logicalPortsNo = make(map[uint32]bool)
+ agent.includeDefaultFlows = true
+ agent.defaultTimeout = timeout
return &agent
}
@@ -131,12 +139,11 @@
fmt.Sprintf("/logical_devices/%s", agent.logicalDeviceId),
false)
- agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
- agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
-
// TODO: Use a port proxy once the POST_ADD is fixed
agent.ldProxy.RegisterCallback(model.POST_UPDATE, agent.portUpdated)
+ agent.includeDefaultFlows = true
+
agent.lockLogicalDevice.Unlock()
return nil
@@ -148,13 +155,6 @@
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
- // Unregister to teh callbacks
- if agent.flowProxy != nil {
- agent.flowProxy.UnregisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
- }
- if agent.groupProxy != nil {
- agent.groupProxy.UnregisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
- }
//Remove the logical device from the model
if removed := agent.clusterDataProxy.Remove("/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
@@ -246,6 +246,29 @@
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
+func (agent *LogicalDeviceAgent) updateLogicalPort(device *voltha.Device, port *voltha.Port) error {
+ log.Debugw("updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
+ var err error
+ if port.Type == voltha.Port_ETHERNET_NNI {
+ if _, err = agent.addNNILogicalPort(device, port); err != nil {
+ return err
+ }
+ agent.addLogicalPortToMap(port.PortNo, true)
+ } else if port.Type == voltha.Port_ETHERNET_UNI {
+ if _, err = agent.addUNILogicalPort(device, port); err != nil {
+ return err
+ }
+ agent.addLogicalPortToMap(port.PortNo, false)
+ } else {
+ // Update the device graph to ensure all routes on the logical device have been calculated
+ if err = agent.updateRoutes(device, port); err != nil {
+ log.Errorw("failed-to-update-routes", log.Fields{"deviceId": device.Id, "port": port, "error": err})
+ return err
+ }
+ }
+ return nil
+}
+
func (agent *LogicalDeviceAgent) addLogicalPort(device *voltha.Device, port *voltha.Port) error {
log.Debugw("addLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
var err error
@@ -253,10 +276,12 @@
if _, err = agent.addNNILogicalPort(device, port); err != nil {
return err
}
+ agent.addLogicalPortToMap(port.PortNo, true)
} else if port.Type == voltha.Port_ETHERNET_UNI {
if _, err = agent.addUNILogicalPort(device, port); err != nil {
return err
}
+ agent.addLogicalPortToMap(port.PortNo, false)
} else {
log.Debugw("invalid-port-type", log.Fields{"deviceId": device.Id, "port": port})
return nil
@@ -266,15 +291,13 @@
// setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceId string) error {
- //now := time.Now()
- //defer fmt.Println("setupNNILogicalPorts:", deviceId, time.Since(now))
log.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
var err error
var device *voltha.Device
if device, err = agent.deviceMgr.GetDevice(deviceId); err != nil {
- log.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": device.Id})
+ log.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": deviceId})
return err
}
@@ -284,6 +307,7 @@
if _, err = agent.addNNILogicalPort(device, port); err != nil {
log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
}
+ agent.addLogicalPortToMap(port.PortNo, true)
}
}
return err
@@ -291,8 +315,6 @@
// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
- //now := time.Now()
- //defer fmt.Println("setupUNILogicalPorts:", childDevice.Id, time.Since(now))
log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
var err error
@@ -303,6 +325,7 @@
if _, err = agent.addUNILogicalPort(childDevice, port); err != nil {
log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
}
+ agent.addLogicalPortToMap(port.PortNo, false)
}
}
return err
@@ -357,17 +380,6 @@
"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
}
-//updateFlowGroupsWithoutLock updates the flows in the logical device without locking the logical device. This function
-//must only be called by a function that is holding the lock on the logical device
-func (agent *LogicalDeviceAgent) updateFlowGroupsWithoutLock(groups []*ofp.OfpGroupEntry) error {
- groupsCloned := make([]*ofp.OfpGroupEntry, len(groups))
- copy(groupsCloned, groups)
- if afterUpdate := agent.groupProxy.Update("/", groupsCloned, true, ""); afterUpdate == nil {
- return errors.New(fmt.Sprintf("update-flow-group-failed:%s", agent.logicalDeviceId))
- }
- return nil
-}
-
//flowAdd adds a flow to the flow table of that logical device
func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
log.Debug("flowAdd")
@@ -389,6 +401,7 @@
flows = lDevice.Flows.Items
}
+ updatedFlows := make([]*ofp.OfpFlowStats, 0)
//oldData := proto.Clone(lDevice.Flows).(*voltha.Flows)
changed := false
checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
@@ -400,6 +413,7 @@
// Add flow
flow := fd.FlowStatsEntryFromFlowModMessage(mod)
flows = append(flows, flow)
+ updatedFlows = append(updatedFlows, flow)
changed = true
}
} else {
@@ -411,26 +425,65 @@
flow.ByteCount = oldFlow.ByteCount
flow.PacketCount = oldFlow.PacketCount
}
- flows[idx] = flow
+ if !reflect.DeepEqual(oldFlow, flow) {
+ flows[idx] = flow
+ updatedFlows = append(updatedFlows, flow)
+ changed = true
+ }
} else {
flows = append(flows, flow)
+ updatedFlows = append(updatedFlows, flow)
+ changed = true
}
- changed = true
}
if changed {
+ // Launch a routine to decompose the flows
+ if err := agent.decomposeAndSendFlows(&ofp.Flows{Items: updatedFlows}, lDevice.FlowGroups, agent.includeDefaultFlows); err != nil {
+ log.Errorf("decomposing-and-sending-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ return err
+ }
+
+ // We no longer need to sent the default flows, unless there is a change in device topology
+ agent.includeDefaultFlows = false
+
// Update model
flowsToUpdate := &ofp.Flows{}
if lDevice.Flows != nil {
flowsToUpdate = &ofp.Flows{Items: flows}
}
if err := agent.updateLogicalDeviceFlowsWithoutLock(flowsToUpdate); err != nil {
- log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
return err
}
}
return nil
}
+func (agent *LogicalDeviceAgent) decomposeAndSendFlows(flows *ofp.Flows, groups *ofp.FlowGroups, includeDefaultFlows bool) error {
+ log.Debugw("decomposeAndSendFlows", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
+
+ deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *groups, includeDefaultFlows)
+ log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+ chnlsList := make([]chan interface{}, 0)
+ for deviceId, value := range deviceRules.GetRules() {
+ ch := make(chan interface{})
+ chnlsList = append(chnlsList, ch)
+ go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
+ if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups); err != nil {
+ log.Error("flow-update-failed", log.Fields{"deviceID": deviceId})
+ ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
+ }
+ ch <- nil
+ }(deviceId, value.ListFlows(), value.ListGroups())
+ }
+ // Wait for completion
+ if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+ return status.Errorf(codes.Aborted, "errors-%s", res)
+ }
+ return nil
+}
+
//flowDelete deletes a flow from the flow table of that logical device
func (agent *LogicalDeviceAgent) flowDelete(mod *ofp.OfpFlowMod) error {
log.Debug("flowDelete")
@@ -740,15 +793,6 @@
return nil
}
-func isNNIPort(portNo uint32, nniPortsNo []uint32) bool {
- for _, pNo := range nniPortsNo {
- if pNo == portNo {
- return true
- }
- }
- return false
-}
-
func (agent *LogicalDeviceAgent) getPreCalculatedRoute(ingress, egress uint32) []graph.RouteHop {
log.Debugw("ROUTE", log.Fields{"len": len(agent.deviceGraph.Routes)})
for routeLink, route := range agent.deviceGraph.Routes {
@@ -763,67 +807,56 @@
func (agent *LogicalDeviceAgent) GetRoute(ingressPortNo uint32, egressPortNo uint32) []graph.RouteHop {
log.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
- // Get the updated logical device
- var ld *ic.LogicalDevice
routes := make([]graph.RouteHop, 0)
- var err error
- if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
- return nil
- }
- nniLogicalPortsNo := make([]uint32, 0)
- for _, logicalPort := range ld.Ports {
- if logicalPort.RootPort {
- nniLogicalPortsNo = append(nniLogicalPortsNo, logicalPort.OfpPort.PortNo)
- }
- }
- if len(nniLogicalPortsNo) == 0 {
- log.Errorw("no-nni-ports", log.Fields{"LogicalDeviceId": ld.Id})
- return nil
- }
+
// Note: A port value of 0 is equivalent to a nil port
// Consider different possibilities
if egressPortNo != 0 && ((egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
- log.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
- if isNNIPort(ingressPortNo, nniLogicalPortsNo) {
+ log.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
+ if agent.isNNIPort(ingressPortNo) {
log.Debug("returning-half-route")
//This is a trap on the NNI Port
if len(agent.deviceGraph.Routes) == 0 {
// If there are no routes set (usually when the logical device has only NNI port(s), then just return an
// internal route
- hop := graph.RouteHop{DeviceID: ld.RootDeviceId, Ingress: ingressPortNo, Egress: egressPortNo}
+ hop := graph.RouteHop{DeviceID: agent.rootDeviceId, Ingress: ingressPortNo, Egress: egressPortNo}
routes = append(routes, hop)
routes = append(routes, hop)
return routes
}
//Return a 'half' route to make the flow decomposer logic happy
for routeLink, route := range agent.deviceGraph.Routes {
- if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
+ if agent.isNNIPort(routeLink.Egress) {
routes = append(routes, graph.RouteHop{}) // first hop is set to empty
routes = append(routes, route[1])
return routes
}
}
- log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+ log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
return nil
}
//treat it as if the output port is the first NNI of the OLT
- egressPortNo = nniLogicalPortsNo[0]
+ var err error
+ if egressPortNo, err = agent.getFirstNNIPort(); err != nil {
+ log.Warnw("no-nni-port", log.Fields{"error": err})
+ return nil
+ }
}
//If ingress port is not specified (nil), it may be a wildcarded
//route if egress port is OFPP_CONTROLLER or a nni logical port,
//in which case we need to create a half-route where only the egress
//hop is filled, the first hop is nil
- if ingressPortNo == 0 && isNNIPort(egressPortNo, nniLogicalPortsNo) {
+ if ingressPortNo == 0 && agent.isNNIPort(egressPortNo) {
// We can use the 2nd hop of any upstream route, so just find the first upstream:
for routeLink, route := range agent.deviceGraph.Routes {
- if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
+ if agent.isNNIPort(routeLink.Egress) {
routes = append(routes, graph.RouteHop{}) // first hop is set to empty
routes = append(routes, route[1])
return routes
}
}
- log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+ log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
return nil
}
//If egress port is not specified (nil), we can also can return a "half" route
@@ -835,10 +868,9 @@
return routes
}
}
- log.Warnw("no-downstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+ log.Warnw("no-downstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
return nil
}
-
// Return the pre-calculated route
return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
}
@@ -935,16 +967,6 @@
}
func (agent *LogicalDeviceAgent) GetAllDefaultRules() *fu.DeviceRules {
- // Get latest
- var err error
- if _, err = agent.GetLogicalDevice(); err != nil {
- return fu.NewDeviceRules()
- }
- if agent.DefaultFlowRules == nil { // Nothing setup yet
- // Setup device graph if needed
- agent.setupDeviceGraph()
- agent.DefaultFlowRules = agent.generateDefaultRules()
- }
return agent.DefaultFlowRules
}
@@ -968,101 +990,64 @@
return agent.deviceGraph
}
-//setupDeviceGraph creates the device graph if not done already
-func (agent *LogicalDeviceAgent) setupDeviceGraph() {
+//updateRoutes redo the device graph if not done already and setup the default rules as well
+func (agent *LogicalDeviceAgent) updateRoutes(device *voltha.Device, port *voltha.Port) error {
+ log.Debugf("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "device": device.Id, "port": port})
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
+ rules := fu.NewDeviceRules()
if agent.deviceGraph == nil {
agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
- if ld, err := agent.getLogicalDeviceWithoutLock(); err == nil {
- agent.deviceGraph.ComputeRoutes(ld.Ports)
+ }
+ // Get all the logical ports on that logical device
+ if lDevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
+ log.Errorf("unknown-logical-device", log.Fields{"error": err, "logicalDeviceId": agent.logicalDeviceId})
+ return err
+ } else {
+ //TODO: Find a better way to refresh only missing routes
+ agent.deviceGraph.ComputeRoutes(lDevice.Ports)
+ }
+ deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
+ for deviceId := range deviceNodeIds {
+ if deviceId == agent.rootDeviceId {
+ rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
+ } else {
+ rules.AddFlowsAndGroup(deviceId, agent.leafDeviceDefaultRules(deviceId))
}
}
+ agent.DefaultFlowRules = rules
+
+ // Reset the default flows flag to ensure all default flows are sent to all devices, including the newly added
+ // one when a flow request is received.
+ agent.includeDefaultFlows = true
+ agent.deviceGraph.Print()
+ return nil
}
-//updateDeviceGraph updates the device graph if not done already
+//updateDeviceGraph updates the device graph if not done already and setup the default rules as well
func (agent *LogicalDeviceAgent) updateDeviceGraph(lp *voltha.LogicalPort) {
+ log.Debugf("updateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ rules := fu.NewDeviceRules()
if agent.deviceGraph == nil {
agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceId, agent.deviceMgr.GetDevice)
}
agent.deviceGraph.AddPort(lp)
-}
-
-func (agent *LogicalDeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
- log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
- var previousData *ofp.Flows
- var latestData *ofp.Flows
-
- var ok bool
- if previousData, ok = args[0].(*ofp.Flows); !ok {
- log.Errorw("invalid-args", log.Fields{"args0": args[0]})
- }
- if latestData, ok = args[1].(*ofp.Flows); !ok {
- log.Errorw("invalid-args", log.Fields{"args1": args[1]})
- }
-
- if reflect.DeepEqual(previousData.Items, latestData.Items) {
- log.Debug("flow-update-not-required")
- return nil
- }
-
- var groups *ofp.FlowGroups
- lDevice, _ := agent.getLogicalDeviceWithoutLock()
- groups = lDevice.FlowGroups
- log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
- log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
- var err error
- for deviceId, value := range deviceRules.GetRules() {
- if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
- log.Error("update-flows-failed", log.Fields{"deviceID": deviceId})
- }
- if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
- log.Error("update-groups-failed", log.Fields{"deviceID": deviceId})
+ deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
+ for deviceId := range deviceNodeIds {
+ if deviceId == agent.rootDeviceId {
+ rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
+ } else {
+ rules.AddFlowsAndGroup(deviceId, agent.leafDeviceDefaultRules(deviceId))
}
}
+ agent.DefaultFlowRules = rules
- return nil
-}
-
-func (agent *LogicalDeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
- log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
-
- var previousData *ofp.FlowGroups
- var latestData *ofp.FlowGroups
-
- var ok bool
- if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
- log.Errorw("invalid-args", log.Fields{"args0": args[0]})
- }
- if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
- log.Errorw("invalid-args", log.Fields{"args1": args[1]})
- }
-
- if reflect.DeepEqual(previousData.Items, latestData.Items) {
- log.Debug("flow-update-not-required")
- return nil
- }
-
- var flows *ofp.Flows
- lDevice, _ := agent.getLogicalDeviceWithoutLock()
- flows = lDevice.Flows
- log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
- log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- var err error
- for deviceId, value := range deviceRules.GetRules() {
- if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
- log.Error("update-flows-failed", log.Fields{"deviceID": deviceId})
- }
- if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
- log.Error("update-groups-failed", log.Fields{"deviceID": deviceId})
- }
-
- }
- return nil
+ // Reset the default flows flag to ensure all default flows are sent to all devices, including the newly added
+ // one when a flow request is received.
+ agent.includeDefaultFlows = true
+ agent.deviceGraph.Print()
}
// portAdded is a callback invoked when a port is added to the logical device.
@@ -1192,9 +1177,9 @@
newPorts, changedPorts, deletedPorts := diff(oldLD.Ports, newlD.Ports)
// Send the port change events to the OF controller
- for _, new := range newPorts {
+ for _, newP := range newPorts {
go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
- &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: new.OfpPort})
+ &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: newP.OfpPort})
}
for _, change := range changedPorts {
go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceId,
@@ -1213,8 +1198,6 @@
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
func (agent *LogicalDeviceAgent) addNNILogicalPort(device *voltha.Device, port *voltha.Port) (bool, error) {
- //now := time.Now()
- //defer fmt.Println("setupNNILogicalPorts:", device.Id, time.Since(now))
log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
if device.AdminState != voltha.AdminState_ENABLED || device.OperStatus != voltha.OperStatus_ACTIVE {
log.Infow("device-not-ready", log.Fields{"deviceId": device.Id, "admin": device.AdminState, "oper": device.OperStatus})
@@ -1291,8 +1274,6 @@
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
func (agent *LogicalDeviceAgent) addUNILogicalPort(childDevice *voltha.Device, port *voltha.Port) (bool, error) {
- //now := time.Now()
- //defer fmt.Println("addUNILogicalPort:", childDevice.Id, time.Since(now))
log.Debugw("addUNILogicalPort", log.Fields{"port": port})
if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
log.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
@@ -1337,7 +1318,6 @@
if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
return false, err
}
-
// Update the device graph with this new logical port
clonedLP := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
go agent.updateDeviceGraph(clonedLP)
@@ -1361,3 +1341,39 @@
agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, transactionId, packetIn)
log.Debugw("sending-packet-in", log.Fields{"packet-in": packetIn})
}
+
+func (agent *LogicalDeviceAgent) addLogicalPortToMap(portNo uint32, nniPort bool) {
+ agent.lockLogicalPortsNo.Lock()
+ defer agent.lockLogicalPortsNo.Unlock()
+ if exist := agent.logicalPortsNo[portNo]; !exist {
+ agent.logicalPortsNo[portNo] = nniPort
+ }
+}
+
+func (agent *LogicalDeviceAgent) deleteLogicalPortFromMap(portNo uint32) {
+ agent.lockLogicalPortsNo.Lock()
+ defer agent.lockLogicalPortsNo.Unlock()
+ if exist := agent.logicalPortsNo[portNo]; exist {
+ delete(agent.logicalPortsNo, portNo)
+ }
+}
+
+func (agent *LogicalDeviceAgent) isNNIPort(portNo uint32) bool {
+ agent.lockLogicalPortsNo.RLock()
+ defer agent.lockLogicalPortsNo.RUnlock()
+ if exist := agent.logicalPortsNo[portNo]; exist {
+ return agent.logicalPortsNo[portNo]
+ }
+ return false
+}
+
+func (agent *LogicalDeviceAgent) getFirstNNIPort() (uint32, error) {
+ agent.lockLogicalPortsNo.RLock()
+ defer agent.lockLogicalPortsNo.RUnlock()
+ for portNo, nni := range agent.logicalPortsNo {
+ if nni {
+ return portNo, nil
+ }
+ }
+ return 0, status.Error(codes.NotFound, "No NNI port found")
+}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 5cfb475..0b08321 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -39,9 +39,10 @@
clusterDataProxy *model.Proxy
exitChannel chan int
lockLogicalDeviceAgentsMap sync.RWMutex
+ defaultTimeout int64
}
-func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
+func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, timeout int64) *LogicalDeviceManager {
var logicalDeviceMgr LogicalDeviceManager
logicalDeviceMgr.core = core
logicalDeviceMgr.exitChannel = make(chan int, 1)
@@ -50,6 +51,7 @@
logicalDeviceMgr.kafkaICProxy = kafkaICProxy
logicalDeviceMgr.clusterDataProxy = cdProxy
logicalDeviceMgr.lockLogicalDeviceAgentsMap = sync.RWMutex{}
+ logicalDeviceMgr.defaultTimeout = timeout
return &logicalDeviceMgr
}
@@ -137,6 +139,7 @@
ldMgr,
ldMgr.deviceMgr,
ldMgr.clusterDataProxy,
+ ldMgr.defaultTimeout,
)
ldMgr.addLogicalDeviceAgentToMap(agent)
go agent.start(nil, true)
@@ -151,7 +154,7 @@
log.Debugw("creating-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
if !device.Root {
- return nil, errors.New("Device-not-root")
+ return nil, errors.New("device-not-root")
}
// Create a logical device agent - the logical device Id is based on the mac address of the device
@@ -166,13 +169,10 @@
}
log.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
- agent := newLogicalDeviceAgent(id, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
+ agent := newLogicalDeviceAgent(id, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
ldMgr.addLogicalDeviceAgentToMap(agent)
go agent.start(ctx, false)
- //// Set device ownership
- //ldMgr.core.deviceOwnership.OwnedByMe(id)
-
log.Debug("creating-logical-device-ends")
return &id, nil
}
@@ -186,7 +186,7 @@
defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
if ldAgent, _ := ldMgr.logicalDeviceAgents[lDeviceId]; ldAgent == nil {
// Logical device not in memory - create a temp logical device Agent and let it load from memory
- agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
+ agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
if err := agent.start(nil, true); err != nil {
//agent.stop(nil)
return err
@@ -201,7 +201,7 @@
log.Debugw("deleting-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
if !device.Root {
- return errors.New("Device-not-root")
+ return errors.New("device-not-root")
}
logDeviceId := device.ParentId
if agent := ldMgr.getLogicalDeviceAgent(logDeviceId); agent != nil {
@@ -275,6 +275,23 @@
return nil, status.Errorf(codes.NotFound, "%s-$s", lPortId.Id, lPortId.PortId)
}
+// updateLogicalPort sets up a logical port on the logical device based on the device port
+// information, if needed
+func (ldMgr *LogicalDeviceManager) updateLogicalPort(device *voltha.Device, port *voltha.Port) error {
+ if ldID, err := ldMgr.getLogicalDeviceId(device); err != nil || *ldID == "" {
+ // This is not an error as the logical device may not have been created at this time. In such a case,
+ // the ports will be created when the logical device is ready.
+ return nil
+ } else {
+ if agent := ldMgr.getLogicalDeviceAgent(*ldID); agent != nil {
+ if err := agent.updateLogicalPort(device, port); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
// addLogicalPort sets up a logical port on the logical device based on the device port
// information.
func (ldMgr *LogicalDeviceManager) addLogicalPort(device *voltha.Device, port *voltha.Port) error {
@@ -304,7 +321,7 @@
}
// Sanity check
if logicalPort.RootPort {
- return errors.New("Device-root")
+ return errors.New("device-root")
}
if agent := ldMgr.getLogicalDeviceAgent(lPortId.Id); agent != nil {
agent.deleteLogicalPort(logicalPort)
@@ -328,15 +345,13 @@
log.Debugw("setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
if parentId == "" || logDeviceId == nil {
- return errors.New("Device-in-invalid-state")
+ return errors.New("device-in-invalid-state")
}
if agent := ldMgr.getLogicalDeviceAgent(*logDeviceId); agent != nil {
if err := agent.setupUNILogicalPorts(ctx, childDevice); err != nil {
return err
}
- // Update the device routes - let it run in its own go routine as it can take time
- //go agent.updateRoutes()
}
return nil
}
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 12bf93e..711f3b5 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -35,7 +35,7 @@
package core
import (
- log "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"time"
)
diff --git a/rw_core/coreIf/logical_device_agent_if.go b/rw_core/coreIf/logical_device_agent_if.go
index 8394fac..c2614b2 100644
--- a/rw_core/coreIf/logical_device_agent_if.go
+++ b/rw_core/coreIf/logical_device_agent_if.go
@@ -20,9 +20,9 @@
package coreIf
import (
- "github.com/opencord/voltha-protos/go/voltha"
"github.com/opencord/voltha-go/rw_core/graph"
"github.com/opencord/voltha-go/rw_core/utils"
+ "github.com/opencord/voltha-protos/go/voltha"
)
// LogicalAgent represents a generic agent
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index ec2904f..980420a 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -22,11 +22,11 @@
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
- ofp "github.com/opencord/voltha-protos/go/openflow_13"
- "github.com/opencord/voltha-protos/go/voltha"
"github.com/opencord/voltha-go/rw_core/coreIf"
"github.com/opencord/voltha-go/rw_core/graph"
fu "github.com/opencord/voltha-go/rw_core/utils"
+ ofp "github.com/opencord/voltha-protos/go/openflow_13"
+ "github.com/opencord/voltha-protos/go/voltha"
"math/big"
)
@@ -751,9 +751,10 @@
}
//DecomposeRules decomposes per-device flows and flow-groups from the flows and groups defined on a logical device
-func (fd *FlowDecomposer) DecomposeRules(agent coreIf.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups) *fu.DeviceRules {
+func (fd *FlowDecomposer) DecomposeRules(agent coreIf.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups, includeDefaultFlows bool) *fu.DeviceRules {
rules := agent.GetAllDefaultRules()
deviceRules := rules.Copy()
+ devicesToUpdate := make(map[string]string)
groupMap := make(map[uint32]*ofp.OfpGroupEntry)
for _, groupEntry := range groups.Items {
@@ -766,9 +767,15 @@
for deviceId, flowAndGroups := range decomposedRules.Rules {
deviceRules.CreateEntryIfNotExist(deviceId)
deviceRules.Rules[deviceId].AddFrom(flowAndGroups)
+ devicesToUpdate[deviceId] = deviceId
}
}
- return deviceRules
+ if includeDefaultFlows {
+ return deviceRules
+ }
+ updatedDeviceRules := deviceRules.FilterRules(devicesToUpdate)
+
+ return updatedDeviceRules
}
// Handles special case of any controller-bound flow for a parent device
@@ -804,6 +811,7 @@
}
}
}
+
return newDeviceRules
}
@@ -869,6 +877,7 @@
}
}
deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
+
return deviceRules
}
@@ -1052,6 +1061,7 @@
fg.AddFlow(MkFlowStat(fa))
deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
}
+
return deviceRules
}
@@ -1230,10 +1240,9 @@
inPortNo := GetInPort(flow)
outPortNo := GetOutPort(flow)
-
deviceRules := fu.NewDeviceRules()
-
route := agent.GetRoute(inPortNo, outPortNo)
+
switch len(route) {
case 0:
log.Errorw("no-route", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "comment": "deleting-flow"})
diff --git a/rw_core/flow_decomposition/flow_decomposer_test.go b/rw_core/flow_decomposition/flow_decomposer_test.go
index d27fd21..e5c4bbd 100644
--- a/rw_core/flow_decomposition/flow_decomposer_test.go
+++ b/rw_core/flow_decomposition/flow_decomposer_test.go
@@ -18,10 +18,10 @@
import (
"errors"
"github.com/opencord/voltha-go/common/log"
- ofp "github.com/opencord/voltha-protos/go/openflow_13"
- "github.com/opencord/voltha-protos/go/voltha"
"github.com/opencord/voltha-go/rw_core/graph"
fu "github.com/opencord/voltha-go/rw_core/utils"
+ ofp "github.com/opencord/voltha-protos/go/openflow_13"
+ "github.com/opencord/voltha-protos/go/voltha"
"github.com/stretchr/testify/assert"
"testing"
@@ -446,7 +446,7 @@
groups := ofp.FlowGroups{}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
@@ -526,7 +526,7 @@
groups := ofp.FlowGroups{}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
@@ -622,7 +622,7 @@
groups := ofp.FlowGroups{}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
assert.Equal(t, 2, onu1FlowAndGroup.Flows.Len())
@@ -697,7 +697,7 @@
groups := ofp.FlowGroups{}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
assert.Equal(t, 2, onu1FlowAndGroup.Flows.Len())
@@ -770,7 +770,7 @@
groups := ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{MkGroupStat(ga)}}
tfd := newTestFlowDecomposer(newTestDeviceManager())
- deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+ deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups, true)
onu1FlowAndGroup := deviceRules.Rules["onu1"]
oltFlowAndGroup := deviceRules.Rules["olt"]
assert.Equal(t, 2, onu1FlowAndGroup.Flows.Len())
diff --git a/rw_core/graph/device_graph.go b/rw_core/graph/device_graph.go
index 376df16..5583023 100644
--- a/rw_core/graph/device_graph.go
+++ b/rw_core/graph/device_graph.go
@@ -139,7 +139,7 @@
// Build the graph
var device *voltha.Device
for _, logicalPort := range dg.logicalPorts {
- device, _ = dg.getDevice(logicalPort.DeviceId)
+ device, _ = dg.getDevice(logicalPort.DeviceId, false)
dg.GGraph = dg.addDevice(device, dg.GGraph, &dg.devicesAdded, &dg.portsAdded, dg.boundaryPorts)
}
@@ -148,6 +148,7 @@
// AddPort adds a port to the graph. If the graph is empty it will just invoke ComputeRoutes function
func (dg *DeviceGraph) AddPort(lp *voltha.LogicalPort) {
+ log.Debugw("Addport", log.Fields{"logicalPort": lp})
// If the graph does not exist invoke ComputeRoutes.
if len(dg.boundaryPorts) == 0 {
dg.ComputeRoutes([]*voltha.LogicalPort{lp})
@@ -161,12 +162,14 @@
// If the port is already part of the boundary ports, do nothing
if dg.portExist(portId) {
- fmt.Println("port exists")
return
}
+ // Add the port to the set of boundary ports
+ dg.boundaryPorts[portId] = lp.OfpPort.PortNo
+
// Add the device where this port is located to the device graph. If the device is already added then
// only the missing port will be added
- device, _ := dg.getDevice(lp.DeviceId)
+ device, _ := dg.getDevice(lp.DeviceId, false)
dg.GGraph = dg.addDevice(device, dg.GGraph, &dg.devicesAdded, &dg.portsAdded, dg.boundaryPorts)
if lp.RootPort {
@@ -184,6 +187,7 @@
}
func (dg *DeviceGraph) Print() error {
+ log.Debugw("Print", log.Fields{"graph": dg.logicalDeviceId, "boundaryPorts": dg.boundaryPorts})
if level, err := log.GetPackageLogLevel(); err == nil && level == log.DebugLevel {
output := ""
routeNumber := 1
@@ -197,7 +201,11 @@
output += fmt.Sprintf("%d:{%s=>%s} ", routeNumber, key, fmt.Sprintf("[%s]", val))
routeNumber += 1
}
- log.Debugw("graph_routes", log.Fields{"lDeviceId": dg.logicalDeviceId, "Routes": output})
+ if len(dg.Routes) == 0 {
+ log.Debugw("no-routes-found", log.Fields{"lDeviceId": dg.logicalDeviceId, "Graph": dg.GGraph.String()})
+ } else {
+ log.Debugw("graph_routes", log.Fields{"lDeviceId": dg.logicalDeviceId, "Routes": output})
+ }
}
return nil
}
@@ -205,14 +213,16 @@
//getDevice returns the device either from the local cache (default) or from the model.
//TODO: Set a cache timeout such that we do not use invalid data. The full device lifecycle should also
//be taken in consideration
-func (dg *DeviceGraph) getDevice(id string) (*voltha.Device, error) {
- dg.cachedDevicesLock.RLock()
- if d, exist := dg.cachedDevices[id]; exist {
+func (dg *DeviceGraph) getDevice(id string, useCache bool) (*voltha.Device, error) {
+ if useCache {
+ dg.cachedDevicesLock.RLock()
+ if d, exist := dg.cachedDevices[id]; exist {
+ dg.cachedDevicesLock.RUnlock()
+ //log.Debugw("getDevice - returned from cache", log.Fields{"deviceId": id})
+ return d, nil
+ }
dg.cachedDevicesLock.RUnlock()
- //log.Debugw("getDevice - returned from cache", log.Fields{"deviceId": id})
- return d, nil
}
- dg.cachedDevicesLock.RUnlock()
// Not cached
if d, err := dg.getDeviceFromModel(id); err != nil {
log.Errorw("device-not-found", log.Fields{"deviceId": id, "error": err})
@@ -251,13 +261,13 @@
}
for _, peer := range port.Peers {
if _, exist := (*devicesAdded)[peer.DeviceId]; !exist {
- d, _ := dg.getDevice(peer.DeviceId)
+ d, _ := dg.getDevice(peer.DeviceId, true)
g = dg.addDevice(d, g, devicesAdded, portsAdded, boundaryPorts)
- } else {
- peerPortId = concatDeviceIdPortId(peer.DeviceId, peer.PortNo)
- g.AddEdge(goraph.StringID(portId), goraph.StringID(peerPortId), 1)
- g.AddEdge(goraph.StringID(peerPortId), goraph.StringID(portId), 1)
}
+ peerPortId = concatDeviceIdPortId(peer.DeviceId, peer.PortNo)
+ g.AddEdge(goraph.StringID(portId), goraph.StringID(peerPortId), 1)
+ g.AddEdge(goraph.StringID(peerPortId), goraph.StringID(portId), 1)
+
}
}
return g
diff --git a/rw_core/main.go b/rw_core/main.go
index 5db4078..084e339 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -23,9 +23,9 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"github.com/opencord/voltha-go/kafka"
- ic "github.com/opencord/voltha-protos/go/inter_container"
"github.com/opencord/voltha-go/rw_core/config"
c "github.com/opencord/voltha-go/rw_core/core"
+ ic "github.com/opencord/voltha-protos/go/inter_container"
"os"
"os/signal"
"strconv"
@@ -216,7 +216,7 @@
cf := config.NewRWCoreFlags()
cf.ParseCommandArguments()
- //// Setup logging
+ // Setup logging
//Setup default logger - applies for packages that do not have specific logger set
if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
@@ -228,9 +228,11 @@
log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
}
+ //log.SetAllLogLevel(log.ErrorLevel)
+
log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
- //log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/flow_decomposition", log.DebugLevel)
- //log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/graph", log.DebugLevel)
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/flow_decomposition", log.DebugLevel)
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/graph", log.DebugLevel)
//log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
//log.SetPackageLogLevel("github.com/opencord/voltha-go/db/model", log.DebugLevel)
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index 1e1ed9f..cf77d59 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -15,6 +15,13 @@
*/
package utils
+import (
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "reflect"
+ "time"
+)
+
type DeviceID struct {
Id string
}
@@ -22,3 +29,61 @@
type LogicalDeviceID struct {
Id string
}
+
+//WaitForNilOrErrorResponses waits on a variadic number of channels for either a nil response or an error
+//response. If an error is received from a given channel then the returned error array will contain that error.
+//The error will be at the index corresponding to the order in which the channel appear in the parameter list.
+//If no errors is found then nil is returned. This method also takes in a timeout in milliseconds. If a
+//timeout is obtained then this function will stop waiting for the remaining responses and abort.
+func WaitForNilOrErrorResponses(timeout int64, chnls ...chan interface{}) []error {
+ // Create a timeout channel
+ tChnl := make(chan *interface{})
+ go func() {
+ time.Sleep(time.Duration(timeout) * time.Millisecond)
+ tChnl <- nil
+ }()
+
+ errorsReceived := false
+ errors := make([]error, len(chnls))
+ cases := make([]reflect.SelectCase, len(chnls)+1)
+ for i, ch := range chnls {
+ cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
+ }
+ // Add the timeout channel
+ cases[len(chnls)] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(tChnl)}
+
+ resultsReceived := make([]bool, len(errors)+1)
+ remaining := len(cases) - 1
+ for remaining > 0 {
+ index, value, ok := reflect.Select(cases)
+ if !ok { // closed channel
+ //Set the channel at that index to nil to disable this case, hence preventing it from interfering with other cases.
+ cases[index].Chan = reflect.ValueOf(nil)
+ errors[index] = status.Errorf(codes.Internal, "channel closed")
+ errorsReceived = true
+ } else if index == len(chnls) { // Timeout has occurred
+ for k := range errors {
+ if !resultsReceived[k] {
+ errors[k] = status.Errorf(codes.Aborted, "timeout")
+ }
+ }
+ errorsReceived = true
+ break
+ } else if value.IsNil() { // Nil means a good response
+ //do nothing
+ } else if err, ok := value.Interface().(error); ok { // error returned
+ errors[index] = err
+ errorsReceived = true
+ } else { // unknown value
+ errors[index] = status.Errorf(codes.Internal, "%s", value)
+ errorsReceived = true
+ }
+ resultsReceived[index] = true
+ remaining -= 1
+ }
+
+ if errorsReceived {
+ return errors
+ }
+ return nil
+}
diff --git a/rw_core/utils/flow_utils.go b/rw_core/utils/flow_utils.go
index 10be81a..0c485bb 100644
--- a/rw_core/utils/flow_utils.go
+++ b/rw_core/utils/flow_utils.go
@@ -172,7 +172,9 @@
func (dr *DeviceRules) Copy() *DeviceRules {
copyDR := NewDeviceRules()
for key, val := range dr.Rules {
- copyDR.Rules[key] = val.Copy()
+ if val != nil {
+ copyDR.Rules[key] = val.Copy()
+ }
}
return copyDR
}
@@ -183,6 +185,16 @@
}
}
+func (dr *DeviceRules) FilterRules(deviceIds map[string]string) *DeviceRules {
+ filteredDR := NewDeviceRules()
+ for key, val := range dr.Rules {
+ if _, exist := deviceIds[key]; exist {
+ filteredDR.Rules[key] = val.Copy()
+ }
+ }
+ return filteredDR
+}
+
func (dr *DeviceRules) AddFlow(deviceId string, flow *ofp.OfpFlowStats) {
if _, exist := dr.Rules[deviceId]; !exist {
dr.Rules[deviceId] = NewFlowsAndGroups()