[VOL-1547] Add port to logical device when device is active
This commit consists of the following changes:
1) Fix the issue where flows were received when the logical
device flow graph was not ready.
2) Update the default kafka config for improved performance
3) Add a lock to the device ownership logic to ensure the
lock map does not get corrupted.
Change-Id: I840d572e06ed5acf0f3bc1ce423a0ada8f335543
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 887a167..e5f6dfe 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -431,14 +431,14 @@
}
func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event) {
- log.Infow("start-listening-on-channel", log.Fields{"channel": ch})
+ log.Debug("start-listening-on-channel ...")
for resp := range channel {
for _, ev := range resp.Events {
//log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value)
}
}
- log.Info("stop-listening-on-channel")
+ log.Debug("stop-listening-on-channel ...")
}
func getEventType(event *v3Client.Event) int {
diff --git a/kafka/client.go b/kafka/client.go
index 4eb3e5a..a4c49ca 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -40,14 +40,14 @@
DefaultKafkaPort = 9092
DefaultGroupName = "voltha"
DefaultSleepOnError = 1
- DefaultProducerFlushFrequency = 1
- DefaultProducerFlushMessages = 1
- DefaultProducerFlushMaxmessages = 1
+ DefaultProducerFlushFrequency = 10
+ DefaultProducerFlushMessages = 10
+ DefaultProducerFlushMaxmessages = 100
DefaultProducerReturnSuccess = true
DefaultProducerReturnErrors = true
DefaultProducerRetryMax = 3
DefaultProducerRetryBackoff = time.Millisecond * 100
- DefaultConsumerMaxwait = 10
+ DefaultConsumerMaxwait = 100
DefaultMaxProcessingTime = 100
DefaultConsumerType = PartitionConsumer
DefaultNumberPartitions = 3
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 42e5a02..b9c03e6 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -257,7 +257,7 @@
// subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
//key := GetDeviceIdFromTopic(*toTopic)
log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
- kp.kafkaClient.Send(protoRequest, toTopic, key)
+ go kp.kafkaClient.Send(protoRequest, toTopic, key)
if waitForResponse {
// Create a child context based on the parent context, if any
@@ -550,7 +550,7 @@
FromTopic: request.Header.ToTopic,
ToTopic: request.Header.FromTopic,
KeyTopic: request.Header.KeyTopic,
- Timestamp: time.Now().Unix(),
+ Timestamp: time.Now().UnixNano(),
}
// Go over all returned values
@@ -635,7 +635,6 @@
// First extract the header to know whether this is a request - responses are handled by a different handler
if msg.Header.Type == ic.MessageType_REQUEST {
-
var out []reflect.Value
var err error
@@ -707,7 +706,7 @@
key := msg.Header.KeyTopic
log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
// TODO: handle error response.
- kp.kafkaClient.Send(icm, replyTopic, key)
+ go kp.kafkaClient.Send(icm, replyTopic, key)
}
} else if msg.Header.Type == ic.MessageType_RESPONSE {
log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
@@ -765,7 +764,7 @@
FromTopic: replyTopic.Name,
ToTopic: toTopic.Name,
KeyTopic: key,
- Timestamp: time.Now().Unix(),
+ Timestamp: time.Now().UnixNano(),
}
requestBody := &ic.InterContainerRequestBody{
Rpc: rpc,
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index 4a692d3..0921561 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -238,39 +238,34 @@
if id == nil {
return "", status.Error(codes.InvalidArgument, "nil-id")
}
+ da.deviceToKeyMapLock.Lock()
+ defer da.deviceToKeyMapLock.Unlock()
var device *voltha.Device
var lDevice *voltha.LogicalDevice
// The id can either be a device Id or a logical device id.
if dId, ok := id.(*utils.DeviceID); ok {
// Use cache if present
- if val, err := da.getDeviceKey(dId.Id); err == nil {
+ if val, exist := da.deviceToKeyMap[dId.Id]; exist {
return val, nil
}
if device, _ = da.deviceMgr.GetDevice(dId.Id); device == nil {
return "", status.Error(codes.NotFound, fmt.Sprintf("id-absent-%s", dId))
}
if device.Root {
- if err := da.updateDeviceKey(dId.Id, device.Id); err != nil {
- log.Warnw("Error-updating-cache", log.Fields{"id": dId.Id, "key": device.Id, "error": err})
- }
- return device.Id, nil
+ da.deviceToKeyMap[dId.Id] = device.Id
} else {
- if err := da.updateDeviceKey(dId.Id, device.ParentId); err != nil {
- log.Warnw("Error-updating-cache", log.Fields{"id": dId.Id, "key": device.ParentId, "error": err})
- }
- return device.ParentId, nil
+ da.deviceToKeyMap[dId.Id] = device.ParentId
}
+ return da.deviceToKeyMap[dId.Id], nil
} else if ldId, ok := id.(*utils.LogicalDeviceID); ok {
// Use cache if present
- if val, err := da.getDeviceKey(ldId.Id); err == nil {
+ if val, exist := da.deviceToKeyMap[ldId.Id]; exist {
return val, nil
}
if lDevice, _ = da.logicalDeviceMgr.getLogicalDevice(ldId.Id); lDevice == nil {
return "", status.Error(codes.NotFound, fmt.Sprintf("id-absent-%s", ldId))
}
- if err := da.updateDeviceKey(ldId.Id, lDevice.RootDeviceId); err != nil {
- log.Warnw("Error-updating-cache", log.Fields{"id": ldId.Id, "key": lDevice.RootDeviceId, "error": err})
- }
+ da.deviceToKeyMap[ldId.Id] = lDevice.RootDeviceId
return lDevice.RootDeviceId, nil
}
return "", status.Error(codes.NotFound, fmt.Sprintf("id-%s", id))
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index b1a68cc..732e9cf 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -22,12 +22,12 @@
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
- ic "github.com/opencord/voltha-protos/go/inter_container"
- ofp "github.com/opencord/voltha-protos/go/openflow_13"
- "github.com/opencord/voltha-protos/go/voltha"
fd "github.com/opencord/voltha-go/rw_core/flow_decomposition"
"github.com/opencord/voltha-go/rw_core/graph"
fu "github.com/opencord/voltha-go/rw_core/utils"
+ ic "github.com/opencord/voltha-protos/go/inter_container"
+ ofp "github.com/opencord/voltha-protos/go/openflow_13"
+ "github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"reflect"
@@ -248,24 +248,31 @@
func (agent *LogicalDeviceAgent) addLogicalPort (device *voltha.Device, port *voltha.Port) error {
+ log.Debugw("addLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
+ var changed bool
+ var err error
if port.Type == voltha.Port_ETHERNET_NNI {
- if err := agent.addNNILogicalPort(device, port); err != nil {
+ if changed, err = agent.addNNILogicalPort(device, port); err != nil {
return err
}
} else if port.Type == voltha.Port_ETHERNET_UNI {
- if err := agent.addUNILogicalPort(device, port); err != nil {
+ if changed, err = agent.addUNILogicalPort(device, port); err != nil {
return err
}
} else {
log.Debugw("invalid-port-type", log.Fields{"deviceId": device.Id, "port": port})
return nil
}
- go agent.setupDeviceGraph()
+ if changed {
+ go agent.setupDeviceGraph()
+ }
return nil
}
// setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceId string) error {
+ //now := time.Now()
+ //defer fmt.Println("setupNNILogicalPorts:", deviceId, time.Since(now))
log.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
var err error
@@ -279,11 +286,12 @@
//Get UNI port number
changesMade := false
for _, port := range device.Ports {
+ changed := false
if port.Type == voltha.Port_ETHERNET_NNI {
- if err = agent.addNNILogicalPort(device, port); err != nil {
+ if changed, err = agent.addNNILogicalPort(device, port); err != nil {
log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
} else {
- changesMade = true
+ changesMade = changed || changesMade
}
}
}
@@ -296,6 +304,8 @@
// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
+ //now := time.Now()
+ //defer fmt.Println("setupUNILogicalPorts:", childDevice.Id, time.Since(now))
log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
var err error
@@ -303,11 +313,12 @@
//Get UNI port number
changesMade := false
for _, port := range childDevice.Ports {
+ changed := false
if port.Type == voltha.Port_ETHERNET_UNI {
- if err = agent.addUNILogicalPort(childDevice, port); err != nil {
+ if changed, err = agent.addUNILogicalPort(childDevice, port); err != nil {
log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
} else {
- changesMade = true
+ changesMade = changed || changesMade
}
}
}
@@ -797,6 +808,14 @@
if isNNIPort(ingressPortNo, nniLogicalPortsNo) {
log.Debug("returning-half-route")
//This is a trap on the NNI Port
+ if len(agent.deviceGraph.Routes) == 0 {
+ // If there are no routes set (usually when the logical device has only NNI port(s), then just return an
+ // internal route
+ hop := graph.RouteHop{DeviceID:ld.RootDeviceId, Ingress:ingressPortNo, Egress:egressPortNo}
+ routes = append(routes, hop)
+ routes = append(routes, hop)
+ return routes
+ }
//Return a 'half' route to make the flow decomposer logic happy
for routeLink, route := range agent.deviceGraph.Routes {
if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
@@ -1208,17 +1227,23 @@
}
-func (agent *LogicalDeviceAgent) addNNILogicalPort (device *voltha.Device, port *voltha.Port) error {
+// addNNILogicalPort adds an NNI port to the logical device. It returns a bool representing whether a port has been
+// added and an eror in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalDeviceAgent) addNNILogicalPort (device *voltha.Device, port *voltha.Port) (bool, error) {
+ //now := time.Now()
+ //defer fmt.Println("setupNNILogicalPorts:", device.Id, time.Since(now))
log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
- if device.AdminState != voltha.AdminState_ENABLED {
- log.Infow("device-not-enabled", log.Fields{"deviceId": device.Id})
- return nil
+ if device.AdminState != voltha.AdminState_ENABLED || device.OperStatus != voltha.OperStatus_ACTIVE {
+ log.Infow("device-not-ready", log.Fields{"deviceId": device.Id, "admin": device.AdminState, "oper": device.OperStatus})
+ return false, nil
}
agent.lockLogicalDevice.RLock()
if agent.portExist(device, port) {
log.Debugw("port-already-exist", log.Fields{"port": port})
agent.lockLogicalDevice.RUnlock()
- return nil
+ return false, nil
}
agent.lockLogicalDevice.RUnlock()
@@ -1227,7 +1252,7 @@
// First get the port capability
if portCap, err = agent.deviceMgr.getPortCapability(nil, device.Id, port.PortNo); err != nil {
log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
- return err
+ return false, err
}
agent.lockLogicalDevice.Lock()
@@ -1235,7 +1260,7 @@
// Double check again if this port has been already added since the getPortCapability could have taken a long time
if agent.portExist(device, port) {
log.Debugw("port-already-exist", log.Fields{"port": port})
- return nil
+ return false, nil
}
portCap.Port.RootPort = true
@@ -1249,7 +1274,7 @@
var ld *voltha.LogicalDevice
if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
log.Errorw("error-retrieving-logical-device", log.Fields{"error": err})
- return err
+ return false, err
}
cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
if cloned.Ports == nil {
@@ -1259,9 +1284,9 @@
if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
log.Errorw("error-updating-logical-device", log.Fields{"error": err})
- return err
+ return false, err
}
- return nil
+ return true, nil
}
func (agent *LogicalDeviceAgent) portExist (device *voltha.Device, port *voltha.Port) bool {
@@ -1275,17 +1300,24 @@
return false
}
-func (agent *LogicalDeviceAgent) addUNILogicalPort (childDevice *voltha.Device, port *voltha.Port) error {
+
+// addUNILogicalPort adds an UNI port to the logical device. It returns a bool representing whether a port has been
+// added and an eror in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalDeviceAgent) addUNILogicalPort (childDevice *voltha.Device, port *voltha.Port) (bool, error) {
+ //now := time.Now()
+ //defer fmt.Println("addUNILogicalPort:", childDevice.Id, time.Since(now))
log.Debugw("addUNILogicalPort", log.Fields{"port": port})
- if childDevice.AdminState != voltha.AdminState_ENABLED {
- log.Infow("device-not-enabled", log.Fields{"deviceId": childDevice.Id})
- return nil
+ if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
+ log.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
+ return false, nil
}
agent.lockLogicalDevice.RLock()
if agent.portExist(childDevice, port) {
log.Debugw("port-already-exist", log.Fields{"port": port})
agent.lockLogicalDevice.RUnlock()
- return nil
+ return false, nil
}
agent.lockLogicalDevice.RUnlock()
var portCap *ic.PortCapability
@@ -1293,18 +1325,18 @@
// First get the port capability
if portCap, err = agent.deviceMgr.getPortCapability(nil, childDevice.Id, port.PortNo); err != nil {
log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
- return err
+ return false, err
}
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
// Double check again if this port has been already added since the getPortCapability could have taken a long time
if agent.portExist(childDevice, port) {
log.Debugw("port-already-exist", log.Fields{"port": port})
- return nil
+ return false, nil
}
// Get stored logical device
if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
- return status.Error(codes.NotFound, agent.logicalDeviceId)
+ return false, status.Error(codes.NotFound, agent.logicalDeviceId)
} else {
log.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
portCap.Port.RootPort = false
@@ -1318,7 +1350,7 @@
cloned.Ports = make([]*voltha.LogicalPort, 0)
}
cloned.Ports = append(cloned.Ports, portCap.Port)
- return agent.updateLogicalDeviceWithoutLock(cloned)
+ return true, agent.updateLogicalDeviceWithoutLock(cloned)
}
}
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index 6c67ace..ec2904f 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -819,17 +819,8 @@
fg := fu.NewFlowsAndGroups()
if agent.GetDeviceGraph().IsRootPort(inPortNo) {
log.Debug("trap-nni")
- var fa *fu.FlowArgs
- fa = &fu.FlowArgs{
- KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
- MatchFields: []*ofp.OfpOxmOfbField{
- InPort(egressHop.Egress),
- },
- Actions: GetActions(flow),
- }
- // Augment the matchfields with the ofpfields from the flow
- fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT)...)
- fg.AddFlow(MkFlowStat(fa))
+ // no decomposition required - it is already an OLT flow from NNI
+ fg.AddFlow(flow)
} else {
// Trap flow for UNI port
log.Debug("trap-uni")