VOL-4425: Fix panic where flows arrive at adapter before device has
fully reconciled
- Have more checks to avoid processing flow before flow manager has
initialized
- Fix issue where OperStatus of device was accidentally marked as
ACTIVE from RECONCILING before the reconciled actually finished
- Fix function complexity of UpdateFlowsIncrementally function
Change-Id: Ib00c88189d44c9081825985c3ccfefca17fe32d1
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index e882cd5..359d715 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1604,10 +1604,7 @@
}
}
-//UpdateFlowsIncrementally updates the device flow
-func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "received-incremental-flowupdate-in-device-handler", log.Fields{"device-id": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
-
+func (dh *DeviceHandler) handleFlows(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, flowMetadata *voltha.FlowMetadata) []error {
var err error
var errorsList []error
@@ -1638,7 +1635,13 @@
if flow_utils.HasGroup(flow) {
err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupAdd)
} else {
- err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+ if dh.flowMgr == nil || dh.flowMgr[ponIf] == nil {
+ // The flow manager module could be uninitialized if the flow arrives too soon before the device has reconciled fully
+ logger.Errorw(ctx, "flow-manager-uninitialized", log.Fields{"device-id": device.Id})
+ err = fmt.Errorf("flow-manager-uninitialized-%v", device.Id)
+ } else {
+ err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+ }
}
if err != nil {
errorsList = append(errorsList, err)
@@ -1646,6 +1649,13 @@
}
}
+ return errorsList
+}
+
+func (dh *DeviceHandler) handleGroups(ctx context.Context, groups *of.FlowGroupChanges) []error {
+ var err error
+ var errorsList []error
+
// Whether we need to synchronize multicast group adds and modifies like flow add and delete needs to be investigated
if groups != nil {
for _, group := range groups.ToAdd.Items {
@@ -1670,6 +1680,17 @@
}
}
}
+
+ return errorsList
+}
+
+//UpdateFlowsIncrementally updates the device flow
+func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
+
+ var errorsList []error
+ logger.Debugw(ctx, "received-incremental-flowupdate-in-device-handler", log.Fields{"device-id": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
+ errorsList = append(errorsList, dh.handleFlows(ctx, device, flows, flowMetadata)...)
+ errorsList = append(errorsList, dh.handleGroups(ctx, groups)...)
if len(errorsList) > 0 {
return fmt.Errorf("errors-installing-flows-groups, errors:%v", errorsList)
}
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 60ed30b..6595315 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -285,6 +285,13 @@
device.Id = ""
return dh
}
+
+func negativeDeviceHandlerNilFlowMgr() *DeviceHandler {
+ dh := newMockDeviceHandler()
+ dh.flowMgr = nil
+ return dh
+}
+
func Test_generateMacFromHost(t *testing.T) {
ctx := context.Background()
type args struct {
@@ -1298,3 +1305,78 @@
})
}
}
+
+func Test_UpdateFlowsIncrementallyNegativeTestCases(t *testing.T) {
+ dh1 := negativeDeviceHandlerNilFlowMgr()
+ tests := []struct {
+ name string
+ devicehandler *DeviceHandler
+ wantErr bool
+ }{
+ {"update-flow-when-device-handler-is-nil", dh1, true},
+ }
+
+ flowMetadata0 := voltha.FlowMetadata{Meters: []*voltha.OfpMeterConfig{
+ {
+ Flags: 5,
+ MeterId: 1,
+ Bands: []*voltha.OfpMeterBandHeader{
+ {
+ Type: voltha.OfpMeterBandType_OFPMBT_DROP,
+ Rate: 16000,
+ BurstSize: 0,
+ },
+ {
+ Type: voltha.OfpMeterBandType_OFPMBT_DROP,
+ Rate: 32000,
+ BurstSize: 30,
+ },
+ {
+ Type: voltha.OfpMeterBandType_OFPMBT_DROP,
+ Rate: 64000,
+ BurstSize: 30,
+ },
+ },
+ },
+ }}
+
+ kwTable0Meter1 := make(map[string]uint64)
+ kwTable0Meter1["table_id"] = 0
+ kwTable0Meter1["meter_id"] = 1
+ kwTable0Meter1["write_metadata"] = 0x4000000000 // Tech-Profile-ID 64
+
+ // Upstream flow DHCP flow - ONU1 UNI0 PON0
+ fa0 := &fu.FlowArgs{
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(536870912),
+ fu.Metadata_ofp(1),
+ fu.IpProto(17), // dhcp
+ fu.VlanPcp(0),
+ fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT)),
+ fu.TunnelId(256),
+ },
+ Actions: []*ofp.OfpAction{
+ //fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+ fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
+ fu.Output(2147483645),
+ fu.PushVlan(0x8100),
+ },
+ KV: kwTable0Meter1,
+ }
+
+ flow0, _ := fu.MkFlowStat(fa0)
+ flowAdd := of.Flows{Items: make([]*voltha.OfpFlowStats, 0)}
+ flowAdd.Items = append(flowAdd.Items, flow0)
+ flowRemove := of.Flows{Items: make([]*voltha.OfpFlowStats, 0)}
+ flowChanges := &ofp.FlowChanges{ToAdd: &flowAdd, ToRemove: &flowRemove}
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ err := tt.devicehandler.UpdateFlowsIncrementally(context.Background(), tt.devicehandler.device, flowChanges, nil, &flowMetadata0)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("DeviceHandler.populateDeviceInfo() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ })
+ }
+}
diff --git a/internal/pkg/core/openolt.go b/internal/pkg/core/openolt.go
index f84910d..30aa33c 100644
--- a/internal/pkg/core/openolt.go
+++ b/internal/pkg/core/openolt.go
@@ -155,11 +155,6 @@
logger.Infow(ctx, "reconcile-device", log.Fields{"device-id": device.Id})
var handler *DeviceHandler
if handler = oo.getDeviceHandler(device.Id); handler == nil {
- handler := NewDeviceHandler(oo.coreClient, oo.eventProxy, device, oo, oo.configManager, oo.config)
- handler.adapterPreviouslyConnected = true
- oo.addDeviceHandlerToMap(handler)
- handler.transitionMap = NewTransitionMap(handler)
-
//Setting state to RECONCILING
cgClient, err := oo.coreClient.GetCoreServiceClient()
if err != nil {
@@ -175,6 +170,14 @@
return nil, olterrors.NewErrAdapter("device-update-failed", log.Fields{"device-id": device.Id}, err)
}
+ // The OperState of the device is set to RECONCILING in the previous section. This also needs to be set on the
+ // locally cached copy of the device struct.
+ device.OperStatus = voltha.OperStatus_RECONCILING
+ handler := NewDeviceHandler(oo.coreClient, oo.eventProxy, device, oo, oo.configManager, oo.config)
+ handler.adapterPreviouslyConnected = true
+ oo.addDeviceHandlerToMap(handler)
+ handler.transitionMap = NewTransitionMap(handler)
+
handler.transitionMap.Handle(log.WithSpanFromContext(context.Background(), ctx), DeviceInit)
}
return &empty.Empty{}, nil