[VOL-5402]-VGC all fixes till date from jan 2024
Change-Id: I2857e0ef9b1829a28c6e3ad04da96b826cb900b6
Signed-off-by: Akash Soni <akash.soni@radisys.com>
diff --git a/.golangci.yml b/.golangci.yml
index 4f7df6b..36badd9 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -55,11 +55,16 @@
run:
issues-exit-code: 1
timeout: 10m
+ skip-files:
+ - _test\.go$
+ skip-dirs:
+ - voltha-go-controller/tests/mocks
+ - internal/test/mocks
# golangci.com configuration
# https://github.com/golangci/golangci/wiki/Configuration
service:
- golangci-lint-version: 1.50.1 # use the fixed version to not introduce new linters unexpectedly
+ golangci-lint-version: 1.61.0 # use the fixed version to not introduce new linters unexpectedly
prepare:
- echo "here I can run custom commands, but no preparation needed for this repo"
issues:
diff --git a/VERSION b/VERSION
index 4e379d2..503e438 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.0.2
+0.0.2-dev1
diff --git a/database/database.go b/database/database.go
index 0178a07..c3ad13f 100644
--- a/database/database.go
+++ b/database/database.go
@@ -40,6 +40,10 @@
"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
)
+const (
+ PonPort = "/pon-port/"
+)
+
var logger log.CLogger
// Database structure
@@ -748,7 +752,7 @@
// PutNbDevicePort to add device port info
func (db *Database) PutNbDevicePort(ctx context.Context, device string, ponPortID uint32, value string) {
- key := GetKeyPath(NbDevicePath) + device + "/pon-port/" + fmt.Sprintf("%v", ponPortID)
+ key := GetKeyPath(NbDevicePath) + device + PonPort + fmt.Sprintf("%v", ponPortID)
if err := db.kvc.Put(ctx, key, value); err != nil {
logger.Warnw(ctx, "Put Device Port failed", log.Fields{"key": key})
@@ -774,7 +778,7 @@
// DelNbDevicePort to delete device port
func (db *Database) DelNbDevicePort(ctx context.Context, device string, ponPortID uint32) {
- key := GetKeyPath(NbDevicePath) + device + "/pon-port/" + fmt.Sprintf("%v", ponPortID)
+ key := GetKeyPath(NbDevicePath) + device + PonPort + fmt.Sprintf("%v", ponPortID)
if err := db.kvc.Delete(ctx, key); err != nil {
logger.Warnw(ctx, "Delete Device Port failed", log.Fields{"key": key})
@@ -783,7 +787,7 @@
// GetAllNbPorts to get all ports info
func (db *Database) GetAllNbPorts(ctx context.Context, deviceID string) (map[string]*kvstore.KVPair, error) {
- key := GetKeyPath(NbDevicePath) + deviceID + "/pon-port/"
+ key := GetKeyPath(NbDevicePath) + deviceID + PonPort
return db.List(ctx, key)
}
diff --git a/internal/pkg/application/application.go b/internal/pkg/application/application.go
index c87e419..7c61725 100644
--- a/internal/pkg/application/application.go
+++ b/internal/pkg/application/application.go
@@ -203,6 +203,7 @@
// NniPort: The identity of the NNI port
// Ports: List of all ports added to the device
type VoltDevice struct {
+ VoltDeviceIntr VoltDevInterface
FlowAddEventMap *util.ConcurrentMap //map[string]*FlowEvent
FlowDelEventMap *util.ConcurrentMap //map[string]*FlowEvent
MigratingServices *util.ConcurrentMap //<vnetID,<RequestID, MigrateServicesRequest>>
@@ -222,7 +223,6 @@
NniDhcpTrapVid of.VlanType
GlobalDhcpFlowAdded bool
icmpv6GroupAdded bool
- VoltDeviceIntr VoltDevInterface
}
type VoltDevInterface interface {
@@ -459,8 +459,8 @@
IgmpPendingPool map[string]map[*IgmpGroup]bool //[grpkey, map[groupObj]bool] //mvlan_grpName/IP
macPortMap map[string]string
VnetsToDelete map[string]bool
- ServicesToDelete map[string]bool
- ServicesToDeactivate map[string]bool
+ ServicesToDelete sync.Map
+ ServicesToDeactivate sync.Map
PortAlarmProfileCache map[string]map[string]int // [portAlarmID][ThresholdLevelString]ThresholdLevel
vendorID string
ServiceByName sync.Map // [serName]*VoltService
@@ -700,8 +700,6 @@
va.IgmpPendingPool = make(map[string]map[*IgmpGroup]bool)
va.VnetsBySvlan = util.NewConcurrentMap()
va.VnetsToDelete = make(map[string]bool)
- va.ServicesToDelete = make(map[string]bool)
- va.ServicesToDeactivate = make(map[string]bool)
va.VoltPortVnetsToDelete = make(map[*VoltPortVnet]bool)
go va.Start(context.Background(), TimerCfg{tick: 100 * time.Millisecond}, tickTimer)
go va.Start(context.Background(), TimerCfg{tick: time.Duration(GroupExpiryTime) * time.Minute}, pendingPoolTimer)
@@ -926,6 +924,21 @@
}
}
+// CheckServiceExists to check if service exists for the given uniport and tech profile ID.
+func (va *VoltApplication) CheckServiceExists(port string, techProfID uint16) bool {
+ var serviceExists bool
+ va.ServiceByName.Range(func(key, existingServiceIntf interface{}) bool {
+ existingService := existingServiceIntf.(*VoltService)
+ if existingService.Port == port && existingService.TechProfileID == techProfID {
+ logger.Warnw(ctx, "Service already exists for same Port and TP. Ignoring add service request", log.Fields{"ExistingService": existingService.Name})
+ serviceExists = true
+ return false
+ }
+ return true
+ })
+ return serviceExists
+}
+
// GetDeviceBySerialNo to get a device by serial number.
// TODO - Transform this into a MAP instead
func (va *VoltApplication) GetDeviceBySerialNo(slno string) (*VoltDevice, string) {
@@ -1772,35 +1785,60 @@
}
}
-// IsFlowDelThresholdReached - check if the attempts for flow delete has reached threshold or not
-func (va *VoltApplication) IsFlowDelThresholdReached(cntx context.Context, cookie string, device string) bool {
- logger.Debugw(ctx, "Check flow delete threshold", log.Fields{"Cookie": cookie, "Device": device})
- d := va.GetDevice(device)
- if d == nil {
- logger.Warnw(ctx, "Failed to get device during flow delete threshold check", log.Fields{"Cookie": cookie, "Device": device})
- return false
+// CheckAndDeactivateService - check if the attempts for flow delete has reached threshold or not
+func (va *VoltApplication) CheckAndDeactivateService(ctx context.Context, flow *of.VoltSubFlow, devSerialNum string, devID string) {
+ logger.Debugw(ctx, "Check and Deactivate service", log.Fields{"Cookie": flow.Cookie, "FlowCount": flow.FlowCount, "DeviceSerial": devSerialNum})
+ if flow.FlowCount >= controller.GetController().GetMaxFlowRetryAttempt() {
+ devConfig := va.GetDeviceConfig(devSerialNum)
+ if devConfig != nil {
+ portNo := util.GetUniPortFromFlow(devConfig.UplinkPort, flow)
+ portName, err := va.GetPortName(portNo)
+ if err != nil {
+ logger.Warnw(ctx, "Error getting port name", log.Fields{"Reason": err.Error(), "PortID": portNo})
+ return
+ } else if portName == "" {
+ logger.Warnw(ctx, "Port does not exist", log.Fields{"PortID": portNo})
+ return
+ }
+ svc := va.GetServiceNameFromCookie(flow.Cookie, portName, uint8(of.PbitMatchNone), devID, flow.Match.TableMetadata)
+ if svc != nil {
+ va.DeactivateServiceForPort(ctx, svc, devID, portName)
+ }
+ }
}
+}
- flowEventMap, err := d.GetFlowEventRegister(of.CommandDel)
- if err != nil {
- logger.Warnw(ctx, "Flow event map does not exists", log.Fields{"flowMod": of.CommandDel, "Error": err})
- return false
+// DeactivateServiceForPort - deactivate service for given UNI and remove flows from DB, after max flow install threshold has reached
+func (va *VoltApplication) DeactivateServiceForPort(cntx context.Context, vs *VoltService, devID string, portName string) {
+ logger.Debugw(ctx, "Flow install threshold reached. Deactivating service", log.Fields{"Service": vs.Name, "Port": portName})
+
+ if devID == vs.Device && portName == vs.Port && vs.IsActivated {
+ vs.SetSvcDeactivationFlags(SvcDeacRsn_Controller)
+ va.ServiceByName.Store(vs.Name, vs)
+ vs.WriteToDb(cntx)
+ device, err := va.GetDeviceFromPort(portName)
+ if err != nil {
+ // Even if the port/device does not exists at this point in time, the deactivate request is succss.
+ // So no error is returned
+ logger.Warnw(ctx, "Error Getting Device", log.Fields{"Reason": err.Error(), "Port": portName})
+ }
+ p := device.GetPort(vs.Port)
+ if p != nil && (p.State == PortStateUp || !va.OltFlowServiceConfig.RemoveFlowsOnDisable) {
+ if vpv := va.GetVnetByPort(vs.Port, vs.SVlan, vs.CVlan, vs.UniVlan); vpv != nil {
+ // Port down call internally deletes all the flows
+ vpv.PortDownInd(cntx, device.Name, portName, true, true)
+ if vpv.IgmpEnabled {
+ va.ReceiverDownInd(cntx, device.Name, portName)
+ }
+ } else {
+ logger.Warnw(ctx, "VPV does not exists!!!", log.Fields{"Device": device.Name, "port": portName, "SvcName": vs.Name})
+ }
+ logger.Infow(ctx, "Service deactivated after flow install threshold", log.Fields{"Device": device.Name, "Service": vs.Name, "Port": portName})
+ }
+ vs.DeactivateInProgress = false
+ va.ServiceByName.Store(vs.Name, vs)
+ vs.WriteToDb(cntx)
}
- flowEventMap.MapLock.Lock()
- var event interface{}
- if event, _ = flowEventMap.Get(cookie); event == nil {
- logger.Warnw(ctx, "Event does not exist during flow delete threshold check", log.Fields{"Cookie": cookie})
- flowEventMap.MapLock.Unlock()
- return false
- }
- flowEventMap.MapLock.Unlock()
- flowEvent := event.(*FlowEvent)
- if vs, ok := flowEvent.eventData.(*VoltService); ok {
- vs.ServiceLock.RLock()
- defer vs.ServiceLock.RUnlock()
- return vs.FlowPushCount[cookie] == controller.GetController().GetMaxFlowRetryAttempt()
- }
- return false
}
func pushFlowFailureNotif(flowStatus intf.FlowStatus) {
@@ -2131,46 +2169,51 @@
// TriggerPendingServiceDeactivateReq - trigger pending service deactivate request
func (va *VoltApplication) TriggerPendingServiceDeactivateReq(cntx context.Context, device string) {
- logger.Infow(ctx, "Pending Services to be deactivated", log.Fields{"Count": len(va.ServicesToDeactivate)})
- for serviceName := range va.ServicesToDeactivate {
- logger.Debugw(ctx, "Trigger Service Deactivate", log.Fields{"Service": serviceName})
+ va.ServicesToDeactivate.Range(func(key, value interface{}) bool {
+ serviceName := key.(string)
if vs := va.GetService(serviceName); vs != nil {
if vs.Device == device {
logger.Infow(ctx, "Triggering Pending Service Deactivate", log.Fields{"Service": vs.Name})
vpv := va.GetVnetByPort(vs.Port, vs.SVlan, vs.CVlan, vs.UniVlan)
if vpv == nil {
logger.Warnw(ctx, "Vpv Not found for Service", log.Fields{"vs": vs.Name, "port": vs.Port, "Vnet": vs.VnetID})
- continue
+ return true
}
-
vpv.DelTrapFlows(cntx)
vs.DelHsiaFlows(cntx)
+ // Set the flag to false and clear the SevicesToDeactivate map entry so that when core restarts, VGC will not
+ // try to deactivate the service again
+ vs.DeactivateInProgress = false
+ va.ServicesToDeactivate.Delete(serviceName)
vs.WriteToDb(cntx)
vpv.ClearServiceCounters(cntx)
}
} else {
- logger.Warnw(ctx, "Pending Service Not found", log.Fields{"Service": serviceName})
+ logger.Warnw(ctx, "Pending Service Not found during Deactivate", log.Fields{"Service": serviceName})
}
- }
+ return true
+ })
}
// TriggerPendingServiceDeleteReq - trigger pending service delete request
func (va *VoltApplication) TriggerPendingServiceDeleteReq(cntx context.Context, device string) {
- logger.Infow(ctx, "Pending Services to be deleted", log.Fields{"Count": len(va.ServicesToDelete)})
- for serviceName := range va.ServicesToDelete {
- logger.Debugw(ctx, "Trigger Service Delete", log.Fields{"Service": serviceName})
+ va.ServicesToDelete.Range(func(key, value interface{}) bool {
+ serviceName := key.(string)
if vs := va.GetService(serviceName); vs != nil {
if vs.Device == device {
logger.Infow(ctx, "Triggering Pending Service delete", log.Fields{"Service": vs.Name})
vs.DelHsiaFlows(cntx)
+ // Clear the SevicesToDelete map so that when core restarts, VGC will not try to deactivate the service again
+ va.ServicesToDelete.Delete(serviceName)
if vs.ForceDelete {
vs.DelFromDb(cntx)
}
}
} else {
- logger.Warnw(ctx, "Pending Service Not found", log.Fields{"Service": serviceName})
+ logger.Warnw(ctx, "Pending Service Not found during Delete", log.Fields{"Service": serviceName})
}
- }
+ return true
+ })
}
// TriggerPendingVpvDeleteReq - trigger pending VPV delete request
diff --git a/internal/pkg/application/application_test.go b/internal/pkg/application/application_test.go
index 3bb9c72..4c2fe4c 100644
--- a/internal/pkg/application/application_test.go
+++ b/internal/pkg/application/application_test.go
@@ -332,9 +332,6 @@
},
}
- servicesToDel := map[string]bool{}
- servicesToDel["SCOM00001c75-1_SCOM00001c75-1-4096-2310-4096-65"] = true
-
tests := []struct {
name string
args args
@@ -350,12 +347,13 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
va := &VoltApplication{
- ServicesToDelete: servicesToDel,
+ ServicesToDelete: sync.Map{},
ServiceByName: sync.Map{},
DevicesDisc: sync.Map{},
}
va.ServiceByName.Store("SCOM00001c75-1_SCOM00001c75-1-4096-2310-4096-65", voltServ)
+ va.ServicesToDelete.Store("SCOM00001c75-1_SCOM00001c75-1-4096-2310-4096-65", true)
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
@@ -2767,8 +2765,7 @@
cntx context.Context
device string
}
- ServicesDeactivate := map[string]bool{}
- ServicesDeactivate["SDX6320031-1_SDX6320031-1-4096-2310-4096-65"] = true
+
voltServ := &VoltService{
VoltServiceOper: VoltServiceOper{
Device: "SDX6320031",
@@ -2813,12 +2810,13 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
va := &VoltApplication{
- ServicesToDeactivate: ServicesDeactivate,
+ ServicesToDeactivate: sync.Map{},
ServiceByName: sync.Map{},
VnetsByPort: sync.Map{},
}
va.ServiceByName.Store("SDX6320031-1_SDX6320031-1-4096-2310-4096-65", voltServ)
va.VnetsByPort.Store("16777472", voltPortVnets)
+ va.ServicesToDeactivate.Store("SDX6320031-1_SDX6320031-1-4096-2310-4096-65", true)
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
dbintf.EXPECT().PutService(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
diff --git a/internal/pkg/application/flowevent.go b/internal/pkg/application/flowevent.go
index 63123da..b7b838b 100644
--- a/internal/pkg/application/flowevent.go
+++ b/internal/pkg/application/flowevent.go
@@ -19,7 +19,6 @@
"context"
infraerrorcode "voltha-go-controller/internal/pkg/errorcodes/service"
- "voltha-go-controller/internal/pkg/util"
"voltha-go-controller/internal/pkg/intf"
"voltha-go-controller/log"
@@ -32,7 +31,7 @@
type FlowEventType string
// FlowEventHandler - Func prototype for flow event handling funcs
-type FlowEventHandler func(context.Context, *FlowEvent, intf.FlowStatus, *util.ConcurrentMap)
+type FlowEventHandler func(context.Context, *FlowEvent, intf.FlowStatus)
var eventMapper map[FlowEventType]FlowEventHandler
@@ -93,20 +92,16 @@
flowEventMap.MapLock.Unlock()
return false
}
+ flowEventMap.Remove(cookie)
flowEventMap.MapLock.Unlock()
flowEvent := event.(*FlowEvent)
- if flowEvent.eType != EventTypeServiceFlowAdded && flowEvent.eType != EventTypeServiceFlowRemoved {
- flowEventMap.MapLock.Lock()
- flowEventMap.Remove(cookie)
- flowEventMap.MapLock.Unlock()
- }
- eventMapper[flowEvent.eType](cntx, flowEvent, flowStatus, flowEventMap)
+ eventMapper[flowEvent.eType](cntx, flowEvent, flowStatus)
return true
}
// ProcessUsIgmpFlowAddEvent - Process Us Igmp Flow event trigger
-func ProcessUsIgmpFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
- logger.Infow(ctx, "Processing Post Flow Add Event for US Igmp", log.Fields{"Cookie": event.cookie, "event": event})
+func ProcessUsIgmpFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+ logger.Debugw(ctx, "Processing Post Flow Add Event for US Igmp", log.Fields{"Cookie": event.cookie, "event": event})
vpv := event.eventData.(*VoltPortVnet)
if isFlowStatusSuccess(flowStatus.Status, true) {
vpv.services.Range(ReceiverUpInd)
@@ -116,19 +111,19 @@
}
// ProcessServiceFlowAddEvent - Process Service Flow event trigger
-func ProcessServiceFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
- logger.Infow(ctx, "Processing Post Flow Add Event for Service", log.Fields{"Cookie": event.cookie, "event": event})
+func ProcessServiceFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+ logger.Debugw(ctx, "Processing Post Flow Add Event for Service", log.Fields{"Cookie": event.cookie, "event": event})
vs := event.eventData.(*VoltService)
if isFlowStatusSuccess(flowStatus.Status, true) {
- vs.FlowInstallSuccess(cntx, event.cookie, flowStatus.AdditionalData, flowEventMap)
+ vs.FlowInstallSuccess(cntx, event.cookie, flowStatus.AdditionalData)
} else {
- vs.FlowInstallFailure(cntx, event.cookie, flowStatus.Status, flowStatus.Reason, flowEventMap)
+ vs.FlowInstallFailure(cntx, event.cookie, flowStatus.Status, flowStatus.Reason)
}
}
// ProcessControlFlowAddEvent - Process Control Flow event trigger
-func ProcessControlFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
- logger.Infow(ctx, "Processing Post Flow Add Event for VPV", log.Fields{"Cookie": event.cookie, "event": event})
+func ProcessControlFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+ logger.Debugw(ctx, "Processing Post Flow Add Event for VPV", log.Fields{"Cookie": event.cookie, "event": event})
vpv := event.eventData.(*VoltPortVnet)
if !isFlowStatusSuccess(flowStatus.Status, true) {
vpv.FlowInstallFailure(event.cookie, flowStatus.Status, flowStatus.Reason)
@@ -136,19 +131,19 @@
}
// ProcessServiceFlowDelEvent - Process Service Flow event trigger
-func ProcessServiceFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
- logger.Infow(ctx, "Processing Post Flow Remove Event for Service", log.Fields{"Cookie": event.cookie, "event": event})
+func ProcessServiceFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+ logger.Debugw(ctx, "Processing Post Flow Remove Event for Service", log.Fields{"Cookie": event.cookie, "event": event})
vs := event.eventData.(*VoltService)
if isFlowStatusSuccess(flowStatus.Status, false) {
- vs.FlowRemoveSuccess(cntx, event.cookie, flowEventMap)
+ vs.FlowRemoveSuccess(cntx, event.cookie)
} else {
- vs.FlowRemoveFailure(cntx, event.cookie, flowStatus.Status, flowStatus.Reason, flowEventMap)
+ vs.FlowRemoveFailure(cntx, event.cookie, flowStatus.Status, flowStatus.Reason)
}
}
// ProcessControlFlowDelEvent - Process Control Flow event trigger
-func ProcessControlFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
- logger.Infow(ctx, "Processing Post Flow Remove Event for VPV", log.Fields{"Cookie": event.cookie, "event": event})
+func ProcessControlFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+ logger.Debugw(ctx, "Processing Post Flow Remove Event for VPV", log.Fields{"Cookie": event.cookie, "event": event})
vpv := event.eventData.(*VoltPortVnet)
if isFlowStatusSuccess(flowStatus.Status, false) {
vpv.FlowRemoveSuccess(cntx, event.cookie, event.device)
@@ -158,8 +153,8 @@
}
// ProcessMcastFlowDelEvent - Process Control Flow event trigger
-func ProcessMcastFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
- logger.Infow(ctx, "Processing Post Flow Remove Event for Mcast/Igmp", log.Fields{"Cookie": event.cookie, "event": event})
+func ProcessMcastFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+ logger.Debugw(ctx, "Processing Post Flow Remove Event for Mcast/Igmp", log.Fields{"Cookie": event.cookie, "event": event})
mvp := event.eventData.(*MvlanProfile)
if isFlowStatusSuccess(flowStatus.Status, false) {
mvp.FlowRemoveSuccess(cntx, event.cookie, event.device)
@@ -169,7 +164,7 @@
}
// ProcessDeviceFlowDelEvent - Process Control Flow event trigger
-func ProcessDeviceFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
+func ProcessDeviceFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
logger.Debugw(ctx, "Processing Post Flow Remove Event for VNET", log.Fields{"Cookie": event.cookie, "event": event})
vnet := event.eventData.(*VoltVnet)
if isFlowStatusSuccess(flowStatus.Status, false) {
@@ -181,7 +176,7 @@
// TODO: Update the func or flowStatus struct once all flow status are based on NB error code
func isFlowStatusSuccess(status uint32, flowAdd bool) bool {
- logger.Infow(ctx, "Processing isFlowStatusSuccess", log.Fields{"Status": status, "FlowAdd": flowAdd})
+ logger.Debugw(ctx, "Processing isFlowStatusSuccess", log.Fields{"Status": status, "FlowAdd": flowAdd})
result := false
errorCode := infraerrorcode.ErrorCode(status)
diff --git a/internal/pkg/application/flowevent_test.go b/internal/pkg/application/flowevent_test.go
index 6bd5da2..8621521 100644
--- a/internal/pkg/application/flowevent_test.go
+++ b/internal/pkg/application/flowevent_test.go
@@ -150,7 +150,7 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- ProcessUsIgmpFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
+ ProcessUsIgmpFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
})
}
}
@@ -162,12 +162,8 @@
flowStatus intf.FlowStatus
flowEventMap *util.ConcurrentMap
}
-
- flowPushCountMap := make(map[string]uint32)
vs := &VoltService{
- VoltServiceCfg: VoltServiceCfg{
- FlowPushCount: flowPushCountMap,
- },
+ VoltServiceCfg: VoltServiceCfg{},
}
tests := []struct {
@@ -202,7 +198,7 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- ProcessServiceFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, tt.args.flowEventMap)
+ ProcessServiceFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
})
}
}
@@ -241,7 +237,7 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- ProcessControlFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
+ ProcessControlFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
})
}
}
@@ -253,12 +249,8 @@
flowStatus intf.FlowStatus
flowEventMap *util.ConcurrentMap
}
-
- flowPushCountMap := make(map[string]uint32)
vs := &VoltService{
- VoltServiceCfg: VoltServiceCfg{
- FlowPushCount: flowPushCountMap,
- },
+ VoltServiceCfg: VoltServiceCfg{},
}
tests := []struct {
@@ -294,7 +286,7 @@
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
dbintf.EXPECT().PutService(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
- ProcessServiceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, tt.args.flowEventMap)
+ ProcessServiceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
})
}
}
@@ -336,7 +328,7 @@
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
dbintf.EXPECT().PutVpv(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
- ProcessControlFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
+ ProcessControlFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
})
}
}
@@ -381,7 +373,7 @@
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
dbintf.EXPECT().PutMvlan(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
- ProcessMcastFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
+ ProcessMcastFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
})
}
}
@@ -431,9 +423,9 @@
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
dbintf.EXPECT().PutVnet(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil).AnyTimes()
- ProcessDeviceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
+ ProcessDeviceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
case "ProcessDeviceFlowDelEvent_else_condition":
- ProcessDeviceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
+ ProcessDeviceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
}
})
}
diff --git a/internal/pkg/application/service.go b/internal/pkg/application/service.go
index 5f7178b..e060548 100644
--- a/internal/pkg/application/service.go
+++ b/internal/pkg/application/service.go
@@ -82,7 +82,7 @@
// MacAddress - The MAC hardware address learnt on the UNI interface
// MacAddresses - Not yet implemented. To be used to learn more MAC addresses
type VoltServiceCfg struct {
- Pbits []of.PbitType
+ DsRemarkPbitsMap map[int]int // Ex: Remark case {0:0,1:0} and No-remark case {1:1}
Name string
CircuitID string
Port string
@@ -94,20 +94,20 @@
RemoteIDType string
DataRateAttr string
ServiceType string
- DsRemarkPbitsMap map[int]int // Ex: Remark case {0:0,1:0} and No-remark case {1:1}
- RemoteID []byte
MacAddr net.HardwareAddr
- ONTEtherTypeClassification int
- SchedID int
+ RemoteID []byte
+ Pbits []of.PbitType
Trigger ServiceTrigger
MacLearning MacLearningType
+ ONTEtherTypeClassification int
+ SchedID int
PonPort uint32
MinDataRateUs uint32
MinDataRateDs uint32
MaxDataRateUs uint32
MaxDataRateDs uint32
- TechProfileID uint16
SVlanTpid layers.EthernetType
+ TechProfileID uint16
UniVlan of.VlanType
CVlan of.VlanType
SVlan of.VlanType
@@ -115,6 +115,7 @@
UsPonSTagPriority of.PbitType
DsPonSTagPriority of.PbitType
DsPonCTagPriority of.PbitType
+ ServiceDeactivateReason SvcDeactivateReason // Mentions why the service was deactivated
VlanControl VlanControl
IsOption82Enabled bool
IgmpEnabled bool
@@ -122,8 +123,6 @@
AllowTransparent bool
EnableMulticastKPI bool
IsActivated bool
- FlowPushCount map[string]uint32 // Tracks the number of flow install/delete failure attempts per cookie in order to throttle flow auditing
- ServiceDeactivateReason SvcDeactivateReason // Mentions why the service was deactivated
}
// VoltServiceOper structure
@@ -215,7 +214,6 @@
vs.Ipv6Addr = net.ParseIP("::")
vs.PendingFlows = make(map[string]bool)
vs.AssociatedFlows = make(map[string]bool)
- vs.FlowPushCount = make(map[string]uint32)
return &vs
}
@@ -302,10 +300,12 @@
func (vs *VoltService) AddHsiaFlows(cntx context.Context) {
logger.Debugw(ctx, "Add US & DS HSIA Flows for the service", log.Fields{"ServiceName": vs.Name})
if err := vs.AddUsHsiaFlows(cntx); err != nil {
+ logger.Errorw(ctx, "Error adding US HSIA Flows", log.Fields{"Service": vs.Name, "Port": vs.Port, "Reason": err.Error()})
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vs.triggerServiceFailureInd(statusCode, statusMessage)
}
if err := vs.AddDsHsiaFlows(cntx); err != nil {
+ logger.Errorw(ctx, "Error adding DS HSIA Flows", log.Fields{"Service": vs.Name, "Port": vs.Port, "Reason": err.Error()})
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vs.triggerServiceFailureInd(statusCode, statusMessage)
}
@@ -315,11 +315,13 @@
func (vs *VoltService) DelHsiaFlows(cntx context.Context) {
logger.Debugw(ctx, "Delete US & DS HSIA Flows for the service", log.Fields{"ServiceName": vs.Name})
if err := vs.DelUsHsiaFlows(cntx, false); err != nil {
+ logger.Errorw(ctx, "Error deleting US HSIA Flows", log.Fields{"Service": vs.Name, "Port": vs.Port, "Reason": err.Error()})
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vs.triggerServiceFailureInd(statusCode, statusMessage)
}
if err := vs.DelDsHsiaFlows(cntx, false); err != nil {
+ logger.Errorw(ctx, "Error deleting DS HSIA Flows", log.Fields{"Service": vs.Name, "Port": vs.Port, "Reason": err.Error()})
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vs.triggerServiceFailureInd(statusCode, statusMessage)
}
@@ -346,7 +348,7 @@
// AddUsHsiaFlows - Add US HSIA Flows for the service
func (vs *VoltService) AddUsHsiaFlows(cntx context.Context) error {
- logger.Infow(ctx, "Configuring US HSIA Service Flows", log.Fields{"Device": vs.Device, "ServiceName": vs.Name})
+ logger.Infow(ctx, "Configuring US HSIA Service Flows", log.Fields{"Device": vs.Device, "ServiceName": vs.Name, "Port": vs.Port})
if vs.DeleteInProgress || vs.UpdateInProgress {
logger.Warnw(ctx, "Ignoring US HSIA Flow Push, Service deleteion In-Progress", log.Fields{"Device": vs.Device, "Service": vs.Name})
return nil
@@ -356,6 +358,7 @@
if !vs.UsHSIAFlowsApplied || vgcRebooted {
device, err := va.GetDeviceFromPort(vs.Port)
if err != nil {
+ logger.Errorw(ctx, "Error Getting Device for Service and Port", log.Fields{"Service": vs.Name, "Port": vs.Port, "Error": err})
return fmt.Errorf("Error Getting Device for Service %s and Port %s : %w", vs.Name, vs.Port, err)
} else if device.State != controller.DeviceStateUP {
logger.Warnw(ctx, "Device state Down. Ignoring US HSIA Flow Push", log.Fields{"Service": vs.Name, "Port": vs.Port})
@@ -398,7 +401,7 @@
// AddDsHsiaFlows - Add DS HSIA Flows for the service
func (vs *VoltService) AddDsHsiaFlows(cntx context.Context) error {
- logger.Infow(ctx, "Configuring DS HSIA Service Flows", log.Fields{"Device": vs.Device, "ServiceName": vs.Name})
+ logger.Infow(ctx, "Configuring DS HSIA Service Flows", log.Fields{"Device": vs.Device, "ServiceName": vs.Name, "Port": vs.Port})
if vs.DeleteInProgress {
logger.Warnw(ctx, "Ignoring DS HSIA Flow Push, Service deleteion In-Progress", log.Fields{"Device": vs.Device, "Service": vs.Name})
return nil
@@ -408,6 +411,7 @@
if !vs.DsHSIAFlowsApplied || vgcRebooted {
device, err := va.GetDeviceFromPort(vs.Port)
if err != nil {
+ logger.Errorw(ctx, "Error Getting Device for Service and Port", log.Fields{"Service": vs.Name, "Port": vs.Port, "Error": err})
return fmt.Errorf("Error Getting Device for Service %s and Port %s : %w", vs.Name, vs.Port, err)
} else if device.State != controller.DeviceStateUP {
logger.Warnw(ctx, "Device state Down. Ignoring DS HSIA Flow Push", log.Fields{"Service": vs.Name, "Port": vs.Port})
@@ -473,6 +477,7 @@
if vs.UsHSIAFlowsApplied || vgcRebooted {
device, err := GetApplication().GetDeviceFromPort(vs.Port)
if err != nil {
+ logger.Errorw(ctx, "Error Getting Device for Servic and Port", log.Fields{"Device": vs.Device, "ServiceName": vs.Name, "Reason": err})
return fmt.Errorf("Error Getting Device for Service %s and Port %s : %w", vs.Name, vs.Port, err)
}
pBits := vs.Pbits
@@ -746,7 +751,7 @@
// BuildUsHsiaFlows build the US HSIA flows
// Called for add/delete HSIA flows
func (vs *VoltService) BuildUsHsiaFlows(pbits of.PbitType) (*of.VoltFlow, error) {
- logger.Debugw(ctx, "Building US HSIA Service Flows", log.Fields{"Device": vs.Device, "ServiceName": vs.Name})
+ logger.Debugw(ctx, "Building US HSIA Service Flows", log.Fields{"Device": vs.Device, "ServiceName": vs.Name, "Port": vs.Port})
flow := &of.VoltFlow{}
flow.SubFlows = make(map[uint64]*of.VoltSubFlow)
@@ -1059,17 +1064,13 @@
vs.DeactivateInProgress = oper.DeactivateInProgress
vs.BwAvailInfo = oper.BwAvailInfo
vs.Device = oper.Device
- // FlowPushCount is newly introduced map and it can be nil when VGC is upgraded. Hence adding a nil check to handle backward compatibility
- if cfg.FlowPushCount != nil {
- vs.FlowPushCount = cfg.FlowPushCount
- }
vs.ServiceDeactivateReason = cfg.ServiceDeactivateReason
} else {
// Sorting Pbit from highest
sort.Slice(vs.Pbits, func(i, j int) bool {
return vs.Pbits[i] > vs.Pbits[j]
})
- logger.Infow(ctx, "Sorted Pbits", log.Fields{"Pbits": vs.Pbits})
+ logger.Debugw(ctx, "Sorted Pbits", log.Fields{"Pbits": vs.Pbits})
}
logger.Infow(ctx, "VolthService...", log.Fields{"vs": vs.Name})
@@ -1151,7 +1152,7 @@
// DelServiceWithPrefix - Deletes service with the provided prefix.
// Added for DT/TT usecase with sadis replica interface
func (va *VoltApplication) DelServiceWithPrefix(cntx context.Context, prefix string) error {
- logger.Infow(ctx, "Delete Service With provided Prefix", log.Fields{"Prefix": prefix})
+ logger.Debugw(ctx, "Delete Service With provided Prefix", log.Fields{"Prefix": prefix})
var isServiceExist bool
va.ServiceByName.Range(func(key, value interface{}) bool {
srvName := key.(string)
@@ -1202,9 +1203,12 @@
vs.ForceDelete = forceDelete
vs.ForceWriteToDb(cntx)
+ vs.ServiceLock.RLock()
if len(vs.AssociatedFlows) == 0 {
noFlowsPresent = true
}
+ vs.ServiceLock.RUnlock()
+
vpv.VpvLock.Lock()
defer vpv.VpvLock.Unlock()
@@ -1301,12 +1305,8 @@
// FlowInstallSuccess - Called when corresponding service flow installation is success
// If no more pending flows, HSIA indication wil be triggered
-func (vs *VoltService) FlowInstallSuccess(cntx context.Context, cookie string, bwAvailInfo of.BwAvailDetails, flowEventMap *util.ConcurrentMap) {
+func (vs *VoltService) FlowInstallSuccess(cntx context.Context, cookie string, bwAvailInfo of.BwAvailDetails) {
logger.Debugw(ctx, "Flow Add Success Notification", log.Fields{"Cookie": cookie, "bwAvailInfo": bwAvailInfo, "Service": vs.Name})
- flowEventMap.MapLock.Lock()
- flowEventMap.Remove(cookie)
- flowEventMap.MapLock.Unlock()
-
if vs.DeleteInProgress {
logger.Warnw(ctx, "Skipping Flow Add Success Notification. Service deletion in-progress", log.Fields{"Cookie": cookie, "Service": vs.Name})
return
@@ -1321,7 +1321,6 @@
delete(vs.PendingFlows, cookie)
vs.AssociatedFlows[cookie] = true
- vs.FlowPushCount[cookie] = 0
vs.ServiceLock.Unlock()
var prevBwAvail, presentBwAvail string
if bwAvailInfo.PrevBw != "" && bwAvailInfo.PresentBw != "" {
@@ -1332,7 +1331,9 @@
}
vs.WriteToDb(cntx)
+ vs.ServiceLock.RLock()
if len(vs.PendingFlows) == 0 && vs.DsHSIAFlowsApplied {
+ vs.ServiceLock.RUnlock()
device, err := GetApplication().GetDeviceFromPort(vs.Port)
if err != nil {
logger.Errorw(ctx, "Error Getting Device. Dropping HSIA Success indication to NB", log.Fields{"Reason": err.Error(), "Service": vs.Name, "Port": vs.Port})
@@ -1349,24 +1350,16 @@
logger.Infow(ctx, "All Flows installed for Service", log.Fields{"Service": vs.Name})
return
}
- logger.Infow(ctx, "Processed Service Flow Add Success Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "DsFlowsApplied": vs.DsHSIAFlowsApplied})
+ vs.ServiceLock.RUnlock()
+ logger.Debugw(ctx, "Processed Service Flow Add Success Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "DsFlowsApplied": vs.DsHSIAFlowsApplied})
}
// FlowInstallFailure - Called when corresponding service flow installation is failed
// Trigger service failure indication to NB
-func (vs *VoltService) FlowInstallFailure(cntx context.Context, cookie string, errorCode uint32, errReason string, flowEventMap *util.ConcurrentMap) {
+func (vs *VoltService) FlowInstallFailure(cntx context.Context, cookie string, errorCode uint32, errReason string) {
logger.Debugw(ctx, "Service flow installation failure", log.Fields{"Service": vs.Name, "Cookie": cookie, "errorCode": errorCode, "errReason": errReason})
-
- isServiceDeactivated := vs.CheckAndDeactivateService(cntx, cookie)
- if isServiceDeactivated {
- flowEventMap.MapLock.Lock()
- for ck := range vs.PendingFlows {
- flowEventMap.Remove(ck)
- }
- flowEventMap.MapLock.Unlock()
- }
-
vs.ServiceLock.RLock()
+
if _, ok := vs.PendingFlows[cookie]; !ok {
logger.Errorw(ctx, "Flow Add Failure for unknown Cookie", log.Fields{"Service": vs.Name, "Cookie": cookie})
vs.ServiceLock.RUnlock()
@@ -1380,7 +1373,7 @@
// DelFlows - Deletes the flow from the service
// Triggers flow deletion after registering for flow indication event
func (vs *VoltService) DelFlows(cntx context.Context, device *VoltDevice, flow *of.VoltFlow, delFlowsInDevice bool) error {
- logger.Debugw(ctx, "Delete the flow from the service", log.Fields{"Port": vs.Port, "Device": device.Name})
+ logger.Infow(ctx, "Delete the flow from the service", log.Fields{"Port": vs.Port, "Device": device.Name, "cookie": flow.MigrateCookie})
if !vs.ForceDelete {
// Using locks instead of concurrent map for AssociatedFlows to avoid
// race condition during flow response indication processing
@@ -1410,56 +1403,16 @@
}
}
-// CheckAndDeactivateService - deactivate service and remove flows from DB, if the max flows retry attempt has reached
-func (vs *VoltService) CheckAndDeactivateService(cntx context.Context, cookie string) bool {
- vs.ServiceLock.Lock()
- logger.Debugw(ctx, "Check and Deactivate service if flow install threshold is reached and remove flows from DB/Device", log.Fields{"serviceName": vs.Name, "FlowPushCount": vs.FlowPushCount[cookie]})
- vs.FlowPushCount[cookie]++
- if vs.FlowPushCount[cookie] == controller.GetController().GetMaxFlowRetryAttempt() {
- if vs.IsActivated {
- vs.ServiceLock.Unlock()
- device, err := GetApplication().GetDeviceFromPort(vs.Port)
- if err != nil {
- // Even if the port/device does not exists at this point in time, the deactivate request is succss.
- // So no error is returned
- logger.Warnw(ctx, "Error Getting Device", log.Fields{"Reason": err.Error(), "Port": vs.Port})
- return false
- }
- vs.SetSvcDeactivationFlags(SvcDeacRsn_Controller)
- GetApplication().ServiceByName.Store(vs.Name, vs)
- p := device.GetPort(vs.Port)
- if p != nil && (p.State == PortStateUp) {
- if vpv := GetApplication().GetVnetByPort(vs.Port, vs.SVlan, vs.CVlan, vs.UniVlan); vpv != nil {
- // Port down call internally deletes all the flows
- vpv.PortDownInd(cntx, vs.Device, vs.Port, true, true)
- } else {
- logger.Warnw(ctx, "VPV does not exists!!!", log.Fields{"Device": vs.Device, "port": vs.Port, "SvcName": vs.Name})
- }
- }
- vs.DeactivateInProgress = false
- GetApplication().ServiceByName.Store(vs.Name, vs)
- vs.WriteToDb(cntx)
- logger.Infow(ctx, "Service deactivated after max flow install attempts", log.Fields{"SvcName": vs.Name, "Cookie": cookie})
- return true
- }
- }
- vs.ServiceLock.Unlock()
- return false
-}
-
// FlowRemoveSuccess - Called when corresponding service flow removal is success
// If no more associated flows, DelHSIA indication wil be triggered
-func (vs *VoltService) FlowRemoveSuccess(cntx context.Context, cookie string, flowEventMap *util.ConcurrentMap) {
+func (vs *VoltService) FlowRemoveSuccess(cntx context.Context, cookie string) {
// if vs.DeleteInProgress {
// logger.Warnw(ctx, "Skipping Flow Remove Success Notification. Service deletion in-progress", log.Fields{"Cookie": cookie, "Service": vs.Name})
// return
// }
- flowEventMap.MapLock.Lock()
- flowEventMap.Remove(cookie)
- flowEventMap.MapLock.Unlock()
vs.ServiceLock.Lock()
- logger.Infow(ctx, "Processing Service Flow Remove Success Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied})
+ logger.Debugw(ctx, "Processing Service Flow Remove Success Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied})
if _, ok := vs.AssociatedFlows[cookie]; ok {
delete(vs.AssociatedFlows, cookie)
@@ -1468,13 +1421,12 @@
} else {
logger.Debugw(ctx, "Service Flow Remove Success for unknown Cookie", log.Fields{"Service": vs.Name, "Cookie": cookie, "AssociatedFlows": vs.AssociatedFlows, "PendingFlows": vs.PendingFlows})
}
-
- vs.FlowPushCount[cookie] = 0
vs.ServiceLock.Unlock()
-
vs.WriteToDb(cntx)
+ vs.ServiceLock.RLock()
if len(vs.AssociatedFlows) == 0 && !vs.DsHSIAFlowsApplied {
+ vs.ServiceLock.RUnlock()
device := GetApplication().GetDevice(vs.Device)
if device == nil {
logger.Errorw(ctx, "Error Getting Device. Dropping DEL_HSIA Success indication to NB", log.Fields{"Service": vs.Name, "Port": vs.Port})
@@ -1490,20 +1442,25 @@
return
}
logger.Infow(ctx, "All Flows removed for Service. Triggering Service De-activation Success indication to NB", log.Fields{"Service": vs.Name, "DeleteFlag": vs.DeleteInProgress})
- vs.CheckAndDeleteService(cntx)
+ // Get the service from application before proceeding to delete, as the service might have been activated
+ // by the time the flow removal response is received from SB
+ svc := GetApplication().GetService(vs.Name)
+ if svc != nil {
+ svc.CheckAndDeleteService(cntx)
+ }
return
}
- logger.Infow(ctx, "Processed Service Flow Remove Success Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied})
+ vs.ServiceLock.RUnlock()
+ logger.Debugw(ctx, "Processed Service Flow Remove Success Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied})
}
// FlowRemoveFailure - Called when corresponding service flow installation is failed
// Trigger service failure indication to NB
-func (vs *VoltService) FlowRemoveFailure(cntx context.Context, cookie string, errorCode uint32, errReason string, flowEventMap *util.ConcurrentMap) {
+func (vs *VoltService) FlowRemoveFailure(cntx context.Context, cookie string, errorCode uint32, errReason string) {
vs.ServiceLock.Lock()
- logger.Debugw(ctx, "Processing Service Flow Remove Failure Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied, "FlowPushCount": vs.FlowPushCount[cookie]})
+ logger.Debugw(ctx, "Processing Service Flow Remove Failure Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied})
- vs.FlowPushCount[cookie]++
if _, ok := vs.AssociatedFlows[cookie]; !ok {
logger.Warnw(ctx, "Flow Failure for unknown Cookie", log.Fields{"Service": vs.Name, "Cookie": cookie})
vs.ServiceLock.Unlock()
@@ -1516,7 +1473,12 @@
logger.Debugw(ctx, "Service Flow Remove Failure Notification", log.Fields{"uniPort": vs.Port, "Cookie": cookie, "Service": vs.Name, "ErrorCode": errorCode, "ErrorReason": errReason})
vs.triggerServiceFailureInd(errorCode, errReason)
- vs.CheckAndDeleteService(cntx)
+ // Get the service from application before proceeding to delete, as the service might have been activated
+ // by the time the flow removal response is received from SB
+ svc := GetApplication().GetService(vs.Name)
+ if svc != nil {
+ svc.CheckAndDeleteService(cntx)
+ }
}
func (vs *VoltService) triggerServiceFailureInd(errorCode uint32, errReason string) {
@@ -1554,12 +1516,12 @@
}
if vvs.VoltServiceOper.DeactivateInProgress {
- va.ServicesToDeactivate[vvs.VoltServiceCfg.Name] = true
+ va.ServicesToDeactivate.Store(vvs.VoltServiceCfg.Name, true)
logger.Warnw(ctx, "Service (restored) to be deactivated", log.Fields{"Service": vvs.Name})
}
if vvs.VoltServiceOper.DeleteInProgress {
- va.ServicesToDelete[vvs.VoltServiceCfg.Name] = true
+ va.ServicesToDelete.Store(vvs.VoltServiceCfg.Name, true)
logger.Warnw(ctx, "Service (restored) to be deleted", log.Fields{"Service": vvs.Name})
}
}
@@ -2248,12 +2210,11 @@
// DeactivateService to activate pre-provisioned service
func (va *VoltApplication) DeactivateService(cntx context.Context, deviceID, portNo string, sVlan, cVlan of.VlanType, tpID uint16) error {
- logger.Infow(ctx, "Service Deactivate Request ", log.Fields{"Device": deviceID, "Port": portNo, "Svaln": sVlan, "Cvlan": cVlan, "TpID": tpID})
+ logger.Debugw(ctx, "Service Deactivate Request ", log.Fields{"Device": deviceID, "Port": portNo, "Svaln": sVlan, "Cvlan": cVlan, "TpID": tpID})
va.ServiceByName.Range(func(key, value interface{}) bool {
vs := value.(*VoltService)
// If svlan if provided, then the tags and tpID of service has to be matching
- logger.Debugw(ctx, "Service Deactivate Request ", log.Fields{"Device": deviceID, "Port": portNo})
if sVlan != of.VlanNone && (sVlan != vs.SVlan || cVlan != vs.CVlan || tpID != vs.TechProfileID) {
logger.Warnw(ctx, "condition not matched", log.Fields{"Device": deviceID, "Port": portNo, "sVlan": sVlan, "cVlan": cVlan, "tpID": tpID})
return true
@@ -2277,11 +2238,13 @@
if vpv.IgmpEnabled {
va.ReceiverDownInd(cntx, deviceID, portNo)
}
- vs.DeactivateInProgress = false
} else {
logger.Warnw(ctx, "VPV does not exists!!!", log.Fields{"Device": deviceID, "port": portNo, "SvcName": vs.Name})
}
}
+ vs.DeactivateInProgress = false
+ va.ServiceByName.Store(vs.Name, vs)
+ vs.WriteToDb(cntx)
}
return true
})
diff --git a/internal/pkg/application/service_test.go b/internal/pkg/application/service_test.go
index 13201ff..746666a 100644
--- a/internal/pkg/application/service_test.go
+++ b/internal/pkg/application/service_test.go
@@ -140,11 +140,11 @@
Name: "test_service_name",
},
}
- serviceToDelete := map[string]bool{}
- serviceToDelete[voltService4.VoltServiceCfg.Name] = true
+
va := &VoltApplication{
- ServicesToDelete: serviceToDelete,
+ ServicesToDelete: sync.Map{},
}
+ va.ServicesToDelete.Store(voltService4.VoltServiceCfg.Name, true)
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
switch tt.name {
@@ -224,30 +224,24 @@
switch tt.name {
case "VoltService_FlowRemoveFailure":
associatedFlows := map[string]bool{}
- flowPushCountMap := map[string]uint32{}
associatedFlows["test_cookie"] = true
vs := &VoltService{
VoltServiceOper: VoltServiceOper{
AssociatedFlows: associatedFlows,
},
- VoltServiceCfg: VoltServiceCfg{
- FlowPushCount: flowPushCountMap,
- },
+ VoltServiceCfg: VoltServiceCfg{},
}
- vs.FlowRemoveFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason, nil)
+ vs.FlowRemoveFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason)
case "cookie_not_found":
associatedFlows := map[string]bool{}
- flowPushCountMap := map[string]uint32{}
associatedFlows["cookie"] = true
vs := &VoltService{
VoltServiceOper: VoltServiceOper{
AssociatedFlows: associatedFlows,
},
- VoltServiceCfg: VoltServiceCfg{
- FlowPushCount: flowPushCountMap,
- },
+ VoltServiceCfg: VoltServiceCfg{},
}
- vs.FlowRemoveFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason, nil)
+ vs.FlowRemoveFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason)
}
})
}
@@ -560,7 +554,6 @@
pendingFlows := map[string]bool{}
pendingFlows["test_cookie"] = true
associatedFlows := map[string]bool{}
- flowPushCountMap := map[string]uint32{}
associatedFlows["test_cookie"] = true
vs := &VoltService{
VoltServiceOper: VoltServiceOper{
@@ -569,14 +562,13 @@
DsHSIAFlowsApplied: true,
},
VoltServiceCfg: VoltServiceCfg{
- Port: "test_port",
- FlowPushCount: flowPushCountMap,
+ Port: "test_port",
},
}
ga := GetApplication()
ga.PortsDisc.Store("test_port", voltPort)
ga.DevicesDisc.Store(test_device, voltDevice)
- vs.FlowInstallSuccess(tt.args.cntx, tt.args.cookie, tt.args.bwAvailInfo, tt.args.flowEventMap)
+ vs.FlowInstallSuccess(tt.args.cntx, tt.args.cookie, tt.args.bwAvailInfo)
})
}
}
@@ -2357,19 +2349,16 @@
pendingFlows["test_cookie"] = true
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- flowPushCountMap := map[string]uint32{}
vs := &VoltService{
VoltServiceOper: VoltServiceOper{},
- VoltServiceCfg: VoltServiceCfg{
- FlowPushCount: flowPushCountMap,
- },
+ VoltServiceCfg: VoltServiceCfg{},
}
switch tt.name {
case "VoltService_FlowInstallFailure":
vs.PendingFlows = pendingFlows
- vs.FlowInstallFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason, nil)
+ vs.FlowInstallFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason)
case "PendingFlows[cookie]_false":
- vs.FlowInstallFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason, nil)
+ vs.FlowInstallFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason)
}
})
}
@@ -2396,14 +2385,11 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- flowPushCountMap := map[string]uint32{}
vs := &VoltService{
VoltServiceOper: VoltServiceOper{
Device: test_device,
},
- VoltServiceCfg: VoltServiceCfg{
- FlowPushCount: flowPushCountMap,
- },
+ VoltServiceCfg: VoltServiceCfg{},
}
ga := GetApplication()
ga.DevicesDisc.Store(test_device, voltDevice2)
@@ -2411,7 +2397,7 @@
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
dbintf.EXPECT().PutService(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
- vs.FlowRemoveSuccess(tt.args.cntx, tt.args.cookie, tt.args.flowEventMap)
+ vs.FlowRemoveSuccess(tt.args.cntx, tt.args.cookie)
})
}
}
diff --git a/internal/pkg/application/timer.go b/internal/pkg/application/timer.go
index 42b16d8..7ad35c3 100644
--- a/internal/pkg/application/timer.go
+++ b/internal/pkg/application/timer.go
@@ -17,6 +17,7 @@
import (
"context"
+ "sync"
"time"
"voltha-go-controller/log"
)
@@ -34,7 +35,7 @@
pendingPoolTimer: false,
}
-var timerChannels = make(map[TimerType](chan bool))
+var timerChannels = sync.Map{}
// TimerCfg structure
type TimerCfg struct {
@@ -49,7 +50,8 @@
return
}
timerMap[timerType] = true
- timerChannels[timerType] = make(chan bool)
+ ch := make(chan bool)
+ timerChannels.Store(timerType, ch)
for {
select {
case <-time.After(cfg.tick):
@@ -59,7 +61,7 @@
case pendingPoolTimer:
va.removeExpiredGroups(cntx)
}
- case <-timerChannels[timerType]:
+ case <-ch:
return
}
}
@@ -67,7 +69,11 @@
// StopTimer to stop timers
func StopTimer() {
- for _, ch := range timerChannels {
+ timerChannels.Range(func(key, value interface{}) bool {
+ ch := value.(chan bool)
ch <- true
- }
+ /* Range calls function sequentially for each key and value present in the map.
+ If function returns false, range stops the iteration. */
+ return true
+ })
}
diff --git a/internal/pkg/application/vnets.go b/internal/pkg/application/vnets.go
index 72c43fd..80e8212 100644
--- a/internal/pkg/application/vnets.go
+++ b/internal/pkg/application/vnets.go
@@ -314,7 +314,7 @@
devicesToHandle = append(devicesToHandle, serialNum)
}
if len(devicesToHandle) == 0 {
- logger.Debugw(ctx, "Ignoring Duplicate VNET by name ", log.Fields{"Vnet": cfg.Name})
+ logger.Warnw(ctx, "Ignoring Duplicate VNET by name ", log.Fields{"Vnet": cfg.Name})
AppMutex.VnetMutex.Unlock()
return nil
}
@@ -761,7 +761,7 @@
func (vpv *VoltPortVnet) PortUpInd(cntx context.Context, device *VoltDevice, port string) {
logger.Infow(ctx, "Port UP Ind, pushing flows for the port", log.Fields{"Device": device, "Port": port, "VnetDhcp": vpv.DhcpRelay, "McastService": vpv.McastService})
if vpv.DeleteInProgress {
- logger.Warnw(ctx, "Ignoring VPV Port UP Ind, VPV deleteion In-Progress", log.Fields{"Device": device, "Port": port, "Vnet": vpv.VnetName})
+ logger.Warnw(ctx, "Ignoring VPV Port UP Ind, VPV deletion In-Progress", log.Fields{"Device": device, "Port": port, "Vnet": vpv.VnetName})
return
}
vpv.setDevice(device.Name)
@@ -856,7 +856,7 @@
return
}
logger.Infow(ctx, "VPV Port DOWN Ind, deleting all flows for services",
- log.Fields{"service count": vpv.servicesCount.Load()})
+ log.Fields{"service count": vpv.servicesCount.Load(), "Port": port})
//vpv.RangeOnServices(cntx, DelAllFlows)
vpv.DelTrapFlows(cntx)
@@ -977,6 +977,16 @@
return service
}
+func (vpv *VoltPortVnet) GetSvcFromVPV() *VoltService {
+ var service *VoltService
+ vpv.services.Range(func(key, value interface{}) bool {
+ service = value.(*VoltService)
+ logger.Debugw(ctx, "Get Service from VPV", log.Fields{"Service": value})
+ return false
+ })
+ return service
+}
+
// GetRemarkedPriority : If the VNET matches priority of the incoming packet with any service, return true. Else, return false
func (vpv *VoltPortVnet) GetRemarkedPriority(priority uint8) uint8 {
logger.Debugw(ctx, "Get Remarked Priority", log.Fields{"Priority": priority})
@@ -1064,7 +1074,7 @@
voltDevice, err := GetApplication().GetDeviceFromPort(vpv.Port)
if err != nil {
- logger.Warnw(ctx, "Not pushing Service Flows: Error Getting Device", log.Fields{"Reason": err.Error()})
+ logger.Errorw(ctx, "Not pushing Service Flows: Error Getting Device", log.Fields{"Reason": err.Error()})
// statusCode, statusMessage := errorCodes.GetErrorInfo(err)
// TODO-COMM: vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
return
@@ -1078,7 +1088,7 @@
devConfig := GetApplication().GetDeviceConfig(voltDevice.SerialNum)
if devConfig.UplinkPort != voltDevice.NniPort {
- logger.Warnw(ctx, "NNI port mismatch", log.Fields{"NB NNI Port": devConfig.UplinkPort, "SB NNI port": voltDevice.NniPort})
+ logger.Errorw(ctx, "NNI port mismatch", log.Fields{"NB NNI Port": devConfig.UplinkPort, "SB NNI port": voltDevice.NniPort})
return
}
// Push Service Flows if DHCP relay is not configured
@@ -1308,7 +1318,7 @@
// DelHsiaFlows deletes the service flows
func (vpv *VoltPortVnet) DelHsiaFlows(cntx context.Context, delFlowsInDevice bool) {
- logger.Infow(ctx, "Received Delete Hsia Flows", log.Fields{"McastService": vpv.McastService})
+ logger.Debugw(ctx, "Received Delete Hsia Flows", log.Fields{"McastService": vpv.McastService})
// no HSIA flows for multicast service
if !vpv.McastService {
vpv.RangeOnServices(cntx, DelUsHsiaFlows, delFlowsInDevice)
@@ -1597,7 +1607,7 @@
// Write the status of the VPV to the DB once the delete is scheduled
// for dispatch
func (vpv *VoltPortVnet) DelDsPppoeFlows(cntx context.Context) error {
- logger.Debugw(ctx, "Deleting DS PPPoE flows", log.Fields{"STAG": vpv.SVlan, "CTAG": vpv.CVlan, "Device": vpv.Device})
+ logger.Infow(ctx, "Deleting DS PPPoE flows", log.Fields{"STAG": vpv.SVlan, "CTAG": vpv.CVlan, "Device": vpv.Device})
device, err := GetApplication().GetDeviceFromPort(vpv.Port)
if err != nil {
return fmt.Errorf("DS PPPoE Flow Delete Failed : DeviceName %s : %w", device.Name, err)
@@ -2401,17 +2411,22 @@
case ONUCVlan,
None:
service = vnet.MatchesPriority(priority)
- // In case of DHCP Flow - cvlan == VlanNone
// In case of HSIA Flow - cvlan == Svlan
- if len(vlans) == 1 && (vlans[0] == vnet.SVlan || vlans[0] == of.VlanNone) && service != nil {
+ // In case of DHCP flow, match for cvlan as we are setting cvlan for DHCP flows
+ if len(vlans) == 1 && (vlans[0] == vnet.CVlan || vlans[0] == of.VlanNone) && (service != nil && service.IsActivated) {
return service
}
- case OLTCVlanOLTSVlan,
- OLTSVlan:
+ case OLTCVlanOLTSVlan:
service = vnet.MatchesPriority(priority)
if len(vlans) == 1 && vlans[0] == vnet.UniVlan && service != nil {
return service
}
+ case OLTSVlan:
+ // For OLTSVlan, return only the active service attached to the VPV.
+ service = vnet.GetSvcFromVPV()
+ if service != nil && service.IsActivated {
+ return service
+ }
default:
logger.Warnw(ctx, "Invalid Vlan Control Option", log.Fields{"Value": vnet.VlanControl})
}
@@ -2696,7 +2711,7 @@
// DeleteDevFlowForVlanFromDevice to delete icmpv6 flow for vlan from device
func (va *VoltApplication) DeleteDevFlowForVlanFromDevice(cntx context.Context, vnet *VoltVnet, deviceSerialNum string) {
- logger.Debugw(ctx, "DeleteDevFlowForVlanFromDevice", log.Fields{"Device-serialNum": deviceSerialNum, "SVlan": vnet.SVlan, "CVlan": vnet.CVlan})
+ logger.Infow(ctx, "DeleteDevFlowForVlanFromDevice", log.Fields{"Device-serialNum": deviceSerialNum, "SVlan": vnet.SVlan, "CVlan": vnet.CVlan})
delflows := func(key interface{}, value interface{}) bool {
device := value.(*VoltDevice)
if device.SerialNum != deviceSerialNum {
diff --git a/internal/pkg/controller/addflows.go b/internal/pkg/controller/addflows.go
index b7d42cd..a4f8a8d 100644
--- a/internal/pkg/controller/addflows.go
+++ b/internal/pkg/controller/addflows.go
@@ -93,7 +93,7 @@
if err.Error() == ErrDuplicateFlow {
dbFlow, _ := aft.device.GetFlow(flow.Cookie)
if dbFlow.State == of.FlowAddSuccess {
- aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, true)
+ aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
flowsPresent++
continue
}
@@ -109,7 +109,8 @@
// aft.device.AddFlowToDb(dbFlow)
flowsToProcess[flow.Cookie] = dbFlow
}
- aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, false)
+ // Below call will delete flow from DB and will not allow to maintain flow count and state. Hence commenting the below call.
+ //aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, false)
}
}
@@ -126,7 +127,7 @@
for _, flow := range aft.flow.SubFlows {
logger.Warnw(ctx, "Skip Flow Update", log.Fields{"Reason": "Port Deleted", "PortName": aft.flow.PortName, "PortNo": aft.flow.PortID, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
if aft.flow.Command == of.CommandDel {
- aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, true)
+ aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
}
}
return nil
@@ -162,7 +163,7 @@
}
break
}
- aft.device.triggerFlowNotification(ctx, flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, err, true)
+ aft.device.triggerFlowNotification(ctx, flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, err)
} else {
logger.Errorw(ctx, "Update Flow Table Failed: Voltha Client Unavailable", log.Fields{"Flow": flow})
}
diff --git a/internal/pkg/controller/auditdevice.go b/internal/pkg/controller/auditdevice.go
index 461f54a..ecee452 100644
--- a/internal/pkg/controller/auditdevice.go
+++ b/internal/pkg/controller/auditdevice.go
@@ -121,7 +121,7 @@
// This port exists in the received list and the map at
// VGC. This is common so delete it
logger.Infow(ctx, "Port State Mismatch", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
- ad.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State)
+ ad.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State, ofpPort.Name)
} else {
//To ensure the flows are in sync with port status and no mismatch due to reboot,
// repush/delete flows based on current port status
@@ -179,7 +179,7 @@
logger.Warnw(ctx, "AddPort Failed", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name, "Reason": err})
}
if mp.State == uint32(ofp.OfpPortState_OFPPS_LIVE) {
- ad.device.ProcessPortState(cntx, mp.PortNo, mp.State)
+ ad.device.ProcessPortState(cntx, mp.PortNo, mp.State, mp.Name)
}
logger.Debugw(ctx, "Processed Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
}
diff --git a/internal/pkg/controller/audittables.go b/internal/pkg/controller/audittables.go
index d5ce858..0be505a 100644
--- a/internal/pkg/controller/audittables.go
+++ b/internal/pkg/controller/audittables.go
@@ -257,7 +257,7 @@
break
}
- logger.Debugw(ctx, "Auditing Flow", log.Fields{"Cookie": flow.Cookie})
+ logger.Debugw(ctx, "Auditing Flow", log.Fields{"Cookie": flow.Cookie, "State": flow.State})
if _, ok := rcvdFlows[flow.Cookie]; ok {
// The flow exists in the device too. Just remove it from
// the received flows & trigger flow success indication unless
@@ -265,13 +265,26 @@
if flow.State != of.FlowDelFailure && flow.State != of.FlowDelPending {
delete(rcvdFlows, flow.Cookie)
+ } else {
+ // Update flow delete count since we are retrying the flow delete due to failure
+ att.device.UpdateFlowCount(cntx, flow.Cookie)
}
defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
} else {
// The flow exists at the controller but not at the device
// Push the flow to the device
logger.Debugw(ctx, "Adding Flow To Missing Flows", log.Fields{"Cookie": flow.Cookie})
- flowsToAdd.SubFlows[flow.Cookie] = flow
+ if !att.device.IsFlowAddThresholdReached(flow.FlowCount, flow.Cookie) {
+ flowsToAdd.SubFlows[flow.Cookie] = flow
+ att.device.UpdateFlowCount(cntx, flow.Cookie)
+ } else if flow.State != of.FlowDelFailure {
+ // Release the lock before deactivating service, as we acquire the same lock to delete flows
+ att.device.flowLock.Unlock()
+ // If flow add threshold has reached, deactivate the service corresponding to the UNI
+ GetController().CheckAndDeactivateService(cntx, flow, att.device.SerialNum, att.device.ID)
+ // Acquire the lock again for processing remaining flows
+ att.device.flowLock.Lock()
+ }
}
}
att.device.flowLock.Unlock()
@@ -301,10 +314,8 @@
return
}
for _, flow := range ofFlows {
- var dbFlow *of.VoltSubFlow
- var present bool
if flow.FlowMod != nil {
- if dbFlow, present = att.device.GetFlow(flow.FlowMod.Cookie); !present {
+ if _, present := att.device.GetFlow(flow.FlowMod.Cookie); !present {
logger.Warnw(ctx, "Flow Removed from DB. Ignoring Add Missing Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.FlowMod.Cookie})
continue
}
@@ -313,7 +324,7 @@
if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flow); err != nil {
logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Reason": err.Error()})
}
- att.device.triggerFlowResultNotification(cntx, flow.FlowMod.Cookie, dbFlow, of.CommandAdd, bwConsumedInfo, err, true)
+ att.device.triggerFlowNotification(cntx, flow.FlowMod.Cookie, of.CommandAdd, bwConsumedInfo, err)
}
}
@@ -329,13 +340,13 @@
// Let's cycle through the flows to delete the excess flows
for _, flow := range flows {
- if _, present := att.device.GetFlow(flow.Cookie); present {
- logger.Warnw(ctx, "Flow Present in DB. Ignoring Delete Excess Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
- continue
- }
-
- if flag := GetController().IsFlowDelThresholdReached(cntx, strconv.FormatUint(flow.Cookie, 10), att.device.ID); flag {
- logger.Warnw(ctx, "Flow delete threshold reached, skipping flow delete", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
+ if dbFlow, present := att.device.GetFlow(flow.Cookie); present {
+ if dbFlow.State != of.FlowDelFailure && dbFlow.State != of.FlowDelPending {
+ logger.Warnw(ctx, "Flow Present in DB. Ignoring Delete Excess Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
+ continue
+ }
+ } else {
+ logger.Debugw(ctx, "Flow removed from DB after delete threshold reached. Ignoring Delete Excess Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
continue
}
@@ -365,7 +376,7 @@
if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate); err != nil {
logger.Errorw(ctx, "Flow Audit Delete Failed", log.Fields{"Reason": err.Error()})
}
- att.device.triggerFlowResultNotification(cntx, flow.Cookie, nil, of.CommandDel, of.BwAvailDetails{}, err, true)
+ att.device.triggerFlowNotification(cntx, flow.Cookie, of.CommandDel, of.BwAvailDetails{}, err)
}
}
@@ -562,11 +573,16 @@
logger.Debugw(ctx, "Process Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
if ofpPort, ok := missingPorts[id]; ok {
+ if vgcPort.Name != ofpPort.Name {
+ logger.Infow(ctx, "Port Name Mismatch", log.Fields{"vgcPort": vgcPort.Name, "ofpPort": ofpPort.Name, "ID": id})
+ att.DeleteMismatchPorts(ctx, vgcPort, ofpPort.Name)
+ return
+ }
if ((vgcPort.State == PortStateDown) && (ofpPort.State == uint32(ofp.OfpPortState_OFPPS_LIVE))) || ((vgcPort.State == PortStateUp) && (ofpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE))) {
// This port exists in the received list and the map at
// VGC. This is common so delete it
logger.Infow(ctx, "Port State Mismatch", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
- att.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State)
+ att.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State, ofpPort.Name)
}
delete(missingPorts, id)
} else {
@@ -613,7 +629,7 @@
logger.Warnw(ctx, "AddPort Failed", log.Fields{"No": mp.PortNo, "Name": mp.Name, "Reason": err})
}
if mp.State == uint32(ofp.OfpPortState_OFPPS_LIVE) {
- att.device.ProcessPortState(cntx, mp.PortNo, mp.State)
+ att.device.ProcessPortState(cntx, mp.PortNo, mp.State, mp.Name)
}
logger.Debugw(ctx, "Processed Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
}
@@ -642,3 +658,12 @@
}
}
}
+
+func (att *AuditTablesTask) DeleteMismatchPorts(cntx context.Context, vgcPort *DevicePort, ofpPortName string) {
+ logger.Infow(ctx, "Deleting port in VGC due to mismatch with voltha", log.Fields{"vgcPortID": vgcPort.ID, "vgcPortName": vgcPort.Name})
+ _ = att.device.DelPort(cntx, vgcPort.ID, vgcPort.Name)
+ if p := att.device.GetPortByName(ofpPortName); p != nil {
+ logger.Infow(ctx, "Delete port by name in VGC due to mismatch with voltha", log.Fields{"portID": p.ID, "portName": p.Name})
+ _ = att.device.DelPort(cntx, p.ID, p.Name)
+ }
+}
diff --git a/internal/pkg/controller/changeevent.go b/internal/pkg/controller/changeevent.go
index fad74a8..c146f23 100644
--- a/internal/pkg/controller/changeevent.go
+++ b/internal/pkg/controller/changeevent.go
@@ -68,7 +68,6 @@
func (cet *ChangeEventTask) Start(ctx context.Context, taskID uint8) error {
cet.taskID = taskID
cet.ctx = ctx
-
if status, ok := cet.event.Event.(*ofp.ChangeEvent_PortStatus); ok {
portNo := status.PortStatus.Desc.PortNo
portName := status.PortStatus.Desc.Name
@@ -77,12 +76,10 @@
if status.PortStatus.Reason == ofp.OfpPortReason_OFPPR_ADD {
_ = cet.device.AddPort(ctx, status.PortStatus.Desc)
if state == uint32(ofp.OfpPortState_OFPPS_LIVE) {
- cet.device.ProcessPortState(ctx, portNo, state)
+ cet.device.ProcessPortState(ctx, portNo, state, portName)
}
} else if status.PortStatus.Reason == ofp.OfpPortReason_OFPPR_DELETE {
- if err := cet.device.DelPort(ctx, portNo, portName); err != nil {
- logger.Warnw(ctx, "DelPort Failed", log.Fields{"Port No": portNo, "Error": err})
- }
+ cet.device.CheckAndDeletePort(ctx, portNo, portName)
} else if status.PortStatus.Reason == ofp.OfpPortReason_OFPPR_MODIFY {
cet.device.ProcessPortUpdate(ctx, portName, portNo, state)
}
diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go
index 76e4558..7d974f8 100644
--- a/internal/pkg/controller/controller.go
+++ b/internal/pkg/controller/controller.go
@@ -266,9 +266,8 @@
v.app.ProcessFlowModResultIndication(cntx, flowStatus)
}
-// IsFlowDelThresholdReached - check if the attempts for flow delete has reached threshold or not
-func (v *VoltController) IsFlowDelThresholdReached(cntx context.Context, cookie string, device string) bool {
- return v.app.IsFlowDelThresholdReached(cntx, cookie, device)
+func (v *VoltController) CheckAndDeactivateService(ctx context.Context, flow *of.VoltSubFlow, devSerialNum string, devID string) {
+ v.app.CheckAndDeactivateService(ctx, flow, devSerialNum, devID)
}
// AddVPAgent to add the vpagent
diff --git a/internal/pkg/controller/device.go b/internal/pkg/controller/device.go
index bdac105..abf3108 100644
--- a/internal/pkg/controller/device.go
+++ b/internal/pkg/controller/device.go
@@ -199,8 +199,8 @@
func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
d.flowLock.RLock()
defer d.flowLock.RUnlock()
- logger.Debugw(ctx, "Get Flow", log.Fields{"Cookie": cookie})
flow, ok := d.flows[cookie]
+ logger.Debugw(ctx, "Get Flow", log.Fields{"Cookie": cookie})
return flow, ok
}
@@ -235,8 +235,11 @@
d.flowLock.Lock()
defer d.flowLock.Unlock()
logger.Debugw(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
- if _, ok := d.flows[flow.Cookie]; ok {
- return errors.New(ErrDuplicateFlow)
+ if dbFlow, ok := d.flows[flow.Cookie]; ok {
+ // In case of ONU reboot after flow delete failure, try to install flow in the device by checking for previous flow state
+ if dbFlow.State != of.FlowDelFailure {
+ return errors.New(ErrDuplicateFlow)
+ }
}
d.flows[flow.Cookie] = flow
d.AddFlowToDb(cntx, flow)
@@ -277,7 +280,7 @@
return false
} else if flow.OldCookie != 0 && flow.Cookie != flow.OldCookie {
if _, ok := d.flows[flow.OldCookie]; ok {
- logger.Debugw(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
+ logger.Warnw(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
return true
}
}
@@ -487,13 +490,14 @@
// Inform the application if the port is successfully added
func (d *Device) AddPort(cntx context.Context, mp *ofp.OfpPort) error {
d.portLock.Lock()
- defer d.portLock.Unlock()
id := mp.PortNo
name := mp.Name
if _, ok := d.PortsByID[id]; ok {
+ d.portLock.Unlock()
return errors.New(Duplicate_Port)
}
if _, ok := d.PortsByName[name]; ok {
+ d.portLock.Unlock()
return errors.New(Duplicate_Port)
}
@@ -501,6 +505,7 @@
d.PortsByID[id] = p
d.PortsByName[name] = p
d.WritePortToDb(cntx, p)
+ d.portLock.Unlock()
GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
return nil
@@ -533,6 +538,19 @@
return nil
}
+// CheckAndDeletePort deletes the port if the port name matches with VGC and one sent from voltha in OFPPR_DELETE
+func (d *Device) CheckAndDeletePort(cntx context.Context, portNo uint32, portName string) {
+ if p := d.GetPortByID(portNo); p != nil {
+ if p.Name != portName {
+ logger.Warnw(ctx, "Dropping Del Port event: Port name mismatch", log.Fields{"vgcPortName": p.Name, "ofpPortName": portName, "ID": p.ID})
+ return
+ }
+ if err := d.DelPort(cntx, portNo, portName); err != nil {
+ logger.Warnw(ctx, "DelPort Failed", log.Fields{"Port No": portNo, "Error": err})
+ }
+ }
+}
+
// UpdatePortByName is utility to update the port by Name
func (d *Device) UpdatePortByName(cntx context.Context, name string, port uint32) {
d.portLock.Lock()
@@ -796,15 +814,11 @@
func (d *Device) ProcessPortUpdate(cntx context.Context, portName string, port uint32, state uint32) {
if p := d.GetPortByName(portName); p != nil {
if p.ID != port {
- logger.Infow(ctx, "Port ID update indication", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
- if p.State != PortStateDown {
- logger.Errorw(ctx, "Port ID update failed. Port State UP", log.Fields{"Port": p})
- return
- }
- d.UpdatePortByName(cntx, portName, port)
- logger.Errorw(ctx, "Port ID Updated", log.Fields{"Port": p})
+ logger.Warnw(ctx, "Port update indication received with mismatching ID", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
+ return
+ //Do not process port update received from change event, as we will only handle port updates during polling
}
- d.ProcessPortState(cntx, port, state)
+ d.ProcessPortState(cntx, port, state, portName)
}
}
@@ -824,7 +838,7 @@
// ProcessPortState deals with the change in port status and taking action
// based on the new state and the old state
-func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32) {
+func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32, portName string) {
if d.State != DeviceStateUP && !util.IsNniPort(port) {
logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
return
@@ -832,6 +846,10 @@
if p := d.GetPortByID(port); p != nil {
logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State})
+ if p.Name != portName {
+ logger.Warnw(ctx, "Dropping Port State processing: Port name does not match", log.Fields{"vgcPort": p.Name, "ofpPort": portName, "ID": port})
+ return
+ }
// Avoid blind initialization as the current tasks in the queue will be lost
// Eg: Service Del followed by Port Down - The flows will be dangling
// Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
@@ -1059,19 +1077,43 @@
return false
}
-func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
- flow, _ := d.GetFlow(cookie)
- d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err, sendFlowNotif)
+// IsFlowDelThresholdReached - check if the attempts for flow delete has reached threshold or not
+func (d *Device) IsFlowDelThresholdReached(flowCount uint32, cookie uint64) bool {
+ logger.Debugw(ctx, "Check flow delete threshold", log.Fields{"Cookie": cookie, "FlowCount": flowCount})
+ return flowCount >= GetController().GetMaxFlowRetryAttempt()
}
-func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
+// IsFlowAddThresholdReached - check if the attempts for flow add has reached threshold or not
+func (d *Device) IsFlowAddThresholdReached(flowCount uint32, cookie uint64) bool {
+ logger.Debugw(ctx, "Check flow add threshold", log.Fields{"Cookie": cookie, "FlowCount": flowCount})
+ return flowCount >= GetController().GetMaxFlowRetryAttempt()
+}
+
+func (d *Device) UpdateFlowCount(cntx context.Context, cookie uint64) {
+ if dbFlow, ok := d.flows[cookie]; ok {
+ dbFlow.FlowCount++
+ d.AddFlowToDb(cntx, dbFlow)
+ }
+}
+
+func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+ flow, _ := d.GetFlow(cookie)
+ d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
+}
+
+func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
statusCode, statusMsg := infraerror.GetErrorInfo(err)
success := isFlowOperSuccess(statusCode, oper)
- updateFlow := func(cookie uint64, state int, reason string) {
- if dbFlow, ok := d.GetFlow(cookie); ok {
+ updateFlowStatus := func(cookie uint64, state int, reason string) {
+ d.flowLock.Lock()
+ defer d.flowLock.Unlock()
+ if dbFlow, ok := d.flows[cookie]; ok {
dbFlow.State = uint8(state)
dbFlow.ErrorReason = reason
+ if state == of.FlowAddSuccess {
+ dbFlow.FlowCount = 0
+ }
d.AddFlowToDb(cntx, dbFlow)
}
}
@@ -1086,15 +1128,24 @@
state = of.FlowAddFailure
reason = statusMsg
}
- updateFlow(cookie, state, reason)
- logger.Debugw(ctx, "Updated Flow to DB", log.Fields{"Cookie": cookie, "State": state})
+ updateFlowStatus(cookie, state, reason)
+ logger.Debugw(ctx, "Add flow updated to DB", log.Fields{"Cookie": cookie, "State": state})
} else {
if success && flow != nil {
+ logger.Debugw(ctx, "Deleted flow from device and DB", log.Fields{"Cookie": cookie})
if err := d.DelFlow(cntx, flow); err != nil {
logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
}
} else if !success {
- updateFlow(cookie, of.FlowDelFailure, statusMsg)
+ if d.IsFlowDelThresholdReached(flow.FlowCount, flow.Cookie) {
+ logger.Debugw(ctx, "Deleted flow from device and DB after delete threshold reached", log.Fields{"Cookie": cookie})
+ if err := d.DelFlow(cntx, flow); err != nil {
+ logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
+ }
+ } else {
+ updateFlowStatus(cookie, of.FlowDelFailure, statusMsg)
+ logger.Debugw(ctx, "Delete flow updated to DB", log.Fields{"Cookie": cookie})
+ }
}
}
@@ -1108,8 +1159,6 @@
AdditionalData: bwDetails,
}
- if sendFlowNotif {
- logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
- GetController().ProcessFlowModResultIndication(cntx, flowResult)
- }
+ logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
+ GetController().ProcessFlowModResultIndication(cntx, flowResult)
}
diff --git a/internal/pkg/controller/device_test.go b/internal/pkg/controller/device_test.go
index 7089c64..8f35aef 100644
--- a/internal/pkg/controller/device_test.go
+++ b/internal/pkg/controller/device_test.go
@@ -170,7 +170,7 @@
db = dbintf
dbintf.EXPECT().PutFlow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
appMock.EXPECT().ProcessFlowModResultIndication(gomock.Any(), gomock.Any()).Times(1)
- d.triggerFlowResultNotification(tt.args.cntx, tt.args.cookie, tt.args.flow, tt.args.oper, tt.args.bwDetails, tt.args.err, false)
+ d.triggerFlowResultNotification(tt.args.cntx, tt.args.cookie, tt.args.flow, tt.args.oper, tt.args.bwDetails, tt.args.err)
})
}
}
diff --git a/internal/pkg/controller/modmeter.go b/internal/pkg/controller/modmeter.go
index 6b677a9..0d7b0e8 100644
--- a/internal/pkg/controller/modmeter.go
+++ b/internal/pkg/controller/modmeter.go
@@ -93,7 +93,7 @@
// Meter already exists so we dont have to do anything here
return nil
}
- logger.Infow(ctx, "Updated meter state to pending", log.Fields{"Meter": mmt.meter.ID})
+ logger.Debugw(ctx, "Updated meter state to pending", log.Fields{"Meter": mmt.meter.ID})
} else {
if !mmt.device.DelMeter(ctx, mmt.meter) {
// Meter doesn't exist so we dont have to do anything here
@@ -120,7 +120,7 @@
// Meter does not exist, update failed
logger.Error(ctx, "Update meter to DB failed")
}
- logger.Infow(ctx, "Updated meter state to success", log.Fields{"Meter": mmt.meter.ID})
+ logger.Debugw(ctx, "Updated meter state to success", log.Fields{"Meter": mmt.meter.ID})
}
//triggerMeterNotification(err)
return err
diff --git a/internal/pkg/intf/appif.go b/internal/pkg/intf/appif.go
index 46fb9c9..91a9f8a 100644
--- a/internal/pkg/intf/appif.go
+++ b/internal/pkg/intf/appif.go
@@ -15,7 +15,10 @@
package intf
-import "context"
+import (
+ "context"
+ "voltha-go-controller/internal/pkg/of"
+)
// App Interface
type App interface {
@@ -31,7 +34,7 @@
DelDevice(context.Context, string)
SetRebootFlag(bool)
ProcessFlowModResultIndication(context.Context, FlowStatus)
- IsFlowDelThresholdReached(context.Context, string, string) bool
+ CheckAndDeactivateService(context.Context, *of.VoltSubFlow, string, string)
DeviceRebootInd(context.Context, string, string, string)
DeviceDisableInd(context.Context, string)
UpdateMvlanProfilesForDevice(context.Context, string)
diff --git a/internal/pkg/of/flows.go b/internal/pkg/of/flows.go
index aa43cfc..c0666d9 100644
--- a/internal/pkg/of/flows.go
+++ b/internal/pkg/of/flows.go
@@ -428,6 +428,7 @@
TableID uint32
Priority uint32
State uint8
+ FlowCount uint32
}
// NewVoltSubFlow is constructor for VoltSubFlow
@@ -741,7 +742,7 @@
and := (vlan & 0xfff)
or := and + 0x1000
v := uint32(vlan&0x0fff + 0x1000)
- logger.Infow(ctx, "Vlan Construction", log.Fields{"Vlan": vlan, "vlan&0x0fff": and, "OR": or, "final": v})
+ logger.Debugw(ctx, "Vlan Construction", log.Fields{"Vlan": vlan, "vlan&0x0fff": and, "OR": or, "final": v})
setField.Value = &ofp.OfpOxmOfbField_VlanVid{
VlanVid: uint32(vlan&0x0fff + 0x1000),
}
diff --git a/internal/pkg/util/envutils/envutils.go b/internal/pkg/util/envutils/envutils.go
index 10a3219..8ef7149 100644
--- a/internal/pkg/util/envutils/envutils.go
+++ b/internal/pkg/util/envutils/envutils.go
@@ -87,14 +87,16 @@
MaxFlowRetryDuration = "MAX_FLOW_RETRY_DURATION"
// openonu environment variables
- OmciPacketCapture = "SAVE_OMCI_PACKET_CAPTURE"
+ OmciPacketCapture = "SAVE_OMCI_PACKET_CAPTURE"
+ Undefined = " undefined"
+ EnvironmentVariable = "Environment variable "
)
// ParseStringEnvVariable reads the environment variable and returns env as string
func ParseStringEnvVariable(envVarName string, defaultVal string) string {
envValue := os.Getenv(envVarName)
if envValue == "" {
- fmt.Println("Environment variable " + envVarName + " undefined")
+ fmt.Println(EnvironmentVariable + envVarName + Undefined)
return defaultVal
}
return envValue
@@ -104,7 +106,7 @@
func ParseIntEnvVariable(envVarName string, defaultVal int64) int64 {
envValue := os.Getenv(envVarName)
if envValue == "" {
- fmt.Println("Environment variable "+envVarName+" undefined", envVarName)
+ fmt.Println(EnvironmentVariable+envVarName+Undefined, envVarName)
return defaultVal
}
returnVal, err := strconv.Atoi(envValue)
@@ -119,7 +121,7 @@
func ParseBoolEnvVariable(envVarName string, defaultVal bool) bool {
envValue := os.Getenv(envVarName)
if envValue == "" {
- fmt.Println("Environment variable " + envVarName + " undefined")
+ fmt.Println(EnvironmentVariable + envVarName + Undefined)
return defaultVal
}
if envValue == "true" || envValue == "True" {
diff --git a/internal/pkg/util/utils.go b/internal/pkg/util/utils.go
index 6157413..36aab0b 100644
--- a/internal/pkg/util/utils.go
+++ b/internal/pkg/util/utils.go
@@ -18,6 +18,7 @@
import (
"encoding/binary"
"net"
+ "strconv"
"strings"
"voltha-go-controller/internal/pkg/of"
@@ -132,6 +133,34 @@
return ipList
}
+// GetUniFromMetadata returns uni port from write metadata of DS flows.
+func GetUniFromMetadata(metadata uint64) uint32 {
+ return uint32(metadata & 0xFFFFFFFF)
+}
+
+// GetUniFromDSDhcpFlow returns uni port from the flow cookie
+func GetUniFromDSDhcpFlow(cookie uint64) uint32 {
+ uniport := uint32(cookie >> 16)
+ uniport = uniport & 0xFFFFFFFF
+ return uniport
+}
+
+// GetUniPortFromFlow returns uni port from the flow data
+func GetUniPortFromFlow(nniPort string, flow *of.VoltSubFlow) uint32 {
+ var portNo uint32
+ if nniPort == strconv.Itoa(int(flow.Match.InPort)) {
+ if of.IPProtocolUDP == flow.Match.L4Protocol {
+ // For DHCP DS flow, uniport is not part of metadata. Hence retrieve it from cookie
+ portNo = GetUniFromDSDhcpFlow(flow.Cookie)
+ } else {
+ portNo = GetUniFromMetadata(flow.Action.Metadata)
+ }
+ } else {
+ portNo = flow.Match.InPort
+ }
+ return portNo
+}
+
// MacAddrsMatch for comparison of MAC addresses and return true if MAC addresses matches
func MacAddrsMatch(addr1 net.HardwareAddr, addr2 net.HardwareAddr) bool {
if len(addr1) != len(addr2) {
diff --git a/internal/pkg/vpagent/connection.go b/internal/pkg/vpagent/connection.go
index 4c274d4..971b8b4 100644
--- a/internal/pkg/vpagent/connection.go
+++ b/internal/pkg/vpagent/connection.go
@@ -23,7 +23,6 @@
"voltha-go-controller/log"
"github.com/golang/protobuf/ptypes/empty"
- "github.com/opencord/voltha-lib-go/v7/pkg/probe"
"github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc"
)
@@ -31,11 +30,7 @@
// GrpcMaxSize Max size of grpc message
const GrpcMaxSize int = 17455678
-func (vpa *VPAgent) establishConnectionToVoltha(ctx context.Context, p *probe.Probe) error {
- if p != nil {
- p.UpdateStatus(ctx, "voltha", probe.ServiceStatusPreparing)
- }
-
+func (vpa *VPAgent) establishConnectionToVoltha(ctx context.Context) error {
if vpa.volthaConnection != nil {
vpa.volthaConnection.Close()
}
@@ -55,9 +50,6 @@
})
vpa.volthaConnection = conn
vpa.volthaClient.Set(svc)
- if p != nil {
- p.UpdateStatus(ctx, "voltha", probe.ServiceStatusRunning)
- }
vpa.events <- vpaEventVolthaConnected
return nil
}
@@ -75,9 +67,6 @@
time.Sleep(vpa.ConnectionRetryDelay)
}
}
- if p != nil {
- p.UpdateStatus(ctx, "voltha", probe.ServiceStatusFailed)
- }
return errors.New("failed-to-connect-to-voltha")
}
diff --git a/internal/pkg/vpagent/volthaprotoagent.go b/internal/pkg/vpagent/volthaprotoagent.go
index 3d962ef..893f69d 100644
--- a/internal/pkg/vpagent/volthaprotoagent.go
+++ b/internal/pkg/vpagent/volthaprotoagent.go
@@ -26,7 +26,6 @@
"voltha-go-controller/log"
- "github.com/opencord/voltha-lib-go/v7/pkg/probe"
ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
"github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc"
@@ -137,12 +136,6 @@
log.Fields{
"voltha-endpoint": vpa.VolthaAPIEndPoint})
- // If the context contains a k8s probe then register services
- p := probe.GetProbeFromContext(ctx)
- if p != nil {
- p.RegisterService(ctx, "voltha")
- }
-
vpa.events <- vpaEventStart
/*
@@ -185,7 +178,7 @@
// connection to voltha
state = vpaStateConnecting
go func() {
- if err := vpa.establishConnectionToVoltha(hdlCtx, p); err != nil {
+ if err := vpa.establishConnectionToVoltha(hdlCtx); err != nil {
logger.Fatalw(ctx, "voltha-connection-failed", log.Fields{"error": err})
}
}()
@@ -204,9 +197,6 @@
}
case vpaEventVolthaDisconnected:
- if p != nil {
- p.UpdateStatus(ctx, "voltha", probe.ServiceStatusNotReady)
- }
logger.Debug(ctx, "vpagent-voltha-disconnect-event")
if state == vpaStateConnected {
state = vpaStateDisconnected
@@ -218,7 +208,7 @@
state = vpaStateConnecting
go func() {
hdlCtx, hdlDone = context.WithCancel(context.Background())
- if err := vpa.establishConnectionToVoltha(hdlCtx, p); err != nil {
+ if err := vpa.establishConnectionToVoltha(hdlCtx); err != nil {
logger.Fatalw(ctx, "voltha-connection-failed", log.Fields{"error": err})
}
}()
diff --git a/internal/test/mocks/mock_appif.go b/internal/test/mocks/mock_appif.go
index 8b986ec..4c8b7f7 100644
--- a/internal/test/mocks/mock_appif.go
+++ b/internal/test/mocks/mock_appif.go
@@ -23,6 +23,7 @@
context "context"
reflect "reflect"
intf "voltha-go-controller/internal/pkg/intf"
+ "voltha-go-controller/internal/pkg/of"
gomock "github.com/golang/mock/gomock"
)
@@ -213,12 +214,22 @@
return false
}
+ // CheckAndDeactivateService mocks base method.
+ func (m *MockApp) CheckAndDeactivateService(arg0 context.Context, arg1 *of.VoltSubFlow, arg2 string, arg3 string) {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "CheckAndDeactivateService", arg0, arg1, arg2, arg3)
+ }
+
// IsFlowDelThresholdReached indicates an expected call of IsFlowDelThresholdReached.
func (mr *MockAppMockRecorder) IsFlowDelThresholdReached(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsFlowDelThresholdReached", reflect.TypeOf((*MockApp)(nil).IsFlowDelThresholdReached), arg0, arg1, arg2)
}
-
+ // CheckAndDeactivateService indicates an expected call of CheckAndDeactivateService.
+ func (mr *MockAppMockRecorder) CheckAndDeactivateService(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckAndDeactivateService", reflect.TypeOf((*MockApp)(nil).CheckAndDeactivateService), arg0, arg1, arg2)
+ }
// SetRebootFlag mocks base method.
func (m *MockApp) SetRebootFlag(arg0 bool) {
m.ctrl.T.Helper()
diff --git a/voltha-go-controller/config.go b/voltha-go-controller/config.go
index d147aed..c95f42d 100644
--- a/voltha-go-controller/config.go
+++ b/voltha-go-controller/config.go
@@ -18,6 +18,7 @@
import (
"strconv"
"strings"
+ "time"
"voltha-go-controller/internal/pkg/util/envutils"
)
@@ -29,6 +30,8 @@
defaultVolthaPort = 50057
defaultProbeHost = ""
defaultProbePort = 8090
+ defaultLiveProbeInterval = 60
+ defaultNotLiveProbeInterval = 5 // Probe more frequently when not alive
defaultBanner = true
defaultDisplayVersion = false
defaultCPUProfile = ""
@@ -68,6 +71,8 @@
KafkaAdapterPort: defaultKafkaAdapterPort,
ProbeHost: defaultProbeHost,
ProbePort: defaultProbePort,
+ LiveProbeInterval: defaultLiveProbeInterval,
+ NotLiveProbeInterval: defaultNotLiveProbeInterval,
Banner: defaultBanner,
DisplayVersion: defaultDisplayVersion,
CPUProfile: defaultCPUProfile,
@@ -105,6 +110,8 @@
KVStorePort int
VolthaPort int
ProbePort int
+ LiveProbeInterval time.Duration
+ NotLiveProbeInterval time.Duration
DeviceListRefreshInterval int // in seconds
ConnectionRetryDelay int // in seconds
ConnectionMaxRetries int
@@ -127,6 +134,8 @@
cf.KafkaAdapterPort = int(envutils.ParseIntEnvVariable(envutils.KafkaAdapterPort, defaultKafkaAdapterPort))
cf.ProbeHost = envutils.ParseStringEnvVariable(envutils.ProbeHost, defaultProbeHost)
cf.ProbePort = int(envutils.ParseIntEnvVariable(envutils.ProbePort, defaultProbePort))
+ cf.LiveProbeInterval = time.Duration(envutils.ParseIntEnvVariable(envutils.LiveProbeInterval, defaultLiveProbeInterval)) * time.Second
+ cf.NotLiveProbeInterval = time.Duration(envutils.ParseIntEnvVariable(envutils.NotLiveProbeInterval, defaultNotLiveProbeInterval)) * time.Second
cf.Banner = envutils.ParseBoolEnvVariable(envutils.Banner, defaultBanner)
cf.DisplayVersion = envutils.ParseBoolEnvVariable(envutils.DisplayVersionOnly, defaultDisplayVersion)
cf.CPUProfile = envutils.ParseStringEnvVariable(envutils.CPUProfile, defaultCPUProfile)
diff --git a/voltha-go-controller/main.go b/voltha-go-controller/main.go
index 5f06351..9c904a2 100644
--- a/voltha-go-controller/main.go
+++ b/voltha-go-controller/main.go
@@ -39,6 +39,10 @@
"github.com/opencord/voltha-lib-go/v7/pkg/probe"
)
+const (
+ KVService = "kv-service"
+)
+
// VgcInfo structure
type VgcInfo struct {
kvClient kvstore.Client
@@ -110,6 +114,99 @@
return nil
}
+// This function checks the liveliness and readiness of the kv-store service and update the status in the probe.
+func MonitorKVStoreReadiness(ctx context.Context, config *VGCFlags) {
+ logger.Infow(ctx, "Start Monitoring KVStore Readiness...", log.Fields{"KVStoreType": config.KVStoreType, "Address": config.KVStoreEndPoint,
+ "LiveProbeInterval": config.LiveProbeInterval, "NotLiveProbeInterval": config.NotLiveProbeInterval})
+ // dividing the live probe interval by 2 to get updated status every 30s
+ timeout := config.LiveProbeInterval / 2
+ kvStoreChannel := make(chan bool, 1)
+
+ timeoutCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)
+ kvStoreChannel <- vgcInfo.kvClient.IsConnectionUp(timeoutCtx)
+ cancelFunc()
+
+ for {
+ timeoutTimer := time.NewTimer(timeout)
+ select {
+ case liveliness := <-kvStoreChannel:
+ if !liveliness {
+ // kv-store not reachable or down, updating the status to not ready state
+ probe.UpdateStatusFromContext(ctx, KVService, probe.ServiceStatusNotReady)
+ timeout = config.NotLiveProbeInterval
+ } else {
+ // kv-store is reachable , updating the status to running state
+ probe.UpdateStatusFromContext(ctx, KVService, probe.ServiceStatusRunning)
+ timeout = config.LiveProbeInterval / 2
+ }
+
+ // Check if the timer has expired or not
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
+ }
+
+ case <-timeoutTimer.C:
+ // Check the status of the kv-store. Use timeout of 2 seconds to avoid forever blocking
+ timeoutCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)
+
+ kvStoreChannel <- vgcInfo.kvClient.IsConnectionUp(timeoutCtx)
+ // Cleanup cancel func resources
+ cancelFunc()
+ }
+ }
+}
+
+func initializeKVStore(ctx context.Context, config *VGCFlags, logLevel log.LevelLog) {
+ var err error
+ var dblogLevel string
+ var p *probe.Probe
+
+ // k8s probe register services for KVStore
+ if value := ctx.Value(probe.ProbeContextKey); value != nil {
+ if _, ok := value.(*probe.Probe); ok {
+ p = value.(*probe.Probe)
+ p.RegisterService(
+ ctx,
+ KVService,
+ )
+ }
+ }
+
+ if vgcInfo.kvClient, err = newKVClient(ctx, config.KVStoreType, config.KVStoreEndPoint, config.KVStoreTimeout); err != nil {
+ logger.Errorw(ctx, "KVClient Establishment Failure", log.Fields{"error": err})
+ }
+
+ p.UpdateStatus(ctx, KVService, probe.ServiceStatusPreparing)
+ if dbHandler, err = db.Initialize(ctx, config.KVStoreType, config.KVStoreEndPoint, config.KVStoreTimeout); err != nil {
+ logger.Errorw(ctx, "unable-to-connect-to-db", log.Fields{"error": err})
+ }
+ p.UpdateStatus(ctx, KVService, probe.ServiceStatusPrepared)
+
+ db.SetDatabase(dbHandler)
+ logger.Infow(ctx, "verifying-KV-store-connectivity", log.Fields{"host": config.KVStoreHost,
+ "port": config.KVStorePort, "retries": config.ConnectionMaxRetries,
+ "retryInterval": config.ConnectionRetryDelay})
+
+ err = waitUntilKVStoreReachableOrMaxTries(ctx, config)
+ if err != nil {
+ p.UpdateStatus(ctx, KVService, probe.ServiceStatusNotReady)
+ logger.Fatalw(ctx, "Unable-to-connect-to-KV-store", log.Fields{"KVStoreType": config.KVStoreType, "Address": config.KVStoreEndPoint})
+ }
+
+ p.UpdateStatus(ctx, KVService, probe.ServiceStatusRunning)
+
+ logger.Info(ctx, "KV-store-reachable")
+ //Read if log-level is stored in DB
+ if dblogLevel, err = dbHandler.Get(ctx, db.GetKeyPath(db.LogLevelPath)); err == nil {
+ logger.Infow(ctx, "Read log-level from db", log.Fields{"logLevel": logLevel})
+ storedLogLevel, _ := log.StringToLogLevel(dblogLevel)
+ log.SetAllLogLevel(int(storedLogLevel))
+ log.SetDefaultLogLevel(int(storedLogLevel))
+ }
+
+ go MonitorKVStoreReadiness(ctx, config)
+}
+
func main() {
// Environment variables processing
config := newVGCFlags()
@@ -118,8 +215,13 @@
if config.Banner {
printBanner()
}
- // Create a context adding the status update channel
+ /*
+ * Create and start the liveness and readiness container management probes. This
+ * is done in the main function so just in case the main starts multiple other
+ * objects there can be a single probe end point for the process.
+ */
p := &probe.Probe{}
+ go p.ListenAndServe(ctx, config.ProbeEndPoint)
ctx = context.WithValue(context.Background(), probe.ProbeContextKey, p)
pc.Init()
@@ -129,7 +231,6 @@
// Setup default logger - applies for packages that do not have specific logger set
var logLevel log.LevelLog
var err error
- var dblogLevel string
if logLevel, err = log.StringToLogLevel(config.LogLevel); err != nil {
logLevel = log.DebugLevel
}
@@ -143,40 +244,8 @@
}
log.SetAllLogLevel(int(logLevel))
- if vgcInfo.kvClient, err = newKVClient(ctx, config.KVStoreType, config.KVStoreEndPoint, config.KVStoreTimeout); err != nil {
- logger.Errorw(ctx, "KVClient Establishment Failure", log.Fields{"Reason": err})
- }
-
- if dbHandler, err = db.Initialize(ctx, config.KVStoreType, config.KVStoreEndPoint, config.KVStoreTimeout); err != nil {
- logger.Errorw(ctx, "unable-to-connect-to-db", log.Fields{"error": err})
- return
- }
-
- db.SetDatabase(dbHandler)
- logger.Infow(ctx, "verifying-KV-store-connectivity", log.Fields{"host": config.KVStoreHost,
- "port": config.KVStorePort, "retries": config.ConnectionMaxRetries,
- "retryInterval": config.ConnectionRetryDelay})
-
- err = waitUntilKVStoreReachableOrMaxTries(ctx, config)
- if err != nil {
- logger.Fatalw(ctx, "Unable-to-connect-to-KV-store", log.Fields{"KVStoreType": config.KVStoreType, "Address": config.KVStoreEndPoint})
- }
-
- logger.Info(ctx, "KV-store-reachable")
- //Read if log-level is stored in DB
- if dblogLevel, err = dbHandler.Get(ctx, db.GetKeyPath(db.LogLevelPath)); err == nil {
- logger.Infow(ctx, "Read log-level from db", log.Fields{"logLevel": logLevel})
- storedLogLevel, _ := log.StringToLogLevel(dblogLevel)
- log.SetAllLogLevel(int(storedLogLevel))
- log.SetDefaultLogLevel(int(storedLogLevel))
- }
-
- // Check if Data Migration is required
- // Migration has to be done before Initialzing the Kafka
- if app.CheckIfMigrationRequired(ctx) {
- logger.Debug(ctx, "Migration Initiated")
- app.InitiateDataMigration(ctx)
- }
+ // Done: TODO: Wrap it up properly and monitor the KV store to check for faults
+ initializeKVStore(ctx, config, logLevel)
defer func() {
err = log.CleanUp()
@@ -185,15 +254,12 @@
}
}()
- // TODO: Wrap it up properly and monitor the KV store to check for faults
-
- /*
- * Create and start the liveness and readiness container management probes. This
- * is done in the main function so just in case the main starts multiple other
- * objects there can be a single probe end point for the process.
- */
- go p.ListenAndServe(ctx, config.ProbeEndPoint)
-
+ // Check if Data Migration is required
+ // Migration has to be done before Initialzing the Kafka
+ if app.CheckIfMigrationRequired(ctx) {
+ logger.Debug(ctx, "Migration Initiated")
+ app.InitiateDataMigration(ctx)
+ }
app.GetApplication().ReadAllFromDb(ctx)
app.GetApplication().InitStaticConfig()
app.GetApplication().SetVendorID(config.VendorID)
@@ -218,6 +284,7 @@
logger.Error(ctx, "Trigger Rest Server...")
go nbi.RestStart()
go vpa.Run(ctx)
+ // check the readiness and liveliness and update the probe status
//FIXME: Need to enhance CLI to use in docker environment
//go ProcessCli()
//go handler.MsgHandler()
diff --git a/voltha-go-controller/nbi/bwprofile.go b/voltha-go-controller/nbi/bwprofile.go
index a630616..3efab04 100644
--- a/voltha-go-controller/nbi/bwprofile.go
+++ b/voltha-go-controller/nbi/bwprofile.go
@@ -38,7 +38,7 @@
cDelete = "DELETE"
)
-//BWProfile - Sadis BW Profile
+// BWProfile - Sadis BW Profile
type BWProfile struct {
ID string `json:"id"`
PeakInformationRate uint32 `json:"pir"`
diff --git a/voltha-go-controller/nbi/get_device_id_list.go b/voltha-go-controller/nbi/get_device_id_list.go
index ab41ebb..238da8e 100644
--- a/voltha-go-controller/nbi/get_device_id_list.go
+++ b/voltha-go-controller/nbi/get_device_id_list.go
@@ -44,7 +44,7 @@
var deviceID string
var deviceIDListResp []string
- logger.Info(ctx, "Received get DeviceIDList")
+ logger.Info(ctx, "Received get all device ids req")
getDeviceIDList := func(key, value interface{}) bool {
voltDevice := value.(*app.VoltDevice)
deviceID = voltDevice.Name
diff --git a/voltha-go-controller/nbi/igmpproxy.go b/voltha-go-controller/nbi/igmpproxy.go
index 1961d8b..2cecbd8 100644
--- a/voltha-go-controller/nbi/igmpproxy.go
+++ b/voltha-go-controller/nbi/igmpproxy.go
@@ -34,6 +34,10 @@
"voltha-go-controller/log"
)
+const (
+ mvlan = "mvlan"
+)
+
// IgmpProxy - configurations
type IgmpProxy struct {
FastLeave string `json:"fastleave"`
@@ -103,14 +107,14 @@
voltApp := app.GetApplication()
voltAppIntr = voltApp
if mvp := voltAppIntr.GetMvlanProfileByTag(of.VlanType(req.OutgoingIgmpVlanID)); mvp == nil {
- logger.Errorw(ctx, "MVLAN ID not configured", log.Fields{"mvlan": req.OutgoingIgmpVlanID})
+ logger.Errorw(ctx, "MVLAN ID not configured", log.Fields{mvlan: req.OutgoingIgmpVlanID})
http.Error(w, "MVLAN profile does not exists", http.StatusConflict)
return
}
config.OltSerialNum = req.SourceDeviceAndPort
var splits = strings.Split(req.SourceDeviceAndPort, "/")
config.OltSerialNum = splits[0]
- config.MvlanProfileID = "mvlan" + strconv.Itoa(req.OutgoingIgmpVlanID)
+ config.MvlanProfileID = mvlan + strconv.Itoa(req.OutgoingIgmpVlanID)
logger.Infow(ctx, "northbound-add-igmpProxy-request", log.Fields{"config": config})
diff --git a/voltha-go-controller/nbi/mvlan.go b/voltha-go-controller/nbi/mvlan.go
index 5c77793..18af5ce 100644
--- a/voltha-go-controller/nbi/mvlan.go
+++ b/voltha-go-controller/nbi/mvlan.go
@@ -120,7 +120,7 @@
vars := mux.Vars(r)
egressvlan := vars["egressvlan"]
- name := "mvlan" + egressvlan
+ name := mvlan + egressvlan
// HTTP response with 202 accepted for service delete request
w.WriteHeader(http.StatusAccepted)
diff --git a/voltha-go-controller/nbi/rest.go b/voltha-go-controller/nbi/rest.go
index 9c1fe82..9070163 100644
--- a/voltha-go-controller/nbi/rest.go
+++ b/voltha-go-controller/nbi/rest.go
@@ -22,6 +22,7 @@
onosnbi "voltha-go-controller/voltha-go-controller/onos_nbi"
"github.com/gorilla/mux"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
"voltha-go-controller/log"
)
@@ -30,6 +31,9 @@
var ctx = context.TODO()
const (
+ VGCService = "vgc-nbi-rest"
+)
+const (
BasePath string = "/vgc/v1"
SubscribersPath string = "/subscribers/{id}"
ProfilesPath string = "/profiles/{id}"
@@ -67,6 +71,11 @@
// RestStart to execute for API
func RestStart() {
+ // If the context contains a k8s probe then register services
+ p := probe.GetProbeFromContext(ctx)
+ if p != nil {
+ p.RegisterService(ctx, VGCService)
+ }
mu := mux.NewRouter()
logger.Info(ctx, "Rest Server Starting...")
mu.HandleFunc(BasePath+SubscribersPath, (&SubscriberHandle{}).ServeHTTP)
@@ -103,6 +112,9 @@
mu.HandleFunc(BasePath+FlowProvisionStatus, (&SubscriberHandle{}).StatusServeHTTP)
err := http.ListenAndServe(":8181", mu)
+ if p != nil {
+ p.UpdateStatus(ctx, VGCService, probe.ServiceStatusRunning)
+ }
logger.Infow(ctx, "Rest Server Started", log.Fields{"Error": err})
}
diff --git a/voltha-go-controller/nbi/subscriber.go b/voltha-go-controller/nbi/subscriber.go
index df20972..4cca904 100644
--- a/voltha-go-controller/nbi/subscriber.go
+++ b/voltha-go-controller/nbi/subscriber.go
@@ -167,6 +167,12 @@
vs.IgmpEnabled = uniTagInfo.IsIgmpRequired
vs.ServiceType = uniTagInfo.ServiceName
+ // Check if the service already exists for same Uniport and TechProfID
+ if voltApp.CheckServiceExists(vs.Port, vs.TechProfileID) {
+ logger.Warnw(ctx, "Service already exists for same port and TP Id", log.Fields{"ServiceName": vs.Name, "Port": vs.Port, "TechProfileID": vs.TechProfileID, "SVlan": vs.SVlan})
+ continue
+ }
+
logger.Debugw(ctx, "", log.Fields{"ServiceName": vs.Name})
if uniTagInfo.ServiceName == app.DpuMgmtTraffic ||
diff --git a/voltha-go-controller/onos_nbi/oltapprestadapter.go b/voltha-go-controller/onos_nbi/oltapprestadapter.go
index ab2c361..f7d8f3b 100644
--- a/voltha-go-controller/onos_nbi/oltapprestadapter.go
+++ b/voltha-go-controller/onos_nbi/oltapprestadapter.go
@@ -200,7 +200,7 @@
sVlan := of.VlanNone
cVlan := of.VlanNone
techProfile := uint16(0)
- logger.Infow(ctx, "Received ActivateService request specific for portNo, sVlan, cVlan and techProfile", log.Fields{"Port": portNo, "SVlan": sVlan, "CVlan": cVlan, "techProfile": techProfile})
+ logger.Infow(ctx, "Received ActivateService request specific for portNo, sTag, cTag and tpID", log.Fields{"Port": portNo, "STag": sTag, "CTag": cTag, "TPID": tpID})
if len(sTag) > 0 {
sv, err := strconv.Atoi(sTag)
@@ -251,7 +251,7 @@
sVlan := of.VlanNone
cVlan := of.VlanNone
techProfile := uint16(0)
- logger.Infow(ctx, "Received DeactivateService request specific for portNo, sVlan, cVlan and techProfile", log.Fields{"Port": portNo, "SVlan": sVlan, "CVlan": cVlan, "techProfile": techProfile})
+ logger.Infow(ctx, "Received DeactivateService request specific for portNo, sVlan, cVlan and techProfile", log.Fields{"Port": portNo, "SVlan": sTag, "CVlan": cTag, "techProfile": tpID})
if len(sTag) > 0 {
sv, err := strconv.Atoi(sTag)
@@ -290,7 +290,7 @@
return
}
}
- logger.Debugw(ctx, "DeactivateService request specific for portNo, sVlan, cVlan and techProfile", log.Fields{"Port": portNo, "SVlan": sVlan, "CVlan": cVlan, "techProfile": techProfile})
+ logger.Debugw(ctx, "DeactivateService request specific for portNo, sVlan, cVlan and techProfile", log.Fields{"Port": portNo, "SVlan": sTag, "CVlan": cTag, "techProfile": tpID})
}
func (sa *ServiceAdapter) GetProgrammedSubscribers(cntx context.Context, w http.ResponseWriter, r *http.Request) {