Fix for service activation and audit issues
Change-Id: I1517f9be1532f384f5236e8d6328c8fda93c1776
diff --git a/internal/pkg/application/service.go b/internal/pkg/application/service.go
index 79c59c0..d710298 100644
--- a/internal/pkg/application/service.go
+++ b/internal/pkg/application/service.go
@@ -2046,25 +2046,28 @@
}
// ActivateService to activate pre-provisioned service
-func (va *VoltApplication) ActivateService(cntx context.Context, deviceID, portNo string, sVlan, cVlan of.VlanType, tpID uint16) {
+func (va *VoltApplication) ActivateService(cntx context.Context, deviceID, portNo string, sVlan, cVlan of.VlanType, tpID uint16) error {
logger.Infow(ctx, "Service Activate Request ", log.Fields{"Device": deviceID, "Port": portNo})
+ device, err := va.GetDeviceFromPort(portNo)
+ if err != nil {
+ logger.Errorw(ctx, "Error Getting Device", log.Fields{"Reason": err.Error(), "Port": portNo})
+ return errorCodes.ErrPortNotFound
+ }
+ // If device id is not provided check only port number
+ if deviceID == DeviceAny {
+ deviceID = device.Name
+ } else if deviceID != device.Name {
+ logger.Errorw(ctx, "Wrong Device ID", log.Fields{"Device": deviceID, "Port": portNo})
+ return errorCodes.ErrDeviceNotFound
+ }
va.ServiceByName.Range(func(key, value interface{}) bool {
vs := value.(*VoltService)
- // If device id is not provided check only port number
- if deviceID == DeviceAny {
- deviceID = vs.Device
- }
// If svlan if provided, then the tags and tpID of service has to be matching
if sVlan != of.VlanNone && (sVlan != vs.SVlan || cVlan != vs.CVlan || tpID != vs.TechProfileID) {
return true
}
if portNo == vs.Port && !vs.IsActivated {
- d := va.GetDevice(deviceID)
- if d == nil {
- logger.Warnw(ctx, "Device Not Found", log.Fields{"Device": deviceID})
- return true
- }
- p := d.GetPort(vs.Port)
+ p := device.GetPort(vs.Port)
if p == nil {
logger.Warnw(ctx, "Wrong device or port", log.Fields{"Device": deviceID, "Port": portNo})
return true
@@ -2077,7 +2080,7 @@
if p.State == PortStateUp {
if vpv := va.GetVnetByPort(vs.Port, vs.SVlan, vs.CVlan, vs.UniVlan); vpv != nil {
// PortUp call initiates flow addition
- vpv.PortUpInd(cntx, d, portNo)
+ vpv.PortUpInd(cntx, device, portNo)
} else {
logger.Warnw(ctx, "VPV does not exists!!!", log.Fields{"Device": deviceID, "port": portNo, "SvcName": vs.Name})
}
@@ -2085,11 +2088,24 @@
}
return true
})
+ return nil
}
// DeactivateService to activate pre-provisioned service
-func (va *VoltApplication) DeactivateService(cntx context.Context, deviceID, portNo string, sVlan, cVlan of.VlanType, tpID uint16) {
+func (va *VoltApplication) DeactivateService(cntx context.Context, deviceID, portNo string, sVlan, cVlan of.VlanType, tpID uint16) error {
logger.Infow(ctx, "Service Deactivate Request ", log.Fields{"Device": deviceID, "Port": portNo})
+ device, err := va.GetDeviceFromPort(portNo)
+ if err != nil {
+ logger.Errorw(ctx, "Error Getting Device", log.Fields{"Reason": err.Error(), "Port": portNo})
+ return errorCodes.ErrPortNotFound
+ }
+ // If device id is not provided check only port number
+ if deviceID == DeviceAny {
+ deviceID = device.Name
+ } else if deviceID != device.Name {
+ logger.Errorw(ctx, "Wrong Device ID in request", log.Fields{"Device": deviceID, "Port": portNo})
+ return errorCodes.ErrDeviceNotFound
+ }
va.ServiceByName.Range(func(key, value interface{}) bool {
vs := value.(*VoltService)
// If svlan if provided, then the tags and tpID of service has to be matching
@@ -2098,20 +2114,11 @@
logger.Infow(ctx, "condition not matched", log.Fields{"Device": deviceID, "Port": portNo, "sVlan": sVlan, "cVlan":cVlan, "tpID": tpID})
return true
}
- // If device id is not provided check only port number
- if deviceID == DeviceAny {
- deviceID = vs.Device
- }
- if deviceID == vs.Device && portNo == vs.Port && vs.IsActivated {
+ if portNo == vs.Port && vs.IsActivated {
vs.IsActivated = false
va.ServiceByName.Store(vs.Name, vs)
vs.WriteToDb(cntx)
- d := va.GetDevice(deviceID)
- if d == nil {
- logger.Warnw(ctx, "Device Not Found", log.Fields{"Device": deviceID})
- return true
- }
- p := d.GetPort(vs.Port)
+ p := device.GetPort(vs.Port)
if p != nil && p.State == PortStateUp {
if vpv := va.GetVnetByPort(vs.Port, vs.SVlan, vs.CVlan, vs.UniVlan); vpv != nil {
// Port down call internally deletes all the flows
@@ -2126,6 +2133,7 @@
}
return true
})
+ return nil
}
/* GetServicePbit to get first set bit in the pbit map
returns -1 : If configured to match on all pbits
diff --git a/internal/pkg/controller/audittables.go b/internal/pkg/controller/audittables.go
index 079ff2f..334ce41 100644
--- a/internal/pkg/controller/audittables.go
+++ b/internal/pkg/controller/audittables.go
@@ -84,6 +84,12 @@
var errInfo error
var err error
+ // Audit ports
+ if err = att.AuditPorts(); err != nil {
+ logger.Errorw(ctx, "Audit Ports Failed", log.Fields{"Reason": err.Error()})
+ errInfo = err
+ }
+
// Audit the meters
if err = att.AuditMeters(); err != nil {
logger.Errorw(ctx, "Audit Meters Failed", log.Fields{"Reason": err.Error()})
@@ -530,3 +536,121 @@
}
}
}
+
+func (att *AuditTablesTask) AuditPorts() error {
+
+ if att.stop {
+ return tasks.ErrTaskCancelError
+ }
+
+ var vc voltha.VolthaServiceClient
+ if vc = att.device.VolthaClient(); vc == nil {
+ logger.Error(ctx, "Flow Audit Failed: Voltha Client Unavailable")
+ return nil
+ }
+ ofpps, err := vc.ListLogicalDevicePorts(att.ctx, &common.ID{Id: att.device.ID})
+ if err != nil {
+ return err
+ }
+
+ // Compute the difference between the ports received and ports at VGC
+ // First build a map of all the received ports under missing ports. We
+ // will eliminate the ports that are in the device from the missing ports
+ // so that the elements remaining are missing ports. The ones that are
+ // not in missing ports are added to excess ports which should be deleted
+ // from the VGC.
+ missingPorts := make(map[uint32]*ofp.OfpPort)
+ for _, ofpp := range ofpps.Items {
+ missingPorts[ofpp.OfpPort.PortNo] = ofpp.OfpPort
+ }
+
+ var excessPorts []uint32
+ processPortState := func(id uint32, vgcPort *DevicePort) {
+ logger.Debugw(ctx, "Process Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
+
+ if ofpPort, ok := missingPorts[id]; ok {
+ if ((vgcPort.State == PortStateDown) && (ofpPort.State == uint32(ofp.OfpPortState_OFPPS_LIVE))) || ((vgcPort.State == PortStateUp) && (ofpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE))) {
+ // This port exists in the received list and the map at
+ // VGC. This is common so delete it
+ logger.Infow(ctx, "Port State Mismatch", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
+ att.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State)
+ }
+ delete(missingPorts, id)
+ } else {
+ // This port is missing from the received list. This is an
+ // excess port at VGC. This must be added to excess ports
+ excessPorts = append(excessPorts, id)
+ }
+ logger.Debugw(ctx, "Processed Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
+
+ }
+ // 1st process the NNI port before all other ports so that the device state can be updated.
+ if vgcPort, ok := att.device.PortsByID[NNIPortID]; ok {
+ logger.Info(ctx, "Processing NNI port state")
+ processPortState(NNIPortID, vgcPort)
+ }
+
+ for id, vgcPort := range att.device.PortsByID {
+ if id == NNIPortID {
+ //NNI port already processed
+ continue
+ }
+ if att.stop {
+ break
+ }
+ processPortState(id, vgcPort)
+ }
+
+ if att.stop {
+ logger.Errorw(ctx, "Audit Device Task Cancelled", log.Fields{"Context": att.ctx, "Task": att.taskID})
+ return tasks.ErrTaskCancelError
+ }
+ att.AddMissingPorts(ctx, missingPorts)
+ att.DelExcessPorts(ctx, excessPorts)
+ return nil
+}
+
+// AddMissingPorts to add the missing ports
+func (att *AuditTablesTask) AddMissingPorts(cntx context.Context, mps map[uint32]*ofp.OfpPort) {
+ logger.Debugw(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
+
+ addMissingPort := func(mp *ofp.OfpPort) {
+ logger.Debugw(ctx, "Process Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
+
+ // Error is ignored as it only drops duplicate ports
+ logger.Infow(ctx, "Calling AddPort", log.Fields{"No": mp.PortNo, "Name": mp.Name})
+ if err := att.device.AddPort(cntx, mp); err != nil {
+ logger.Warnw(ctx, "AddPort Failed", log.Fields{"No": mp.PortNo, "Name": mp.Name, "Reason": err})
+ }
+ if mp.State == uint32(ofp.OfpPortState_OFPPS_LIVE) {
+ att.device.ProcessPortState(cntx, mp.PortNo, mp.State)
+ }
+ logger.Debugw(ctx, "Processed Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
+
+ }
+
+ // 1st process the NNI port before all other ports so that the flow provisioning for UNIs can be enabled
+ if mp, ok := mps[NNIPortID]; ok {
+ logger.Info(ctx, "Adding Missing NNI port")
+ addMissingPort(mp)
+ }
+
+ for portNo, mp := range mps {
+ if portNo != NNIPortID {
+ addMissingPort(mp)
+ }
+ }
+}
+
+// DelExcessPorts to delete the excess ports
+func (att *AuditTablesTask) DelExcessPorts(cntx context.Context, eps []uint32) {
+ logger.Debugw(ctx, "Device Audit - Delete Excess Ports", log.Fields{"NumPorts": len(eps)})
+ for _, id := range eps {
+ // Now delete the port from the device @ VGC
+ logger.Infow(ctx, "Device Audit - Deleting Port", log.Fields{"PortId": id})
+ if err := att.device.DelPort(cntx, id); err != nil {
+ logger.Warnw(ctx, "DelPort Failed", log.Fields{"PortId": id, "Reason": err})
+ }
+ }
+}
+
diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go
index 7f5d9f1..3ed1c04 100644
--- a/internal/pkg/controller/controller.go
+++ b/internal/pkg/controller/controller.go
@@ -48,14 +48,6 @@
var db database.DBIntf
-var deviceTableSyncDuration = 15 * time.Minute
-
-//SetDeviceTableSyncDuration - sets interval between device table sync up activity
-// duration - in minutes
-func SetDeviceTableSyncDuration(duration int) {
- deviceTableSyncDuration = time.Duration(duration) * time.Minute
-}
-
// VoltController structure
type VoltController struct {
rebootLock sync.Mutex
@@ -68,6 +60,7 @@
RebootFlow bool
BlockedDeviceList *util.ConcurrentMap
deviceTaskQueue *util.ConcurrentMap
+ deviceTableSyncDuration time.Duration
}
var vcontroller *VoltController
@@ -88,6 +81,17 @@
return &controller
}
+//SetDeviceTableSyncDuration - sets interval between device table sync up activity
+// duration - in minutes
+func (v *VoltController) SetDeviceTableSyncDuration(duration int) {
+ v.deviceTableSyncDuration = time.Duration(duration) * time.Second
+}
+
+//GetDeviceTableSyncDuration - returns configured device table sync duration
+func (v *VoltController) GetDeviceTableSyncDuration() time.Duration {
+ return v.deviceTableSyncDuration
+}
+
// AddDevice to add device
func (v *VoltController) AddDevice(cntx context.Context, config *intf.VPClientCfg) intf.IVPClient {
diff --git a/internal/pkg/controller/device.go b/internal/pkg/controller/device.go
index 9fbe172..ea4ccb3 100644
--- a/internal/pkg/controller/device.go
+++ b/internal/pkg/controller/device.go
@@ -668,7 +668,7 @@
func (d *Device) synchronizeDeviceTables() {
- tick := time.NewTicker(deviceTableSyncDuration)
+ tick := time.NewTicker(GetController().GetDeviceTableSyncDuration())
loop:
for {
select {
diff --git a/internal/pkg/util/envutils/envutils.go b/internal/pkg/util/envutils/envutils.go
index 2c7d2ca..392e9b8 100644
--- a/internal/pkg/util/envutils/envutils.go
+++ b/internal/pkg/util/envutils/envutils.go
@@ -1,4 +1,6 @@
/*
+
+
* Copyright 2022-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -78,7 +80,7 @@
CPUProfile = "CPU_PROFILE"
MemProfile = "MEM_PROFILE"
VendorID = "VENDOR_ID"
- DeviceSyncDuration = "DEVICE_SYNC_DURATION"
+ DeviceSyncDuration = "DEVICE_SYNC_DURATION"
// openonu environment variables
OmciPacketCapture = "SAVE_OMCI_PACKET_CAPTURE"