Adding remove flows on disable flag
Change-Id: Ia31266c3a959bb1049051de7c6e108e3a6cc6d20
diff --git a/database/common.go b/database/common.go
index 653fccc..ea8f3b6 100644
--- a/database/common.go
+++ b/database/common.go
@@ -63,6 +63,7 @@
PortAlarmDataPath string = DevicePortPath + "portalarmdata/"
SubAlarmDataPath string = DevicePath + "sub-alarm-data/"
ServicesMigrateReqPath string = DevicePath + "migrateServicesReq/"
+ OltFlowServicePath string = "olt-flow-service/"
)
//PresentVersionMap - map of present version for all database tables
@@ -97,6 +98,7 @@
PortAlarmDataPath: "v1",
SubAlarmDataPath: "v1",
ServicesMigrateReqPath: "v1",
+ OltFlowServicePath: "v1",
}
//PreviousVersionMap - map of previous version for all database tables
@@ -131,6 +133,7 @@
PortAlarmDataPath: "v1",
SubAlarmDataPath: "v1",
ServicesMigrateReqPath: "v1",
+ OltFlowServicePath: "v1",
}
//DBVersionMap - Version of tables present in DB
diff --git a/database/database.go b/database/database.go
index 664dce9..cf673eb 100644
--- a/database/database.go
+++ b/database/database.go
@@ -1042,7 +1042,22 @@
logger.Infow(ctx, "Deleting all the Update Vnet Requests for device", log.Fields{"device": deviceID})
return nil
}
+// PutOltFlowService to add OltFlowService info
+func (db *Database) PutOltFlowService(ctx context.Context, value string) error {
+ key := GetKeyPath(OltFlowServicePath)
+ if err := db.kvc.Put(ctx, key, value); err != nil {
+ logger.Warnw(ctx, "Put OltFlowService failed", log.Fields{"key": key})
+ return err
+ }
+ return nil
+}
+
+// GetOltFlowService to get OltFlowService info
+func (db *Database) GetOltFlowService(ctx context.Context) (string, error) {
+ key := GetKeyPath(OltFlowServicePath)
+ return db.Get(ctx, key)
+}
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
diff --git a/database/dbintf.go b/database/dbintf.go
index 8a6a43b..5de5761 100644
--- a/database/dbintf.go
+++ b/database/dbintf.go
@@ -153,6 +153,8 @@
GetAllMigrateServicesReq(ctx context.Context, deviceID string) (map[string]*kvstore.KVPair, error)
DelMigrateServicesReq(ctx context.Context, deviceID string, vlan string) error
DelAllMigrateServicesReq(ctx context.Context, deviceID string) error
+ PutOltFlowService(ctx context.Context, value string) error
+ GetOltFlowService(ctx context.Context) (string, error)
}
//GetDatabase - returns databse operation based on configuration
diff --git a/internal/pkg/application/application.go b/internal/pkg/application/application.go
index 04f4a6a..fa9e649 100644
--- a/internal/pkg/application/application.go
+++ b/internal/pkg/application/application.go
@@ -328,6 +328,19 @@
return nil
}
+// GetPortByPortID to get port information from the device.
+func (d *VoltDevice) GetPortNameFromPortID(portID uint32) string {
+ portName := ""
+ d.Ports.Range(func(key, value interface{}) bool {
+ vp := value.(*VoltPort)
+ if vp.ID == portID {
+ portName = vp.Name
+ }
+ return true
+ })
+ return portName
+}
+
// DelPort to delete port from the device
func (d *VoltDevice) DelPort(port string) {
if _, ok := d.Ports.Load(port); ok {
@@ -429,6 +442,7 @@
VoltPortVnetsToDelete map[*VoltPortVnet]bool
PortAlarmProfileCache map[string]map[string]int // [portAlarmID][ThresholdLevelString]ThresholdLevel
vendorID string
+ OltFlowServiceConfig OltFlowService
}
// PonPortCfg contains NB port config and activeIGMPChannels count
@@ -681,6 +695,8 @@
va.RestoreIgmpGroupsFromDb(cntx)
logger.Info(ctx, "Reading Upgrade status from DB")
va.RestoreUpgradeStatus(cntx)
+ logger.Info(ctx, "Reading OltFlowService from DB")
+ va.RestoreOltFlowService(cntx)
logger.Info(ctx, "Reconciled from DB")
}
@@ -815,6 +831,19 @@
logger.Infow(ctx, "Port state is UP. Trigerring Port Down Ind before deleting", log.Fields{"Port": p})
va.PortDownInd(cntx, device, port)
}
+ // if RemoveFlowsOnDisable is flase, then flows will be existing till port delete. Remove the flows now
+ if !va.OltFlowServiceConfig.RemoveFlowsOnDisable {
+ vpvs, ok := va.VnetsByPort.Load(port)
+ if !ok || nil == vpvs || len(vpvs.([]*VoltPortVnet)) == 0 {
+ logger.Infow(ctx, "No VNETs on port", log.Fields{"Device": device, "Port": port})
+ } else {
+ for _, vpv := range vpvs.([]*VoltPortVnet) {
+ vpv.VpvLock.Lock()
+ vpv.PortDownInd(cntx, device, port, true)
+ vpv.VpvLock.Unlock()
+ }
+ }
+ }
va.portLock.Lock()
defer va.portLock.Unlock()
d.DelPort(port)
@@ -1264,7 +1293,7 @@
for _, vpv := range vpvs.([]*VoltPortVnet) {
vpv.VpvLock.Lock()
logger.Warnw(ctx, "Removing existing VPVs/Services flows for for Subscriber: UNI Detected on wrong PON", log.Fields{"Port": vpv.Port, "Vnet": vpv.VnetName})
- vpv.PortDownInd(cntx, device, port)
+ vpv.PortDownInd(cntx, device, port, false)
if vpv.IgmpEnabled {
va.ReceiverDownInd(cntx, device, port)
}
@@ -1395,7 +1424,7 @@
*/
for _, vpv := range vpvs.([]*VoltPortVnet) {
vpv.VpvLock.Lock()
- vpv.PortDownInd(cntx, device, port)
+ vpv.PortDownInd(cntx, device, port, false)
if vpv.IgmpEnabled {
va.ReceiverDownInd(cntx, device, port)
}
@@ -2060,3 +2089,38 @@
}
}
}
+
+type OltFlowService struct {
+ EnableDhcpOnNni bool `json:"enableDhcpOnNni"`
+ DefaultTechProfileId int `json:"defaultTechProfileId"`
+ EnableIgmpOnNni bool `json:"enableIgmpOnNni"`
+ EnableEapol bool `json:"enableEapol"`
+ EnableDhcpV6 bool `json:"enableDhcpV6"`
+ EnableDhcpV4 bool `json:"enableDhcpV4"`
+ RemoveFlowsOnDisable bool `json:"removeFlowsOnDisable"`
+}
+
+func (va *VoltApplication) UpdateOltFlowService(cntx context.Context, oltFlowService OltFlowService) {
+ logger.Infow(ctx, "UpdateOltFlowService", log.Fields{"oldValue": va.OltFlowServiceConfig, "newValue": oltFlowService})
+ va.OltFlowServiceConfig = oltFlowService
+ b, err := json.Marshal(va.OltFlowServiceConfig)
+ if err != nil {
+ logger.Warnw(ctx, "Failed to Marshal OltFlowServiceConfig", log.Fields{"OltFlowServiceConfig": va.OltFlowServiceConfig})
+ return
+ }
+ _ = db.PutOltFlowService(cntx, string(b))
+}
+// RestoreOltFlowService to read from the DB and restore olt flow service config
+func (va *VoltApplication) RestoreOltFlowService(cntx context.Context) {
+ oltflowService, err := db.GetOltFlowService(cntx)
+ if err != nil {
+ logger.Warnw(ctx, "Failed to Get OltFlowServiceConfig from DB", log.Fields{"Error": err})
+ return
+ }
+ err = json.Unmarshal([]byte(oltflowService), &va.OltFlowServiceConfig)
+ if err != nil {
+ logger.Warn(ctx, "Unmarshal of oltflowService failed")
+ return
+ }
+ logger.Infow(ctx, "updated OltFlowServiceConfig from DB", log.Fields{"OltFlowServiceConfig": va.OltFlowServiceConfig})
+}
diff --git a/internal/pkg/application/service.go b/internal/pkg/application/service.go
index d710298..4ddd45d 100644
--- a/internal/pkg/application/service.go
+++ b/internal/pkg/application/service.go
@@ -2094,18 +2094,6 @@
// DeactivateService to activate pre-provisioned service
func (va *VoltApplication) DeactivateService(cntx context.Context, deviceID, portNo string, sVlan, cVlan of.VlanType, tpID uint16) error {
logger.Infow(ctx, "Service Deactivate Request ", log.Fields{"Device": deviceID, "Port": portNo})
- device, err := va.GetDeviceFromPort(portNo)
- if err != nil {
- logger.Errorw(ctx, "Error Getting Device", log.Fields{"Reason": err.Error(), "Port": portNo})
- return errorCodes.ErrPortNotFound
- }
- // If device id is not provided check only port number
- if deviceID == DeviceAny {
- deviceID = device.Name
- } else if deviceID != device.Name {
- logger.Errorw(ctx, "Wrong Device ID in request", log.Fields{"Device": deviceID, "Port": portNo})
- return errorCodes.ErrDeviceNotFound
- }
va.ServiceByName.Range(func(key, value interface{}) bool {
vs := value.(*VoltService)
// If svlan if provided, then the tags and tpID of service has to be matching
@@ -2118,11 +2106,18 @@
vs.IsActivated = false
va.ServiceByName.Store(vs.Name, vs)
vs.WriteToDb(cntx)
+ device, err := va.GetDeviceFromPort(portNo)
+ if err != nil {
+ // Even if the port/device does not exists at this point in time, the deactivate request is succss.
+ // So no error is returned
+ logger.Infow(ctx, "Error Getting Device", log.Fields{"Reason": err.Error(), "Port": portNo})
+ return true
+ }
p := device.GetPort(vs.Port)
- if p != nil && p.State == PortStateUp {
+ if p != nil && (p.State == PortStateUp || !va.OltFlowServiceConfig.RemoveFlowsOnDisable){
if vpv := va.GetVnetByPort(vs.Port, vs.SVlan, vs.CVlan, vs.UniVlan); vpv != nil {
// Port down call internally deletes all the flows
- vpv.PortDownInd(cntx, deviceID, portNo)
+ vpv.PortDownInd(cntx, deviceID, portNo, true)
if vpv.IgmpEnabled {
va.ReceiverDownInd(cntx, deviceID, portNo)
}
@@ -2135,6 +2130,7 @@
})
return nil
}
+
/* GetServicePbit to get first set bit in the pbit map
returns -1 : If configured to match on all pbits
returns 8 : If no pbits are configured
diff --git a/internal/pkg/application/vnets.go b/internal/pkg/application/vnets.go
index 85de6dc..dffbb00 100644
--- a/internal/pkg/application/vnets.go
+++ b/internal/pkg/application/vnets.go
@@ -817,8 +817,12 @@
// PortDownInd : When the port status changes to down, we delete all configured flows
// The same indication is also passed to the services enqueued for them
// to take appropriate actions
-func (vpv *VoltPortVnet) PortDownInd(cntx context.Context, device string, port string) {
+func (vpv *VoltPortVnet) PortDownInd(cntx context.Context, device string, port string, nbRequest bool) {
+ if !nbRequest && !GetApplication().OltFlowServiceConfig.RemoveFlowsOnDisable {
+ logger.Info(ctx, "VPV Port DOWN Ind, Not deleting flows since RemoveOnDisable is disabled")
+ return
+ }
logger.Infow(ctx, "VPV Port DOWN Ind, deleting all flows for services",
log.Fields{"service count": vpv.servicesCount.Load()})
diff --git a/internal/pkg/controller/device.go b/internal/pkg/controller/device.go
index ea4ccb3..8d185a3 100644
--- a/internal/pkg/controller/device.go
+++ b/internal/pkg/controller/device.go
@@ -496,10 +496,11 @@
if p.State == PortStateUp {
GetController().PortDownInd(cntx, d.ID, p.Name)
}
+ GetController().PortDelInd(cntx, d.ID, p.Name)
+
d.portLock.Lock()
defer d.portLock.Unlock()
- GetController().PortDelInd(cntx, d.ID, p.Name)
delete(d.PortsByID, p.ID)
delete(d.PortsByName, p.Name)
d.DelPortFromDb(cntx, p.ID)
diff --git a/voltha-go-controller/nbi/rest.go b/voltha-go-controller/nbi/rest.go
index 53d0a06..7edafdc 100644
--- a/voltha-go-controller/nbi/rest.go
+++ b/voltha-go-controller/nbi/rest.go
@@ -55,6 +55,7 @@
MetersByIdPath string = "/meters/{id}"
GroupsPath string = "/groups"
GroupsByIdPath string = "/groups/{id}"
+ OltFlowServicePath string = "/oltflowservice"
)
// RestStart to execute for API
@@ -86,6 +87,7 @@
mu.HandleFunc(MetersByIdPath, (&onos_nbi.MetersHandle{}).MeterServeHTTP)
mu.HandleFunc(GroupsPath, (&onos_nbi.GroupsHandle{}).GroupServeHTTP)
mu.HandleFunc(GroupsByIdPath, (&onos_nbi.GroupsHandle{}).GroupServeHTTP)
+ mu.HandleFunc(OltFlowServicePath, (&onos_nbi.OltFlowServiceHandle{}).ServeHTTP)
err := http.ListenAndServe(":8181", mu)
logger.Infow(ctx, "Rest Server Started", log.Fields{"Error": err})
diff --git a/voltha-go-controller/onos_nbi/oltapprestadapter.go b/voltha-go-controller/onos_nbi/oltapprestadapter.go
index d781ddf..05f5e16 100644
--- a/voltha-go-controller/onos_nbi/oltapprestadapter.go
+++ b/voltha-go-controller/onos_nbi/oltapprestadapter.go
@@ -108,7 +108,26 @@
}
if len(deviceID) > 0 && len(portNo) > 0 {
- if err := app.GetApplication().ActivateService(cntx, deviceID, portNo, of.VlanNone, of.VlanNone, 0); err != nil {
+ va := app.GetApplication()
+ port, err := strconv.Atoi(portNo)
+ if err != nil {
+ logger.Warnw(ctx, "Wrong port number value", log.Fields{"portNo": portNo})
+ http.Error(w, err.Error(), http.StatusConflict)
+ return
+ }
+ device := va.GetDevice(deviceID)
+ if device == nil {
+ logger.Warnw(ctx, "Device does not exists", log.Fields{"deviceID": deviceID})
+ http.Error(w, err.Error(), http.StatusConflict)
+ return
+ }
+ portName := device.GetPortNameFromPortID(uint32(port))
+ if len(portName) == 0 {
+ logger.Warnw(ctx, "Port does not exists", log.Fields{"deviceID": deviceID})
+ http.Error(w, err.Error(), http.StatusConflict)
+ return
+ }
+ if err := va.ActivateService(cntx, deviceID, portName, of.VlanNone, of.VlanNone, 0); err != nil {
logger.Warnw(ctx, "ActivateService Failed", log.Fields{ "deviceID": deviceID, "Port": portNo})
http.Error(w, err.Error(), http.StatusBadRequest)
}
@@ -129,7 +148,26 @@
}
if len(deviceID) > 0 && len(portNo) > 0 {
- if err := app.GetApplication().DeactivateService(cntx, deviceID, portNo, of.VlanNone, of.VlanNone, 0); err != nil {
+ va := app.GetApplication()
+ port, err := strconv.Atoi(portNo)
+ if err != nil {
+ logger.Warnw(ctx, "Wrong port number value", log.Fields{"portNo": portNo})
+ http.Error(w, err.Error(), http.StatusConflict)
+ return
+ }
+ device := va.GetDevice(deviceID)
+ if device == nil {
+ logger.Warnw(ctx, "Device does not exists", log.Fields{"deviceID": deviceID})
+ http.Error(w, err.Error(), http.StatusConflict)
+ return
+ }
+ portName := device.GetPortNameFromPortID(uint32(port))
+ if len(portName) == 0 {
+ logger.Warnw(ctx, "Port does not exists", log.Fields{"deviceID": deviceID})
+ http.Error(w, err.Error(), http.StatusConflict)
+ return
+ }
+ if err := va.DeactivateService(cntx, deviceID, portName, of.VlanNone, of.VlanNone, 0); err != nil {
logger.Warnw(ctx, "DeactivateService Failed", log.Fields{ "deviceID": deviceID, "Port": portNo})
http.Error(w, err.Error(), http.StatusBadRequest)
}
diff --git a/voltha-go-controller/onos_nbi/oltflowservice.go b/voltha-go-controller/onos_nbi/oltflowservice.go
new file mode 100644
index 0000000..fb19e68
--- /dev/null
+++ b/voltha-go-controller/onos_nbi/oltflowservice.go
@@ -0,0 +1,61 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package onos_nbi
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "net/http"
+
+ app "voltha-go-controller/internal/pkg/application"
+ "voltha-go-controller/log"
+)
+
+// OltFlowServiceHandle handles OltFlowService Requests
+type OltFlowServiceHandle struct {
+}
+
+// ServeHTTP to serve HTTP requests
+func (oh *OltFlowServiceHandle) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ logger.Infow(ctx, "Received-northbound-request", log.Fields{"Method": r.Method, "URL": r.URL})
+ switch r.Method {
+ case "POST":
+ oh.configureOltFlowService(context.Background(), w, r)
+ default:
+ logger.Warnw(ctx, "Unsupported Method", log.Fields{"Method": r.Method})
+ }
+}
+
+func (oh *OltFlowServiceHandle) configureOltFlowService(cntx context.Context, w http.ResponseWriter, r *http.Request) {
+
+ // Get the payload to process the request
+ d := new(bytes.Buffer)
+ if _, err := d.ReadFrom(r.Body); err != nil {
+ logger.Warnw(ctx, "Error reading buffer", log.Fields{"Reason": err.Error()})
+ return
+ }
+
+ // Unmarshal the request into service configuration structure
+ req := &app.OltFlowService{}
+ if err := json.Unmarshal(d.Bytes(), req); err != nil {
+ logger.Warnw(ctx, "Unmarshal Failed", log.Fields{"Reason": err.Error()})
+ http.Error(w, err.Error(), http.StatusConflict)
+ return
+ }
+ app.GetApplication().UpdateOltFlowService(cntx, *req)
+}
+