VOL-3419: OpenOLT adapter at scale constantly takes more than 10 seconds to react to flows
- Pass information to agent to do the flow replication
- Consolidate various locks in the adapter and remove redundant locks
- use voltha-proto version 4.0.2 and voltha-lib-go version 4.0.0
- Bump adapter version to 3.0.0
Change-Id: Ic053c54e5319bb1736ec74facfc79dd10058ecf5
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index f342bcf..3be60ec 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -34,17 +34,17 @@
"github.com/golang/protobuf/ptypes"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
- "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v3/pkg/flows"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- "github.com/opencord/voltha-lib-go/v3/pkg/pmmetrics"
+ "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v4/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v4/pkg/pmmetrics"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
- "github.com/opencord/voltha-protos/v3/go/common"
- ic "github.com/opencord/voltha-protos/v3/go/inter_container"
- of "github.com/opencord/voltha-protos/v3/go/openflow_13"
- oop "github.com/opencord/voltha-protos/v3/go/openolt"
- "github.com/opencord/voltha-protos/v3/go/voltha"
+ "github.com/opencord/voltha-protos/v4/go/common"
+ ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+ of "github.com/opencord/voltha-protos/v4/go/openflow_13"
+ oop "github.com/opencord/voltha-protos/v4/go/openolt"
+ "github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -52,9 +52,7 @@
// Constants for number of retries and for timeout
const (
- MaxRetry = 10
- MaxTimeOutInMs = 500
- InvalidPort = 0xffffffff
+ InvalidPort = 0xffffffff
)
//DeviceHandler will interact with the OLT device.
@@ -356,16 +354,16 @@
// On failure process a backoff timer while watching for stopIndications
// events
- backoff := time.NewTimer(indicationBackoff.NextBackOff())
+ backoffTimer := time.NewTimer(indicationBackoff.NextBackOff())
select {
case <-dh.stopIndications:
logger.Debugw(ctx, "stopping-collecting-indications-for-olt", log.Fields{"device-id": dh.device.Id})
- if !backoff.Stop() {
- <-backoff.C
+ if !backoffTimer.Stop() {
+ <-backoffTimer.C
}
break Loop
- case <-backoff.C:
- // backoff expired continue
+ case <-backoffTimer.C:
+ // backoffTimer expired continue
}
if indications, err = dh.startOpenOltIndicationStream(ctx); err != nil {
return err
@@ -720,6 +718,9 @@
_ = olterrors.NewErrAdapter("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
}
}()
+
+ go startHeartbeatCheck(ctx, dh)
+
return nil
}
@@ -748,6 +749,9 @@
if device.PmConfigs != nil {
dh.UpdatePmConfig(ctx, device.PmConfigs)
}
+
+ go startHeartbeatCheck(ctx, dh)
+
return nil
}
@@ -769,7 +773,7 @@
dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
for i := range dh.flowMgr {
// Instantiate flow manager
- if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr); dh.flowMgr[i] == nil {
+ if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr, uint32(i)); dh.flowMgr[i] == nil {
return olterrors.ErrResourceManagerInstantiating
}
}
@@ -875,8 +879,6 @@
if err := dh.coreProxy.DevicePMConfigUpdate(ctx, dh.metrics.ToPmConfigs()); err != nil {
_ = olterrors.NewErrAdapter("error-updating-performance-metrics", log.Fields{"device-id": device.Id}, err).LogAt(log.ErrorLevel)
}
-
- go startHeartbeatCheck(ctx, dh)
}
//GetOfpDeviceInfo Gets the Ofp information of the given device
@@ -886,7 +888,7 @@
MfrDesc: "VOLTHA Project",
HwDesc: "open_pon",
SwDesc: "open_pon",
- SerialNum: dh.device.SerialNumber,
+ SerialNum: device.SerialNumber,
},
SwitchFeatures: &of.OfpSwitchFeatures{
NBuffers: 256,
@@ -1328,7 +1330,7 @@
}
return &oop.SerialNumber{
VendorId: []byte(serialNum[:4]),
- VendorSpecific: []byte(decodedStr),
+ VendorSpecific: decodedStr,
}, nil
}
@@ -1578,17 +1580,13 @@
var uniID uint32
var err error
for _, port := range onu.UniPorts {
- uniID = UniIDFromPortNum(uint32(port))
+ uniID = UniIDFromPortNum(port)
logger.Debugw(ctx, "clearing-resource-data-for-uni-port", log.Fields{"port": port, "uni-id": uniID})
/* Delete tech-profile instance from the KV store */
- if err = dh.flowMgr[onu.IntfID].DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
+ if err = dh.flowMgr[onu.IntfID].DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID); err != nil {
logger.Debugw(ctx, "failed-to-remove-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
- flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID))
- for _, flowID := range flowIDs {
- dh.resourceMgr.FreeFlowID(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID), flowID)
- }
tpIDList := dh.resourceMgr.GetTechProfileIDForOnu(ctx, onu.IntfID, onu.OnuID, uniID)
for _, tpID := range tpIDList {
if err = dh.resourceMgr.RemoveMeterIDForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
@@ -1605,9 +1603,12 @@
logger.Debugw(ctx, "failed-to-remove-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "removed-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
- if err = dh.resourceMgr.DelGemPortPktInOfAllServices(ctx, onu.IntfID, onu.OnuID, uint32(port)); err != nil {
+ if err = dh.resourceMgr.DeletePacketInGemPortForOnu(ctx, onu.IntfID, onu.OnuID, port); err != nil {
logger.Debugw(ctx, "failed-to-remove-gemport-pkt-in", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
}
+ if err = dh.resourceMgr.RemoveAllFlowsForIntfOnuUniKey(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID)); err != nil {
+ logger.Debugw(ctx, "failed-to-remove-flow-for", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
+ }
}
return nil
}
@@ -1626,16 +1627,14 @@
}
logger.Debugw(ctx, "nni-", log.Fields{"nni": nni})
for _, nniIntfID := range nni {
- flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(ctx, uint32(nniIntfID), int32(nniOnuID), int32(nniUniID))
- logger.Debugw(ctx, "current-flow-ids-for-nni", log.Fields{"flow-ids": flowIDs})
- for _, flowID := range flowIDs {
- dh.resourceMgr.FreeFlowID(ctx, uint32(nniIntfID), -1, -1, uint32(flowID))
- }
dh.resourceMgr.RemoveResourceMap(ctx, nniIntfID, int32(nniOnuID), int32(nniUniID))
+ _ = dh.resourceMgr.RemoveAllFlowsForIntfOnuUniKey(ctx, nniIntfID, -1, -1)
+
}
if err = dh.resourceMgr.DelNNiFromKVStore(ctx); err != nil {
return olterrors.NewErrPersistence("clear", "nni", 0, nil, err)
}
+
return nil
}
@@ -2248,7 +2247,6 @@
// Default to PON0
var intfID uint32
inPort, outPort := getPorts(flow)
- logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
if inPort != InvalidPort && outPort != InvalidPort {
_, intfID, _, _ = ExtractAccessFromFlow(inPort, outPort)
}