[VOL-4756] Clean up TODO context
Change-Id: I21d5ec8cc015154bc893e54c652d31562d8da5d9
diff --git a/internal/pkg/application/vnets.go b/internal/pkg/application/vnets.go
index 78fe54f..e11960c 100644
--- a/internal/pkg/application/vnets.go
+++ b/internal/pkg/application/vnets.go
@@ -18,6 +18,7 @@
import (
"encoding/json"
"errors"
+ "context"
"net"
infraerrorCodes "voltha-go-controller/internal/pkg/errorcodes"
"strconv"
@@ -162,7 +163,7 @@
}
//disassociatePortFromVnet - disassociate a port from Vnet and return true if the association map is empty
-func (vv *VoltVnet) disassociatePortFromVnet(device string, port string) {
+func (vv *VoltVnet) disassociatePortFromVnet(cntx context.Context, device string, port string) {
vv.VnetPortLock.Lock()
delete(vv.AssociatedPorts, port)
logger.Infow(ctx, "Disassociated Port from Vnet", log.Fields{"Device": device, "Port": port, "Vnet": vv.Name, "PendingDeleteFlow": vv.PendingDeleteFlow, "AssociatedPorts": vv.AssociatedPorts, "DeleteFlag": vv.DeleteInProgress})
@@ -173,7 +174,7 @@
if len(vv.PendingDeleteFlow[device]) == 0 {
logger.Warnw(ctx, "Deleting Vnet", log.Fields{"Name": vv.Name})
GetApplication().deleteVnetConfig(vv)
- _ = db.DelVnet(vv.Name)
+ _ = db.DelVnet(cntx, vv.Name)
} else {
logger.Warnw(ctx, "Skipping Del Vnet", log.Fields{"Name": vv.Name, "PendingDeleteFlow": vv.PendingDeleteFlow})
}
@@ -192,23 +193,23 @@
}
// WriteToDb commit the VNET to the database
-func (vv *VoltVnet) WriteToDb() {
+func (vv *VoltVnet) WriteToDb(cntx context.Context) {
if vv.DeleteInProgress {
logger.Warnw(ctx, "Skipping Redis Update for Vnet, Vnet delete in progress", log.Fields{"Vnet": vv.Name})
return
}
- vv.ForceWriteToDb()
+ vv.ForceWriteToDb(cntx)
}
//ForceWriteToDb force commit a vnet to the DB
-func (vv *VoltVnet) ForceWriteToDb() {
+func (vv *VoltVnet) ForceWriteToDb(cntx context.Context) {
vv.VnetPortLock.RLock()
defer vv.VnetPortLock.RUnlock()
vv.Version = database.PresentVersionMap[database.VnetPath]
logger.Debugw(ctx, "Updating VNET....", log.Fields{"vnet": vv})
if b, err := json.Marshal(vv); err == nil {
- if err:= db.PutVnet(vv.Name, string(b)); err != nil {
+ if err:= db.PutVnet(cntx, vv.Name, string(b)); err != nil {
logger.Warnw(ctx, "Add Vnet to DB failed", log.Fields{"vnet name": vv.Name, "Error": err})
}
}
@@ -275,7 +276,7 @@
}
// AddVnet to add a VNET to the list of VNETs configured.
-func (va *VoltApplication) AddVnet(cfg VnetConfig, oper *VnetOper) error {
+func (va *VoltApplication) AddVnet(cntx context.Context, cfg VnetConfig, oper *VnetOper) error {
AppMutex.VnetMutex.Lock()
var vv *VoltVnet
@@ -312,7 +313,7 @@
}
va.storeVnetConfig(cfg, vv)
- vv.WriteToDb()
+ vv.WriteToDb(cntx)
logger.Infow(ctx, "Added VNET TO DB", log.Fields{"cfg": cfg, "devicesToHandle": devicesToHandle})
@@ -322,7 +323,7 @@
}
// DelVnet to delete a VNET from the list of VNETs configured
-func (va *VoltApplication) DelVnet(name, deviceSerialNum string) error {
+func (va *VoltApplication) DelVnet(cntx context.Context, name, deviceSerialNum string) error {
logger.Infow(ctx, "Deleting Vnet", log.Fields{"Vnet": name})
AppMutex.VnetMutex.Lock()
if vnetIntf, ok := va.VnetsByName.Load(name); ok {
@@ -330,23 +331,23 @@
//Delete from mvp list
vnet.DevicesList = util.RemoveFromSlice(vnet.DevicesList, deviceSerialNum)
- va.DeleteDevFlowForVlanFromDevice(vnet, deviceSerialNum)
+ va.DeleteDevFlowForVlanFromDevice(cntx, vnet, deviceSerialNum)
if len(vnet.DevicesList) == 0 {
vnet.DeleteInProgress = true
vnet.PendingDeviceToDelete = deviceSerialNum
- vnet.ForceWriteToDb()
+ vnet.ForceWriteToDb(cntx)
vnet.VnetPortLock.RLock()
if len(vnet.PendingDeleteFlow) == 0 && !vnet.isAssociatedPortsPresent() {
logger.Warnw(ctx, "Deleting Vnet", log.Fields{"Name": vnet.Name, "AssociatedPorts": vnet.AssociatedPorts, "PendingDelFlows": vnet.PendingDeleteFlow})
va.deleteVnetConfig(vnet)
- _ = db.DelVnet(vnet.Name)
+ _ = db.DelVnet(cntx, vnet.Name)
} else {
logger.Warnw(ctx, "Skipping Del Vnet", log.Fields{"Name": vnet.Name, "AssociatedPorts": vnet.AssociatedPorts, "PendingDelFlows": vnet.PendingDeleteFlow})
}
vnet.VnetPortLock.RUnlock()
} else {
//Update the devicelist in db
- vnet.WriteToDb()
+ vnet.WriteToDb(cntx)
}
}
//TODO: if no vnets are present on device remove icmpv6 group from device
@@ -355,9 +356,9 @@
}
// UpdateVnet to update the VNET with associated service count
-func (va *VoltApplication) UpdateVnet(vv *VoltVnet) error {
+func (va *VoltApplication) UpdateVnet(cntx context.Context, vv *VoltVnet) error {
va.storeVnetConfig(vv.VnetConfig, vv)
- vv.WriteToDb()
+ vv.WriteToDb(cntx)
logger.Infow(ctx, "Updated VNET TO DB", log.Fields{"vv": vv.VnetConfig})
return nil
}
@@ -577,13 +578,13 @@
}
// DhcpResultInd for dhcp result indication
-func (vpv *VoltPortVnet) DhcpResultInd(res *layers.DHCPv4) {
- vpv.ProcessDhcpResult(res)
+func (vpv *VoltPortVnet) DhcpResultInd(cntx context.Context, res *layers.DHCPv4) {
+ vpv.ProcessDhcpResult(cntx, res)
}
// Dhcpv6ResultInd for dhcpv6 result indication
-func (vpv *VoltPortVnet) Dhcpv6ResultInd(ipv6Addr net.IP, leaseTime uint32) {
- vpv.ProcessDhcpv6Result(ipv6Addr, leaseTime)
+func (vpv *VoltPortVnet) Dhcpv6ResultInd(cntx context.Context, ipv6Addr net.IP, leaseTime uint32) {
+ vpv.ProcessDhcpv6Result(cntx, ipv6Addr, leaseTime)
}
// GetNniVlans to get nni vlans
@@ -611,20 +612,20 @@
}
// AddService to add service
-func (vpv *VoltPortVnet) AddService(service *VoltService) {
+func (vpv *VoltPortVnet) AddService(cntx context.Context, service *VoltService) {
vpv.services.Store(service.Name, service)
vpv.servicesCount.Inc()
logger.Infow(ctx, "Service added/updated to VPV", log.Fields{"Port": vpv.Port, "SVLAN": vpv.SVlan, "CVLAN": vpv.CVlan, "UNIVlan": vpv.UniVlan, "Service": service.Name, "Count": vpv.servicesCount.Load()})
}
// DelService to delete service
-func (vpv *VoltPortVnet) DelService(service *VoltService) {
+func (vpv *VoltPortVnet) DelService(cntx context.Context, service *VoltService) {
vpv.services.Delete(service.Name)
vpv.servicesCount.Dec()
// If the only Igmp Enabled service is removed, remove the Igmp trap flow along with it
if service.IgmpEnabled {
- if err := vpv.DelIgmpFlows(); err != nil {
+ if err := vpv.DelIgmpFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
@@ -635,26 +636,33 @@
}
// ProcessDhcpResult to process dhcp results
-func (vpv *VoltPortVnet) ProcessDhcpResult(res *layers.DHCPv4) {
+func (vpv *VoltPortVnet) ProcessDhcpResult(cntx context.Context, res *layers.DHCPv4) {
msgType := DhcpMsgType(res)
if msgType == layers.DHCPMsgTypeAck {
- vpv.ProcessDhcpSuccess(res)
+ vpv.ProcessDhcpSuccess(cntx, res)
} else if msgType == layers.DHCPMsgTypeNak {
vpv.DhcpStatus = DhcpStatusNacked
}
- vpv.WriteToDb()
+ vpv.WriteToDb(cntx)
+}
+
+// RangeOnServices invokes callback for each service stored on the vpv, forwarding cntx to every call
+func (vpv *VoltPortVnet) RangeOnServices(cntx context.Context, callback func(cntx context.Context, key, value interface{}) bool) {
+ vpv.services.Range(func(key, value interface{}) bool {
+ return callback(cntx, key, value)
+ })
}
// ProcessDhcpSuccess : Learn the IPv4 address allocated to the services and update the
// the services with the same. This also calls for adding flows
// for the services as the DHCP procedure is completed
-func (vpv *VoltPortVnet) ProcessDhcpSuccess(res *layers.DHCPv4) {
+func (vpv *VoltPortVnet) ProcessDhcpSuccess(cntx context.Context, res *layers.DHCPv4) {
vpv.DhcpStatus = DhcpStatusAcked
vpv.Ipv4Addr, _ = GetIpv4Addr(res)
logger.Infow(ctx, "Received IPv4 Address", log.Fields{"IP Address": vpv.Ipv4Addr.String()})
logger.Infow(ctx, "Services Configured", log.Fields{"Count": vpv.servicesCount.Load()})
- vpv.services.Range(vpv.updateIPv4AndProvisionFlows)
+ vpv.RangeOnServices(cntx, vpv.updateIPv4AndProvisionFlows)
vpv.ProcessDhcpv4Options(res)
}
@@ -675,17 +683,17 @@
// VNET. The same IPv6 address is also passed to the services. When a
// service is fetched all the associated information such as MAC address,
// IPv4 address and IPv6 addresses can be provided.
-func (vpv *VoltPortVnet) ProcessDhcpv6Result(ipv6Addr net.IP, leaseTime uint32) {
+func (vpv *VoltPortVnet) ProcessDhcpv6Result(cntx context.Context, ipv6Addr net.IP, leaseTime uint32) {
// TODO: Status based hanlding of flows
vpv.Dhcp6ExpiryTime = time.Now().Add((time.Duration(leaseTime) * time.Second))
vpv.Ipv6Addr = ipv6Addr
- vpv.services.Range(vpv.updateIPv6AndProvisionFlows)
- vpv.WriteToDb()
+ vpv.RangeOnServices(cntx, vpv.updateIPv6AndProvisionFlows)
+ vpv.WriteToDb(cntx)
}
// AddSvcUsMeterToDevice to add service upstream meter info to device
-func AddSvcUsMeterToDevice(key, value interface{}) bool {
+func AddSvcUsMeterToDevice(cntx context.Context, key, value interface{}) bool {
svc := value.(*VoltService)
logger.Infow(ctx, "Adding upstream meter profile to device", log.Fields{"ServiceName": svc.Name})
if device, _ := GetApplication().GetDeviceFromPort(svc.Port); device != nil {
@@ -697,7 +705,7 @@
}
// PushFlowsForPortVnet - triggers flow construction and push for provided VPV
-func (vpv *VoltPortVnet) PushFlowsForPortVnet(d *VoltDevice) {
+func (vpv *VoltPortVnet) PushFlowsForPortVnet(cntx context.Context, d *VoltDevice) {
vp := d.GetPort(vpv.Port)
@@ -717,7 +725,7 @@
// vpv.DsFlowsApplied = false
// vpv.UsFlowsApplied = false
vpv.VpvLock.Lock()
- vpv.PortUpInd(d, vpv.Port)
+ vpv.PortUpInd(cntx, d, vpv.Port)
vpv.VpvLock.Unlock()
}
@@ -726,7 +734,7 @@
// again here to apply the latest configuration if the configuration
// changed. Thus, a reboot of ONT forces the new configuration to get
// applied.
-func (vpv *VoltPortVnet) PortUpInd(device *VoltDevice, port string) {
+func (vpv *VoltPortVnet) PortUpInd(cntx context.Context, device *VoltDevice, port string) {
if vpv.DeleteInProgress {
logger.Errorw(ctx, "Ignoring VPV Port UP Ind, VPV deleteion In-Progress", log.Fields{"Device": device, "Port": port, "Vnet": vpv.VnetName})
@@ -760,16 +768,16 @@
logger.Infow(ctx, "Port Up - Trap Flows", log.Fields{"Device": device.Name, "Port": port})
// no HSIA flows for multicast service
if !vpv.McastService {
- vpv.services.Range(AddUsHsiaFlows)
+ vpv.RangeOnServices(cntx, AddUsHsiaFlows)
}
- vpv.AddTrapFlows()
+ vpv.AddTrapFlows(cntx)
if vpv.MacLearning == MacLearningNone || NonZeroMacAddress(vpv.MacAddr) {
logger.Infow(ctx, "Port Up - DS Flows", log.Fields{"Device": device.Name, "Port": port})
// US & DS DHCP, US HSIA flows are already installed
// install only DS HSIA flow here.
// no HSIA flows for multicast service
if !vpv.McastService {
- vpv.services.Range(AddDsHsiaFlows)
+ vpv.RangeOnServices(cntx, AddDsHsiaFlows)
}
}
@@ -781,50 +789,50 @@
// however is not seen as a real use case.
logger.Infow(ctx, "Port Up - Service Flows", log.Fields{"Device": device.Name, "Port": port})
if !vpv.McastService {
- vpv.services.Range(AddUsHsiaFlows)
+ vpv.RangeOnServices(cntx, AddUsHsiaFlows)
}
- vpv.AddTrapFlows()
+ vpv.AddTrapFlows(cntx)
if !vpv.McastService {
- vpv.services.Range(AddDsHsiaFlows)
+ vpv.RangeOnServices(cntx, AddDsHsiaFlows)
}
}
// Process IGMP proxy - install IGMP trap rules before DHCP trap rules
if vpv.IgmpEnabled {
logger.Infow(ctx, "Port Up - IGMP Flows", log.Fields{"Device": device.Name, "Port": port})
- vpv.services.Range(AddSvcUsMeterToDevice)
- if err := vpv.AddIgmpFlows(); err != nil {
+ vpv.RangeOnServices(cntx, AddSvcUsMeterToDevice)
+ if err := vpv.AddIgmpFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
if vpv.McastService {
- vpv.services.Range(PostAccessConfigSuccessInd)
+ vpv.RangeOnServices(cntx, PostAccessConfigSuccessInd)
}
}
- vpv.WriteToDb()
+ vpv.WriteToDb(cntx)
}
// PortDownInd : When the port status changes to down, we delete all configured flows
// The same indication is also passed to the services enqueued for them
// to take appropriate actions
-func (vpv *VoltPortVnet) PortDownInd(device string, port string) {
+func (vpv *VoltPortVnet) PortDownInd(cntx context.Context, device string, port string) {
logger.Infow(ctx, "VPV Port DOWN Ind, deleting all flows for services",
log.Fields{"service count": vpv.servicesCount.Load()})
- //vpv.services.Range(DelAllFlows)
- vpv.DelTrapFlows()
- vpv.DelHsiaFlows()
- vpv.WriteToDb()
- vpv.ClearServiceCounters()
+ //vpv.RangeOnServices(cntx, DelAllFlows)
+ vpv.DelTrapFlows(cntx)
+ vpv.DelHsiaFlows(cntx)
+ vpv.WriteToDb(cntx)
+ vpv.ClearServiceCounters(cntx)
}
// SetMacAddr : The MAC address is set when a MAC address is learnt through the
// packets received from the network. Currently, DHCP packets are
// only packets we learn the MAC address from
-func (vpv *VoltPortVnet) SetMacAddr(addr net.HardwareAddr) {
+func (vpv *VoltPortVnet) SetMacAddr(cntx context.Context, addr net.HardwareAddr) {
//Store Learnt MAC address and return if MACLearning is not enabled
vpv.LearntMacAddr = addr
@@ -852,15 +860,15 @@
// may have been changed
// Atleast one HSIA flow should be present in adapter to retain the TP and GEM
// hence delete one after the other
- vpv.services.Range(DelUsHsiaFlows)
+ vpv.RangeOnServices(cntx, DelUsHsiaFlows)
vpv.MacAddr = addr
- vpv.services.Range(vpv.setLearntMAC)
- vpv.services.Range(AddUsHsiaFlows)
- vpv.services.Range(DelDsHsiaFlows)
+ vpv.RangeOnServices(cntx, vpv.setLearntMAC)
+ vpv.RangeOnServices(cntx, AddUsHsiaFlows)
+ vpv.RangeOnServices(cntx, DelDsHsiaFlows)
GetApplication().DeleteMacInPortMap(vpv.MacAddr)
} else {
vpv.MacAddr = addr
- vpv.services.Range(vpv.setLearntMAC)
+ vpv.RangeOnServices(cntx, vpv.setLearntMAC)
logger.Infow(ctx, "MAC Address learnt from DHCP or ARP", log.Fields{"Learnt MAC": addr.String(), "Port": vpv.Port})
}
GetApplication().UpdateMacInPortMap(vpv.MacAddr, vpv.Port)
@@ -879,10 +887,10 @@
if vpv.FlowsApplied {
// no HSIA flows for multicast service
if !vpv.McastService {
- vpv.services.Range(AddDsHsiaFlows)
+ vpv.RangeOnServices(cntx, AddDsHsiaFlows)
}
}
- vpv.WriteToDb()
+ vpv.WriteToDb(cntx)
}
// MatchesVlans : If the VNET matches both S and C VLANs, return true. Else, return false
@@ -954,10 +962,10 @@
// AddSvc adds a service on the VNET on a port. The addition is
// triggered when NB requests for service addition
-func (vpv *VoltPortVnet) AddSvc(svc *VoltService) {
+func (vpv *VoltPortVnet) AddSvc(cntx context.Context, svc *VoltService) {
//vpv.services = append(vpv.services, svc)
- vpv.AddService(svc)
+ vpv.AddService(cntx, svc)
logger.Debugw(ctx, "Added Service to VPV", log.Fields{"Num of SVCs": vpv.servicesCount.Load(), "SVC": svc})
// Learn the circuit-id and remote-id from the service
@@ -1034,9 +1042,9 @@
//to which the serivce is associated
if vpv.FlowsApplied {
if NonZeroMacAddress(vpv.MacAddr) || svc.MacLearning == MacLearningNone {
- svc.AddHsiaFlows()
+ svc.AddHsiaFlows(cntx)
} else {
- if err:= svc.AddUsHsiaFlows(); err != nil {
+ if err:= svc.AddUsHsiaFlows(cntx); err != nil {
logger.Warnw(ctx, "Add US hsia flow failed", log.Fields{"service": svc.Name, "Error": err})
}
}
@@ -1047,71 +1055,71 @@
// service with Igmp Enabled needs to be installed
if svc.IgmpEnabled && vpv.FlowsApplied {
logger.Infow(ctx, "Add Service - IGMP Flows", log.Fields{"Device": vpv.Device, "Port": vpv.Port})
- if err := vpv.AddIgmpFlows(); err != nil {
+ if err := vpv.AddIgmpFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
if vpv.McastService {
//For McastService, send Service Activated indication once IGMP US flow is pushed
- vpv.services.Range(PostAccessConfigSuccessInd)
+ vpv.RangeOnServices(cntx, PostAccessConfigSuccessInd)
}
}
- vpv.WriteToDb()
+ vpv.WriteToDb(cntx)
}
// setLearntMAC to set learnt mac
-func (vpv *VoltPortVnet) setLearntMAC(key, value interface{}) bool {
+func (vpv *VoltPortVnet) setLearntMAC(cntx context.Context, key, value interface{}) bool {
svc := value.(*VoltService)
svc.SetMacAddr(vpv.MacAddr)
- svc.WriteToDb()
+ svc.WriteToDb(cntx)
return true
}
// PostAccessConfigSuccessInd for posting access config success indication
-func PostAccessConfigSuccessInd(key, value interface{}) bool {
+func PostAccessConfigSuccessInd(cntx context.Context, key, value interface{}) bool {
return true
}
// updateIPv4AndProvisionFlows to update ipv4 and provisional flows
-func (vpv *VoltPortVnet) updateIPv4AndProvisionFlows(key, value interface{}) bool {
+func (vpv *VoltPortVnet) updateIPv4AndProvisionFlows(cntx context.Context, key, value interface{}) bool {
svc := value.(*VoltService)
logger.Infow(ctx, "Updating Ipv4 address for service", log.Fields{"ServiceName": svc.Name})
svc.SetIpv4Addr(vpv.Ipv4Addr)
- svc.WriteToDb()
+ svc.WriteToDb(cntx)
return true
}
// updateIPv6AndProvisionFlows to update ipv6 and provisional flow
-func (vpv *VoltPortVnet) updateIPv6AndProvisionFlows(key, value interface{}) bool {
+func (vpv *VoltPortVnet) updateIPv6AndProvisionFlows(cntx context.Context, key, value interface{}) bool {
svc := value.(*VoltService)
svc.SetIpv6Addr(vpv.Ipv6Addr)
- svc.WriteToDb()
+ svc.WriteToDb(cntx)
return true
}
// AddUsHsiaFlows to add upstream hsia flows
-func AddUsHsiaFlows(key, value interface{}) bool {
+func AddUsHsiaFlows(cntx context.Context, key, value interface{}) bool {
svc := value.(*VoltService)
- if err:= svc.AddUsHsiaFlows(); err != nil {
+ if err:= svc.AddUsHsiaFlows(cntx); err != nil {
logger.Warnw(ctx, "Add US hsia flow failed", log.Fields{"service": svc.Name, "Error": err})
}
return true
}
// AddDsHsiaFlows to add downstream hsia flows
-func AddDsHsiaFlows(key, value interface{}) bool {
+func AddDsHsiaFlows(cntx context.Context, key, value interface{}) bool {
svc := value.(*VoltService)
- if err:= svc.AddDsHsiaFlows(); err != nil {
+ if err:= svc.AddDsHsiaFlows(cntx); err != nil {
logger.Warnw(ctx, "Add DS hsia flow failed", log.Fields{"service": svc.Name, "Error": err})
}
return true
}
// ClearFlagsInService to clear the flags used in service
-func ClearFlagsInService(key, value interface{}) bool {
+func ClearFlagsInService(cntx context.Context, key, value interface{}) bool {
svc := value.(*VoltService)
svc.ServiceLock.Lock()
svc.IgmpFlowsApplied = false
@@ -1123,50 +1131,50 @@
svc.PendingFlows = make(map[string]bool)
svc.AssociatedFlows = make(map[string]bool)
svc.ServiceLock.Unlock()
- svc.WriteToDb()
+ svc.WriteToDb(cntx)
logger.Debugw(ctx, "Cleared Flow Flags for service", log.Fields{"name": svc.Name})
return true
}
// DelDsHsiaFlows to delete hsia flows
-func DelDsHsiaFlows(key, value interface{}) bool {
+func DelDsHsiaFlows(cntx context.Context, key, value interface{}) bool {
svc := value.(*VoltService)
- if err:= svc.DelDsHsiaFlows(); err != nil {
+ if err:= svc.DelDsHsiaFlows(cntx); err != nil {
logger.Warnw(ctx, "Delete DS hsia flow failed", log.Fields{"service": svc.Name, "Error": err})
}
return true
}
// DelUsHsiaFlows to delete upstream hsia flows
-func DelUsHsiaFlows(key, value interface{}) bool {
+func DelUsHsiaFlows(cntx context.Context, key, value interface{}) bool {
svc := value.(*VoltService)
- if err:= svc.DelUsHsiaFlows(); err != nil {
+ if err:= svc.DelUsHsiaFlows(cntx); err != nil {
logger.Warnw(ctx, "Delete US hsia flow failed", log.Fields{"service": svc.Name, "Error": err})
}
return true
}
// ClearServiceCounters to clear the service counters
-func ClearServiceCounters(key, value interface{}) bool {
+func ClearServiceCounters(cntx context.Context, key, value interface{}) bool {
svc := value.(*VoltService)
//Delete the per service counter too
GetApplication().ServiceCounters.Delete(svc.Name)
if svc.IgmpEnabled && svc.EnableMulticastKPI {
- _ = db.DelAllServiceChannelCounter(svc.Name)
+ _ = db.DelAllServiceChannelCounter(cntx, svc.Name)
}
return true
}
//AddTrapFlows - Adds US & DS Trap flows
-func (vpv *VoltPortVnet) AddTrapFlows() {
+func (vpv *VoltPortVnet) AddTrapFlows(cntx context.Context) {
if !vpv.FlowsApplied || vgcRebooted {
if vpv.DhcpRelay {
- if err := vpv.AddUsDhcpFlows(); err != nil {
+ if err := vpv.AddUsDhcpFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
- if err := vpv.AddDsDhcpFlows(); err != nil {
+ if err := vpv.AddDsDhcpFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
@@ -1174,85 +1182,85 @@
log.Fields{"port": vpv.Port})
//vpv.updateICMPv6McGroup(true)
} else if vpv.ArpRelay {
- if err := vpv.AddUsArpFlows(); err != nil {
+ if err := vpv.AddUsArpFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
logger.Info(ctx, "ARP trap rules not added in downstream direction")
} else if vpv.PppoeIa {
- if err := vpv.AddUsPppoeFlows(); err != nil {
+ if err := vpv.AddUsPppoeFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
- if err := vpv.AddDsPppoeFlows(); err != nil {
+ if err := vpv.AddDsPppoeFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
}
vpv.FlowsApplied = true
- vpv.WriteToDb()
+ vpv.WriteToDb(cntx)
}
}
//DelTrapFlows - Removes all US & DS DHCP, IGMP trap flows.
-func (vpv *VoltPortVnet) DelTrapFlows() {
+func (vpv *VoltPortVnet) DelTrapFlows(cntx context.Context) {
// Delete HSIA & DHCP flows before deleting IGMP flows
if vpv.FlowsApplied || vgcRebooted {
if vpv.DhcpRelay {
- if err:= vpv.DelUsDhcpFlows(); err != nil {
+ if err:= vpv.DelUsDhcpFlows(cntx); err != nil {
logger.Warnw(ctx, "Delete US hsia flow failed", log.Fields{"port": vpv.Port, "SVlan": vpv.SVlan, "CVlan": vpv.CVlan,
"UniVlan": vpv.UniVlan, "Error": err})
}
logger.Infow(ctx, "ICMPv6 MC Group modification will not be triggered to rwcore for ",
log.Fields{"port": vpv.Port})
- if err := vpv.DelDsDhcpFlows(); err != nil {
+ if err := vpv.DelDsDhcpFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
//vpv.updateICMPv6McGroup(false)
} else if vpv.ArpRelay {
- if err := vpv.DelUsArpFlows(); err != nil {
+ if err := vpv.DelUsArpFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
} else if vpv.PppoeIa {
- if err := vpv.DelUsPppoeFlows(); err != nil {
+ if err := vpv.DelUsPppoeFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
- if err := vpv.DelDsPppoeFlows(); err != nil {
+ if err := vpv.DelDsPppoeFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
}
vpv.FlowsApplied = false
- vpv.WriteToDb()
+ vpv.WriteToDb(cntx)
}
- if err:= vpv.DelIgmpFlows(); err != nil {
+ if err:= vpv.DelIgmpFlows(cntx); err != nil {
logger.Warnw(ctx, "Delete igmp flow failed", log.Fields{"port": vpv.Port, "SVlan": vpv.SVlan, "CVlan": vpv.CVlan,
"UniVlan": vpv.UniVlan, "Error": err})
}
}
// DelHsiaFlows deletes the service flows
-func (vpv *VoltPortVnet) DelHsiaFlows() {
+func (vpv *VoltPortVnet) DelHsiaFlows(cntx context.Context) {
// no HSIA flows for multicast service
if !vpv.McastService {
- vpv.services.Range(DelUsHsiaFlows)
- vpv.services.Range(DelDsHsiaFlows)
+ vpv.RangeOnServices(cntx, DelUsHsiaFlows)
+ vpv.RangeOnServices(cntx, DelDsHsiaFlows)
}
}
//ClearServiceCounters - Removes all igmp counters for a service
-func (vpv *VoltPortVnet) ClearServiceCounters() {
+func (vpv *VoltPortVnet) ClearServiceCounters(cntx context.Context) {
//send flows deleted indication to submgr
- vpv.services.Range(ClearServiceCounters)
+ vpv.RangeOnServices(cntx, ClearServiceCounters)
}
// AddUsDhcpFlows pushes the DHCP flows to the VOLTHA via the controller
-func (vpv *VoltPortVnet) AddUsDhcpFlows() error {
+func (vpv *VoltPortVnet) AddUsDhcpFlows(cntx context.Context) error {
var vd *VoltDevice
device := vpv.Device
@@ -1269,7 +1277,7 @@
flows, err := vpv.BuildUsDhcpFlows()
if err == nil {
logger.Debugw(ctx, "Adding US DHCP flows", log.Fields{"Device": device})
- if err1 := vpv.PushFlows(vd, flows); err1 != nil {
+ if err1 := vpv.PushFlows(cntx, vd, flows); err1 != nil {
//push ind here ABHI
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err1)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
@@ -1302,7 +1310,7 @@
}
// AddDsDhcpFlows function pushes the DHCP flows to the VOLTHA via the controller
-func (vpv *VoltPortVnet) AddDsDhcpFlows() error {
+func (vpv *VoltPortVnet) AddDsDhcpFlows(cntx context.Context) error {
var vd *VoltDevice
device := vpv.Device
@@ -1322,7 +1330,7 @@
flows, err := vpv.BuildDsDhcpFlows()
if err == nil {
- if err1 := vpv.PushFlows(vd, flows); err1 != nil {
+ if err1 := vpv.PushFlows(cntx, vd, flows); err1 != nil {
//push ind here and procced
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err1)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
@@ -1358,13 +1366,13 @@
}
// DelDhcpFlows deletes both US & DS DHCP flows applied for this Vnet instantiated on the port
-func (vpv *VoltPortVnet) DelDhcpFlows() {
- if err := vpv.DelUsDhcpFlows(); err != nil {
+func (vpv *VoltPortVnet) DelDhcpFlows(cntx context.Context) {
+ if err := vpv.DelUsDhcpFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
- if err := vpv.DelDsDhcpFlows(); err != nil {
+ if err := vpv.DelDsDhcpFlows(cntx); err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
@@ -1373,13 +1381,13 @@
// DelUsDhcpFlows delete the DHCP flows applied for this Vnet instantiated on the port
// Write the status of the VPV to the DB once the delete is scheduled
// for dispatch
-func (vpv *VoltPortVnet) DelUsDhcpFlows() error {
+func (vpv *VoltPortVnet) DelUsDhcpFlows(cntx context.Context) error {
device, err := GetApplication().GetDeviceFromPort(vpv.Port)
if err != nil {
return err
}
- err = vpv.delDhcp4Flows(device)
+ err = vpv.delDhcp4Flows(cntx, device)
if err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
@@ -1393,10 +1401,10 @@
return nil
}
-func (vpv *VoltPortVnet) delDhcp4Flows(device *VoltDevice) error {
+func (vpv *VoltPortVnet) delDhcp4Flows(cntx context.Context, device *VoltDevice) error {
flows, err := vpv.BuildUsDhcpFlows()
if err == nil {
- return vpv.RemoveFlows(device, flows)
+ return vpv.RemoveFlows(cntx, device, flows)
}
logger.Errorw(ctx, "US DHCP Flow Delete Failed", log.Fields{"Reason": err.Error()})
return err
@@ -1415,12 +1423,12 @@
// DelDsDhcpFlows delete the DHCP flows applied for this Vnet instantiated on the port
// Write the status of the VPV to the DB once the delete is scheduled
// for dispatch
-func (vpv *VoltPortVnet) DelDsDhcpFlows() error {
+func (vpv *VoltPortVnet) DelDsDhcpFlows(cntx context.Context) error {
device, err := GetApplication().GetDeviceFromPort(vpv.Port)
if err != nil {
return err
}
- err = vpv.delDsDhcp4Flows(device)
+ err = vpv.delDsDhcp4Flows(cntx, device)
if err != nil {
statusCode, statusMessage := infraerrorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
@@ -1434,10 +1442,10 @@
return nil
}
-func (vpv *VoltPortVnet) delDsDhcp4Flows(device *VoltDevice) error {
+func (vpv *VoltPortVnet) delDsDhcp4Flows(cntx context.Context, device *VoltDevice) error {
flows, err := vpv.BuildDsDhcpFlows()
if err == nil {
- return vpv.RemoveFlows(device, flows)
+ return vpv.RemoveFlows(cntx, device, flows)
}
logger.Errorw(ctx, "DS DHCP Flow Delete Failed", log.Fields{"Reason": err.Error()})
return err
@@ -1454,7 +1462,7 @@
}*/
// AddUsArpFlows pushes the ARP flows to the VOLTHA via the controller
-func (vpv *VoltPortVnet) AddUsArpFlows() error {
+func (vpv *VoltPortVnet) AddUsArpFlows(cntx context.Context) error {
var vd *VoltDevice
device := vpv.Device
@@ -1471,7 +1479,7 @@
flows, err := vpv.BuildUsArpFlows()
if err == nil {
logger.Debugw(ctx, "Adding US ARP flows", log.Fields{"Device": device})
- if err1 := vpv.PushFlows(vd, flows); err1 != nil {
+ if err1 := vpv.PushFlows(cntx, vd, flows); err1 != nil {
return err1
}
} else {
@@ -1484,21 +1492,21 @@
// DelUsArpFlows delete the ARP flows applied for this Vnet instantiated on the port
// Write the status of the VPV to the DB once the delete is scheduled
// for dispatch
-func (vpv *VoltPortVnet) DelUsArpFlows() error {
+func (vpv *VoltPortVnet) DelUsArpFlows(cntx context.Context) error {
device, err := GetApplication().GetDeviceFromPort(vpv.Port)
if err != nil {
return err
}
flows, err := vpv.BuildUsArpFlows()
if err == nil {
- return vpv.RemoveFlows(device, flows)
+ return vpv.RemoveFlows(cntx, device, flows)
}
logger.Errorw(ctx, "US ARP Flow Delete Failed", log.Fields{"Reason": err.Error()})
return err
}
// AddUsPppoeFlows pushes the PPPoE flows to the VOLTHA via the controller
-func (vpv *VoltPortVnet) AddUsPppoeFlows() error {
+func (vpv *VoltPortVnet) AddUsPppoeFlows(cntx context.Context) error {
logger.Debugw(ctx, "Adding US PPPoE flows", log.Fields{"STAG": vpv.SVlan, "CTAG": vpv.CVlan, "Device": vpv.Device})
var vd *VoltDevice
@@ -1517,7 +1525,7 @@
if flows, err := vpv.BuildUsPppoeFlows(); err == nil {
logger.Debugw(ctx, "Adding US PPPoE flows", log.Fields{"Device": device})
- if err1 := vpv.PushFlows(vd, flows); err1 != nil {
+ if err1 := vpv.PushFlows(cntx, vd, flows); err1 != nil {
return err1
}
} else {
@@ -1528,7 +1536,7 @@
}
// AddDsPppoeFlows to add downstream pppoe flows
-func (vpv *VoltPortVnet) AddDsPppoeFlows() error {
+func (vpv *VoltPortVnet) AddDsPppoeFlows(cntx context.Context) error {
logger.Debugw(ctx, "Adding DS PPPoE flows", log.Fields{"STAG": vpv.SVlan, "CTAG": vpv.CVlan, "Device": vpv.Device})
var vd *VoltDevice
device := vpv.Device
@@ -1546,7 +1554,7 @@
flows, err := vpv.BuildDsPppoeFlows()
if err == nil {
- if err1 := vpv.PushFlows(vd, flows); err1 != nil {
+ if err1 := vpv.PushFlows(cntx, vd, flows); err1 != nil {
return err1
}
} else {
@@ -1559,7 +1567,7 @@
// DelUsPppoeFlows delete the PPPoE flows applied for this Vnet instantiated on the port
// Write the status of the VPV to the DB once the delete is scheduled
// for dispatch
-func (vpv *VoltPortVnet) DelUsPppoeFlows() error {
+func (vpv *VoltPortVnet) DelUsPppoeFlows(cntx context.Context) error {
logger.Debugw(ctx, "Deleting US PPPoE flows", log.Fields{"STAG": vpv.SVlan, "CTAG": vpv.CVlan, "Device": vpv.Device})
device, err := GetApplication().GetDeviceFromPort(vpv.Port)
if err != nil {
@@ -1567,7 +1575,7 @@
}
flows, err := vpv.BuildUsPppoeFlows()
if err == nil {
- return vpv.RemoveFlows(device, flows)
+ return vpv.RemoveFlows(cntx, device, flows)
}
logger.Errorw(ctx, "US PPPoE Flow Delete Failed", log.Fields{"Reason": err.Error()})
return err
@@ -1576,7 +1584,7 @@
// DelDsPppoeFlows delete the PPPoE flows applied for this Vnet instantiated on the port
// Write the status of the VPV to the DB once the delete is scheduled
// for dispatch
-func (vpv *VoltPortVnet) DelDsPppoeFlows() error {
+func (vpv *VoltPortVnet) DelDsPppoeFlows(cntx context.Context) error {
logger.Debugw(ctx, "Deleting DS PPPoE flows", log.Fields{"STAG": vpv.SVlan, "CTAG": vpv.CVlan, "Device": vpv.Device})
device, err := GetApplication().GetDeviceFromPort(vpv.Port)
if err != nil {
@@ -1584,14 +1592,14 @@
}
flows, err := vpv.BuildDsPppoeFlows()
if err == nil {
- return vpv.RemoveFlows(device, flows)
+ return vpv.RemoveFlows(cntx, device, flows)
}
logger.Errorw(ctx, "DS PPPoE Flow Delete Failed", log.Fields{"Reason": err.Error()})
return err
}
// AddIgmpFlows function pushes the IGMP flows to the VOLTHA via the controller
-func (vpv *VoltPortVnet) AddIgmpFlows() error {
+func (vpv *VoltPortVnet) AddIgmpFlows(cntx context.Context) error {
if !vpv.IgmpFlowsApplied || vgcRebooted {
if vpv.MvlanProfileName == "" {
@@ -1619,7 +1627,7 @@
vd.RegisterFlowAddEvent(cookie, fe)
}
}
- if err1 := cntlr.GetController().AddFlows(vpv.Port, device.Name, flows); err1 != nil {
+ if err1 := cntlr.GetController().AddFlows(cntx, vpv.Port, device.Name, flows); err1 != nil {
return err1
}
} else {
@@ -1627,7 +1635,7 @@
return err
}
vpv.IgmpFlowsApplied = true
- vpv.WriteToDb()
+ vpv.WriteToDb(cntx)
}
return nil
}
@@ -1635,7 +1643,7 @@
// DelIgmpFlows delete the IGMP flows applied for this Vnet instantiated on the port
// Write the status of the VPV to the DB once the delete is scheduled
// for dispatch
-func (vpv *VoltPortVnet) DelIgmpFlows() error {
+func (vpv *VoltPortVnet) DelIgmpFlows(cntx context.Context) error {
if vpv.IgmpFlowsApplied || vgcRebooted {
device, err := GetApplication().GetDeviceFromPort(vpv.Port)
@@ -1645,7 +1653,7 @@
}
flows, err := vpv.BuildIgmpFlows()
if err == nil {
- if err1 := vpv.RemoveFlows(device, flows); err1 != nil {
+ if err1 := vpv.RemoveFlows(cntx, device, flows); err1 != nil {
return err1
}
} else {
@@ -1653,7 +1661,7 @@
return err
}
vpv.IgmpFlowsApplied = false
- vpv.WriteToDb()
+ vpv.WriteToDb(cntx)
}
return nil
}
@@ -2111,21 +2119,21 @@
}
// WriteToDb for writing to database
-func (vpv *VoltPortVnet) WriteToDb() {
+func (vpv *VoltPortVnet) WriteToDb(cntx context.Context) {
if vpv.DeleteInProgress {
logger.Warnw(ctx, "Skipping Redis Update for VPV, VPV delete in progress", log.Fields{"Vnet": vpv.VnetName, "Port": vpv.Port})
return
}
- vpv.ForceWriteToDb()
+ vpv.ForceWriteToDb(cntx)
}
//ForceWriteToDb force commit a VPV to the DB
-func (vpv *VoltPortVnet) ForceWriteToDb() {
+func (vpv *VoltPortVnet) ForceWriteToDb(cntx context.Context) {
vpv.PendingFlowLock.RLock()
defer vpv.PendingFlowLock.RUnlock()
vpv.Version = database.PresentVersionMap[database.VpvPath]
if b, err := json.Marshal(vpv); err == nil {
- if err := db.PutVpv(vpv.Port, uint16(vpv.SVlan), uint16(vpv.CVlan), uint16(vpv.UniVlan), string(b)); err != nil {
+ if err := db.PutVpv(cntx, vpv.Port, uint16(vpv.SVlan), uint16(vpv.CVlan), uint16(vpv.UniVlan), string(b)); err != nil {
logger.Warnw(ctx, "VPV write to DB failed", log.Fields{"port": vpv.Port, "SVlan": vpv.SVlan, "CVlan": vpv.CVlan,
"UniVlan": vpv.UniVlan, "Error": err})
}
@@ -2133,24 +2141,24 @@
}
// DelFromDb for deleting from database
-func (vpv *VoltPortVnet) DelFromDb() {
+func (vpv *VoltPortVnet) DelFromDb(cntx context.Context) {
logger.Debugw(ctx, "Deleting VPV from DB", log.Fields{"Port": vpv.Port, "SVLAN": vpv.SVlan, "CVLAN": vpv.CVlan})
- _ = db.DelVpv(vpv.Port, uint16(vpv.SVlan), uint16(vpv.CVlan), uint16(vpv.UniVlan))
+ _ = db.DelVpv(cntx, vpv.Port, uint16(vpv.SVlan), uint16(vpv.CVlan), uint16(vpv.UniVlan))
}
// ClearAllServiceFlags to clear all service flags
-func (vpv *VoltPortVnet) ClearAllServiceFlags() {
- vpv.services.Range(ClearFlagsInService)
+func (vpv *VoltPortVnet) ClearAllServiceFlags(cntx context.Context) {
+ vpv.RangeOnServices(cntx, ClearFlagsInService)
}
// ClearAllVpvFlags to clear all vpv flags
-func (vpv *VoltPortVnet) ClearAllVpvFlags() {
+func (vpv *VoltPortVnet) ClearAllVpvFlags(cntx context.Context) {
vpv.PendingFlowLock.Lock()
vpv.FlowsApplied = false
vpv.IgmpFlowsApplied = false
vpv.PendingDeleteFlow = make(map[string]bool)
vpv.PendingFlowLock.Unlock()
- vpv.WriteToDb()
+ vpv.WriteToDb(cntx)
logger.Debugw(ctx, "Cleared Flow Flags for VPV",
log.Fields{"device": vpv.Device, "port": vpv.Port,
"svlan": vpv.SVlan, "cvlan": vpv.CVlan, "univlan": vpv.UniVlan})
@@ -2183,9 +2191,9 @@
}
// RestoreVpvsFromDb to restore vpvs from database
-func (va *VoltApplication) RestoreVpvsFromDb() {
+func (va *VoltApplication) RestoreVpvsFromDb(cntx context.Context) {
// VNETS must be learnt first
- vpvs, _ := db.GetVpvs()
+ vpvs, _ := db.GetVpvs(cntx)
for hash, vpv := range vpvs {
b, ok := vpv.Value.([]byte)
if !ok {
@@ -2214,7 +2222,7 @@
}
// AddVnetToPort to add vnet to port
-func (va *VoltApplication) AddVnetToPort(port string, vvnet *VoltVnet, vs *VoltService) *VoltPortVnet {
+func (va *VoltApplication) AddVnetToPort(cntx context.Context, port string, vvnet *VoltVnet, vs *VoltService) *VoltPortVnet {
// The VNET is not on the port and is to be added
logger.Debugw(ctx, "Adding VNET to Port", log.Fields{"Port": port, "VNET": vvnet.Name})
vpv := NewVoltPortVnet(vvnet)
@@ -2234,7 +2242,7 @@
defer vpv.VpvLock.Unlock()
// Add the service that is causing the VNET to be added to the port
- vpv.AddSvc(vs)
+ vpv.AddSvc(cntx, vs)
// Process the PORT UP if the port is already up
d, err := va.GetDeviceFromPort(port)
@@ -2248,17 +2256,17 @@
} else {
logger.Infow(ctx, "Checking UNI port state", log.Fields{"State": p.State})
if d.State == controller.DeviceStateUP && p.State == PortStateUp {
- vpv.PortUpInd(d, port)
+ vpv.PortUpInd(cntx, d, port)
}
}
}
}
- vpv.WriteToDb()
+ vpv.WriteToDb(cntx)
return vpv
}
// DelVnetFromPort for deleting vnet from port
-func (va *VoltApplication) DelVnetFromPort(port string, vpv *VoltPortVnet) {
+func (va *VoltApplication) DelVnetFromPort(cntx context.Context, port string, vpv *VoltPortVnet) {
//Delete DHCP Session
delDhcpSessions(vpv.LearntMacAddr, vpv.SVlan, vpv.CVlan, vpv.DHCPv6DUID)
@@ -2283,18 +2291,18 @@
vpvs = append(vpvs[0:i], vpvs[i+1:]...)
vpv.DeleteInProgress = true
- vpv.ForceWriteToDb()
+ vpv.ForceWriteToDb(cntx)
va.VnetsByPort.Store(port, vpvs)
- vpv.DelTrapFlows()
- vpv.DelHsiaFlows()
+ vpv.DelTrapFlows(cntx)
+ vpv.DelHsiaFlows(cntx)
va.DisassociateVpvsFromDevice(vpv.Device, vpv)
vpv.PendingFlowLock.RLock()
if len(vpv.PendingDeleteFlow) == 0 {
- vpv.DelFromDb()
+ vpv.DelFromDb(cntx)
}
if vnet := va.GetVnetByName(vpv.VnetName); vnet != nil {
- vnet.disassociatePortFromVnet(vpv.Device, vpv.Port)
+ vnet.disassociatePortFromVnet(cntx, vpv.Device, vpv.Port)
}
vpv.PendingFlowLock.RUnlock()
return
@@ -2303,9 +2311,9 @@
}
// RestoreVnetsFromDb to restore vnet from port
-func (va *VoltApplication) RestoreVnetsFromDb() {
+func (va *VoltApplication) RestoreVnetsFromDb(cntx context.Context) {
// VNETS must be learnt first
- vnets, _ := db.GetVnets()
+ vnets, _ := db.GetVnets(cntx)
for _, net := range vnets {
b, ok := net.Value.([]byte)
if !ok {
@@ -2319,7 +2327,7 @@
continue
}
logger.Debugw(ctx, "Retrieved VNET", log.Fields{"VNET": vnet.VnetConfig})
- if err := va.AddVnet(vnet.VnetConfig, &vnet.VnetOper); err != nil {
+ if err := va.AddVnet(cntx, vnet.VnetConfig, &vnet.VnetOper); err != nil {
logger.Warnw(ctx, "Add Vnet Failed", log.Fields{"Config": vnet.VnetConfig, "Error": err})
}
@@ -2454,7 +2462,7 @@
}
// PushDevFlowForVlan to push icmpv6 flows for vlan
-func (va *VoltApplication) PushDevFlowForVlan(vnet *VoltVnet) {
+func (va *VoltApplication) PushDevFlowForVlan(cntx context.Context, vnet *VoltVnet) {
logger.Infow(ctx, "PushDevFlowForVlan", log.Fields{"SVlan": vnet.SVlan, "CVlan": vnet.CVlan})
pushflow := func(key interface{}, value interface{}) bool {
device := value.(*VoltDevice)
@@ -2492,7 +2500,7 @@
//Pushing ICMPv6 Flow
flow := BuildICMPv6Flow(portID, vnet)
- err = cntlr.GetController().AddFlows(device.NniPort, device.Name, flow)
+ err = cntlr.GetController().AddFlows(cntx, device.NniPort, device.Name, flow)
if err != nil {
logger.Warnw(ctx, "Configuring ICMPv6 Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
return true
@@ -2501,7 +2509,7 @@
// Pushing ARP Flow
flow = BuildDSArpFlow(portID, vnet)
- err = cntlr.GetController().AddFlows(device.NniPort, device.Name, flow)
+ err = cntlr.GetController().AddFlows(cntx, device.NniPort, device.Name, flow)
if err != nil {
logger.Warnw(ctx, "Configuring ARP Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
return true
@@ -2518,7 +2526,7 @@
}
// PushDevFlowForDevice to push icmpv6 flows for device
-func (va *VoltApplication) PushDevFlowForDevice(device *VoltDevice) {
+func (va *VoltApplication) PushDevFlowForDevice(cntx context.Context, device *VoltDevice) {
logger.Infow(ctx, "PushDevFlowForDevice", log.Fields{"device": device})
logger.Debugw(ctx, "Configuring ICMPv6 Group for device ", log.Fields{"Device": device.Name})
@@ -2545,7 +2553,7 @@
return true
}
flow := BuildICMPv6Flow(nniPortID, vnet)
- err = cntlr.GetController().AddFlows(device.NniPort, device.Name, flow)
+ err = cntlr.GetController().AddFlows(cntx, device.NniPort, device.Name, flow)
if err != nil {
logger.Warnw(ctx, "Configuring ICMPv6 Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
return true
@@ -2553,7 +2561,7 @@
logger.Infow(ctx, "ICMP Flow Added to Queue", log.Fields{"flow": flow})
flow = BuildDSArpFlow(nniPortID, vnet)
- err = cntlr.GetController().AddFlows(device.NniPort, device.Name, flow)
+ err = cntlr.GetController().AddFlows(cntx, device.NniPort, device.Name, flow)
if err != nil {
logger.Warnw(ctx, "Configuring ARP Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
return true
@@ -2569,7 +2577,7 @@
}
// DeleteDevFlowForVlan to delete icmpv6 flow for vlan
-func (va *VoltApplication) DeleteDevFlowForVlan(vnet *VoltVnet) {
+func (va *VoltApplication) DeleteDevFlowForVlan(cntx context.Context, vnet *VoltVnet) {
logger.Infow(ctx, "DeleteDevFlowForVlan", log.Fields{"SVlan": vnet.SVlan, "CVlan": vnet.CVlan})
delflows := func(key interface{}, value interface{}) bool {
device := value.(*VoltDevice)
@@ -2591,7 +2599,7 @@
//Pushing ICMPv6 Flow
flow := BuildICMPv6Flow(portID, vnet)
flow.ForceAction = true
- err := vnet.RemoveFlows(device, flow)
+ err := vnet.RemoveFlows(cntx, device, flow)
if err != nil {
logger.Warnw(ctx, "De-Configuring ICMPv6 Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
return true
@@ -2601,7 +2609,7 @@
//Pushing ARP Flow
flow = BuildDSArpFlow(portID, vnet)
flow.ForceAction = true
- err = vnet.RemoveFlows(device, flow)
+ err = vnet.RemoveFlows(cntx, device, flow)
if err != nil {
logger.Warnw(ctx, "De-Configuring ARP Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
return true
@@ -2616,7 +2624,7 @@
}
// DeleteDevFlowForDevice to delete icmpv6 flow for device
-func (va *VoltApplication) DeleteDevFlowForDevice(device *VoltDevice) {
+func (va *VoltApplication) DeleteDevFlowForDevice(cntx context.Context, device *VoltDevice) {
logger.Infow(ctx, "DeleteDevFlowForDevice", log.Fields{"Device": device})
delicmpv6 := func(key, value interface{}) bool {
vnet := value.(*VoltVnet)
@@ -2638,7 +2646,7 @@
}
flow := BuildICMPv6Flow(nniPortID, vnet)
flow.ForceAction = true
- err = vnet.RemoveFlows(device, flow)
+ err = vnet.RemoveFlows(cntx, device, flow)
if err != nil {
logger.Warnw(ctx, "De-Configuring ICMPv6 Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
return true
@@ -2646,7 +2654,7 @@
flow = BuildDSArpFlow(nniPortID, vnet)
flow.ForceAction = true
- err = vnet.RemoveFlows(device, flow)
+ err = vnet.RemoveFlows(cntx, device, flow)
if err != nil {
logger.Warnw(ctx, "De-Configuring ARP Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
return true
@@ -2666,7 +2674,7 @@
}
// DeleteDevFlowForVlanFromDevice to delete icmpv6 flow for vlan from device
-func (va *VoltApplication) DeleteDevFlowForVlanFromDevice(vnet *VoltVnet, deviceSerialNum string) {
+func (va *VoltApplication) DeleteDevFlowForVlanFromDevice(cntx context.Context, vnet *VoltVnet, deviceSerialNum string) {
logger.Infow(ctx, "DeleteDevFlowForVlanFromDevice", log.Fields{"Device-serialNum": deviceSerialNum, "SVlan": vnet.SVlan, "CVlan": vnet.CVlan})
delflows := func(key interface{}, value interface{}) bool {
device := value.(*VoltDevice)
@@ -2701,14 +2709,14 @@
}
flow := BuildICMPv6Flow(portID, vnet)
flow.ForceAction = true
- if err := vnet.RemoveFlows(device, flow); err != nil {
+ if err := vnet.RemoveFlows(cntx, device, flow); err != nil {
logger.Warnw(ctx, "Delete Flow Failed", log.Fields{"Device": device, "Flow": flow, "Error": err})
}
logger.Infow(ctx, "ICMP Flow Delete Added to Queue", log.Fields{"flow": flow})
flow = BuildDSArpFlow(portID, vnet)
flow.ForceAction = true
- if err := vnet.RemoveFlows(device, flow); err != nil {
+ if err := vnet.RemoveFlows(cntx, device, flow); err != nil {
logger.Warnw(ctx, "Delete Flow Failed", log.Fields{"Device": device, "Flow": flow, "Error": err})
}
logger.Infow(ctx, "ARP Flow Delete Added to Queue", log.Fields{"flow": flow})
@@ -2859,7 +2867,7 @@
}
//PushFlows - Triggers flow addition after registering for flow indication event
-func (vpv *VoltPortVnet) PushFlows(device *VoltDevice, flow *of.VoltFlow) error {
+func (vpv *VoltPortVnet) PushFlows(cntx context.Context, device *VoltDevice, flow *of.VoltFlow) error {
for cookie := range flow.SubFlows {
cookie := strconv.FormatUint(cookie, 10)
@@ -2870,7 +2878,7 @@
}
device.RegisterFlowAddEvent(cookie, fe)
}
- return cntlr.GetController().AddFlows(vpv.Port, device.Name, flow)
+ return cntlr.GetController().AddFlows(cntx, vpv.Port, device.Name, flow)
}
//FlowInstallFailure - Process flow failure indication and triggers HSIA failure for all associated services
@@ -2886,7 +2894,7 @@
}
//RemoveFlows - Triggers flow deletion after registering for flow indication event
-func (vpv *VoltPortVnet) RemoveFlows(device *VoltDevice, flow *of.VoltFlow) error {
+func (vpv *VoltPortVnet) RemoveFlows(cntx context.Context, device *VoltDevice, flow *of.VoltFlow) error {
vpv.PendingFlowLock.Lock()
defer vpv.PendingFlowLock.Unlock()
@@ -2902,11 +2910,11 @@
device.RegisterFlowDelEvent(cookie, fe)
vpv.PendingDeleteFlow[cookie] = true
}
- return cntlr.GetController().DelFlows(vpv.Port, device.Name, flow)
+ return cntlr.GetController().DelFlows(cntx, vpv.Port, device.Name, flow)
}
//CheckAndDeleteVpv - remove VPV from DB is there are no pending flows to be removed
-func (vpv *VoltPortVnet) CheckAndDeleteVpv() {
+func (vpv *VoltPortVnet) CheckAndDeleteVpv(cntx context.Context) {
vpv.PendingFlowLock.RLock()
defer vpv.PendingFlowLock.RUnlock()
if !vpv.DeleteInProgress {
@@ -2914,24 +2922,24 @@
}
if len(vpv.PendingDeleteFlow) == 0 && !vpv.FlowsApplied {
logger.Infow(ctx, "All Flows removed for VPV. Triggering VPV Deletion from DB", log.Fields{"VPV Port": vpv.Port, "Device": vpv.Device, "Vnet": vpv.VnetName})
- vpv.DelFromDb()
+ vpv.DelFromDb(cntx)
logger.Infow(ctx, "Deleted VPV from DB/Cache successfully", log.Fields{"VPV Port": vpv.Port, "Device": vpv.Device, "Vnet": vpv.VnetName})
}
}
//FlowRemoveSuccess - Process flow success indication
-func (vpv *VoltPortVnet) FlowRemoveSuccess(cookie string, device string) {
+func (vpv *VoltPortVnet) FlowRemoveSuccess(cntx context.Context, cookie string, device string) {
vpv.PendingFlowLock.Lock()
logger.Infow(ctx, "VPV Flow Remove Success Notification", log.Fields{"Port": vpv.Port, "Cookie": cookie, "Device": device})
delete(vpv.PendingDeleteFlow, cookie)
vpv.PendingFlowLock.Unlock()
- vpv.CheckAndDeleteVpv()
- vpv.WriteToDb()
+ vpv.CheckAndDeleteVpv(cntx)
+ vpv.WriteToDb(cntx)
}
//FlowRemoveFailure - Process flow failure indication and triggers Del HSIA failure for all associated services
-func (vpv *VoltPortVnet) FlowRemoveFailure(cookie string, device string, errorCode uint32, errReason string) {
+func (vpv *VoltPortVnet) FlowRemoveFailure(cntx context.Context, cookie string, device string, errorCode uint32, errReason string) {
vpv.PendingFlowLock.Lock()
logger.Errorw(ctx, "VPV Flow Remove Failure Notification", log.Fields{"Port": vpv.Port, "Cookie": cookie, "ErrorCode": errorCode, "ErrorReason": errReason, "Device": device})
@@ -2947,15 +2955,15 @@
if vpv.DeleteInProgress {
delete(vpv.PendingDeleteFlow, cookie)
vpv.PendingFlowLock.Unlock()
- vpv.CheckAndDeleteVpv()
+ vpv.CheckAndDeleteVpv(cntx)
} else {
vpv.PendingFlowLock.Unlock()
- vpv.WriteToDb()
+ vpv.WriteToDb(cntx)
}
}
//RemoveFlows - Triggers flow deletion after registering for flow indication event
-func (vv *VoltVnet) RemoveFlows(device *VoltDevice, flow *of.VoltFlow) error {
+func (vv *VoltVnet) RemoveFlows(cntx context.Context, device *VoltDevice, flow *of.VoltFlow) error {
vv.VnetLock.Lock()
defer vv.VnetLock.Unlock()
@@ -2978,12 +2986,12 @@
flowMap[cookie] = true
vv.PendingDeleteFlow[device.Name] = flowMap
}
- vv.WriteToDb()
- return cntlr.GetController().DelFlows(device.NniPort, device.Name, flow)
+ vv.WriteToDb(cntx)
+ return cntlr.GetController().DelFlows(cntx, device.NniPort, device.Name, flow)
}
//CheckAndDeleteVnet - remove Vnet from DB is there are no pending flows to be removed
-func (vv *VoltVnet) CheckAndDeleteVnet(device string) {
+func (vv *VoltVnet) CheckAndDeleteVnet(cntx context.Context, device string) {
if !vv.DeleteInProgress {
return
}
@@ -2991,7 +2999,7 @@
if len(vv.PendingDeleteFlow[device]) == 0 && !vv.isAssociatedPortsPresent() {
logger.Warnw(ctx, "Deleting Vnet : All flows removed", log.Fields{"Name": vv.Name, "AssociatedPorts": vv.AssociatedPorts, "Device": device})
GetApplication().deleteVnetConfig(vv)
- _ = db.DelVnet(vv.Name)
+ _ = db.DelVnet(cntx, vv.Name)
logger.Infow(ctx, "Deleted Vnet from DB/Cache successfully", log.Fields{"Device": device, "Vnet": vv.Name})
} else {
logger.Warnw(ctx, "Skipping Del Vnet", log.Fields{"Name": vv.Name, "AssociatedPorts": vv.AssociatedPorts, "PendingDelFlows": vv.PendingDeleteFlow[device]})
@@ -3000,7 +3008,7 @@
}
//FlowRemoveSuccess - Process flow success indication
-func (vv *VoltVnet) FlowRemoveSuccess(cookie string, device string) {
+func (vv *VoltVnet) FlowRemoveSuccess(cntx context.Context, cookie string, device string) {
vv.VnetLock.Lock()
defer vv.VnetLock.Unlock()
@@ -3014,14 +3022,14 @@
if d := GetApplication().GetDevice(device); d != nil {
_, present := d.ConfiguredVlanForDeviceFlows.Get(VnetKey(vv.SVlan, vv.CVlan, 0))
if !present && len(vv.PendingDeleteFlow[device]) == 0 {
- vv.CheckAndDeleteVnet(device)
+ vv.CheckAndDeleteVnet(cntx, device)
}
}
- vv.WriteToDb()
+ vv.WriteToDb(cntx)
}
//FlowRemoveFailure - Process flow failure indication
-func (vv *VoltVnet) FlowRemoveFailure(cookie string, device string, errorCode uint32, errReason string) {
+func (vv *VoltVnet) FlowRemoveFailure(cntx context.Context, cookie string, device string, errorCode uint32, errReason string) {
vv.VnetLock.Lock()
defer vv.VnetLock.Unlock()
@@ -3032,7 +3040,7 @@
if vv.DeleteInProgress {
delete(vv.PendingDeleteFlow[device], cookie)
- vv.CheckAndDeleteVnet(device)
+ vv.CheckAndDeleteVnet(cntx, device)
}
return
}
@@ -3107,7 +3115,7 @@
}
//TriggerAssociatedFlowDelete - Re-trigger delete for pending delete flows
-func (vv *VoltVnet) TriggerAssociatedFlowDelete(device string) bool {
+func (vv *VoltVnet) TriggerAssociatedFlowDelete(cntx context.Context, device string) bool {
vv.VnetLock.Lock()
cookieList := []uint64{}
flowMap := vv.PendingDeleteFlow[device]
@@ -3129,7 +3137,7 @@
subFlow.Cookie = cookie
flow.SubFlows[cookie] = subFlow
logger.Infow(ctx, "Retriggering Vnet Delete Flow", log.Fields{"Device": device, "Vnet": vv.Name, "Cookie": cookie})
- if err := vv.RemoveFlows(vd, flow); err != nil {
+ if err := vv.RemoveFlows(cntx, vd, flow); err != nil {
logger.Warnw(ctx, "Vnet Delete Flow Failed", log.Fields{"Device": device, "Vnet": vv.Name, "Cookie": cookie, "Error": err})
}
}