blob: 9abbd429a05c49b2867d94d11a1fa33200826c52 [file] [log] [blame]
/*
* Copyright 2022-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controller
import (
"context"
"errors"
"sync"
"time"
"encoding/hex"
"voltha-go-controller/database"
errorCodes "voltha-go-controller/internal/pkg/errorcodes"
"voltha-go-controller/internal/pkg/intf"
"voltha-go-controller/internal/pkg/of"
"voltha-go-controller/internal/pkg/tasks"
"voltha-go-controller/internal/pkg/util"
"voltha-go-controller/internal/pkg/vpagent"
"voltha-go-controller/log"
)
// logger is the package-level logger; it is assigned in init.
var logger log.CLogger
// ctx is a placeholder context (context.TODO) that the logging calls in this
// file pass along.
var ctx = context.TODO()
// init registers this package with the logging framework so that its log
// level can be modified at run time. It panics if the registration fails.
func init() {
	pkgLogger, err := log.AddPackageWithDefaultParam()
	if err != nil {
		panic(err)
	}
	logger = pkgLogger
}
// db is the package-level database handle; NewController assigns it from
// database.GetDatabase().
var db database.DBIntf
// deviceTableSyncDuration is the interval between device table sync up
// activities. It defaults to 15 minutes.
var deviceTableSyncDuration = 15 * time.Minute

// SetDeviceTableSyncDuration sets the interval between device table sync up
// activities.
// duration - the interval, in minutes
func SetDeviceTableSyncDuration(duration int) {
	minutes := time.Duration(duration)
	deviceTableSyncDuration = minutes * time.Minute
}
// VoltController holds the controller-wide state: the known devices, the
// registered VP agents, the per-device controller task queues and the
// reboot-in-progress / blocked-device bookkeeping.
type VoltController struct {
// rebootLock guards rebootInProgressDevices.
rebootLock sync.Mutex
// rebootInProgressDevices holds the devices whose reboot-in-progress flag is
// set; the device string is used as both key and value.
rebootInProgressDevices map[string]string
// devices maps a device ID to its Device.
devices map[string]*Device
// deviceLock is not referenced by the methods in this file; presumably it is
// meant to guard devices - TODO confirm.
deviceLock sync.RWMutex
// vagent maps a vep key to the VPAgent registered through AddVPAgent.
vagent map[string]*vpagent.VPAgent
// ctx is the context handed to NewController.
ctx context.Context
// app is the application to which indications and requests are forwarded.
app intf.App
// RebootFlow is not referenced in this file - NOTE(review): purpose to be
// confirmed against its users.
RebootFlow bool
// BlockedDeviceList stores blocked device serial numbers (see AddBlockedDevices).
BlockedDeviceList *util.ConcurrentMap
// deviceTaskQueue maps a device to its *tasks.Tasks controller task queue.
deviceTaskQueue *util.ConcurrentMap
}
// vcontroller is the controller most recently created by NewController; it is
// what GetController returns.
var vcontroller *VoltController
// NewController is the constructor for VoltController.
//
// It allocates every map the controller uses, stores the supplied context and
// application, fetches the package-level database handle and records the new
// controller as the package-level instance returned by GetController.
//
// ctx - context stored on the controller
// app - application that receives the forwarded indications
//
// Returns the new controller as an intf.IVPClientAgent.
func NewController(ctx context.Context, app intf.App) intf.IVPClientAgent {
	controller := &VoltController{}
	controller.rebootInProgressDevices = make(map[string]string)
	controller.devices = make(map[string]*Device)
	// vagent has to be allocated here: AddVPAgent stores into it, and a write
	// to a nil map panics.
	controller.vagent = make(map[string]*vpagent.VPAgent)
	controller.ctx = ctx
	controller.app = app
	controller.BlockedDeviceList = util.NewConcurrentMap()
	controller.deviceTaskQueue = util.NewConcurrentMap()
	db = database.GetDatabase()
	vcontroller = controller
	return controller
}
// AddDevice creates a Device from config, records it in the controller's
// device map and announces it to the application. It then calls the device's
// RestoreMetersFromDb, RestoreGroupsFromDb, RestoreFlowsFromDb and
// RestorePortsFromDb helpers (in that order), raises a connect indication
// with intf.DeviceDisc and attaches the packet-out channel from config.
//
// Returns the newly created device.
func (v *VoltController) AddDevice(config *intf.VPClientCfg) intf.IVPClient {
	dev := NewDevice(config.DeviceID, config.SerialNum, config.VolthaClient, config.SouthBoundID)
	v.devices[config.DeviceID] = dev
	v.app.AddDevice(dev.ID, dev.SerialNum, config.SouthBoundID)

	// The restore calls run before the connect indication is raised.
	dev.RestoreMetersFromDb()
	dev.RestoreGroupsFromDb()
	dev.RestoreFlowsFromDb()
	dev.RestorePortsFromDb()

	dev.ConnectInd(context.TODO(), intf.DeviceDisc)
	dev.packetOutChannel = config.PacketOutChannel

	fields := log.Fields{"Device": config.DeviceID, "SerialNo": dev.SerialNum, "State": dev.State}
	logger.Warnw(ctx, "Added device", fields)
	return dev
}
// DelDevice removes the device with the given id from the controller.
//
// When the device is known it is dropped from the device map and its Delete
// method is called. The application is always told about the deletion, and
// the device's cancel function is invoked afterwards to stop the device
// tables sync routine.
//
// id - ID of the device to delete
func (v *VoltController) DelDevice(id string) {
	d, ok := v.devices[id]
	if ok {
		delete(v.devices, id)
		d.Delete()
	}
	v.app.DelDevice(id)
	// d is nil when the device is unknown; cancelling unconditionally would
	// dereference a nil pointer.
	if ok {
		d.cancel() // To stop the device tables sync routine
	}
	logger.Warnw(ctx, "Deleted device", log.Fields{"Device": id})
}
// AddControllerTask appends task to the controller task queue kept for
// device. The queue is created and stored on first use.
//
// device - key of the per-device task queue
// task   - task to enqueue
func (v *VoltController) AddControllerTask(device string, task tasks.Task) {
	var queue *tasks.Tasks
	if existing, found := v.deviceTaskQueue.Get(device); found {
		queue = existing.(*tasks.Tasks)
	} else {
		queue = tasks.NewTasks(context.TODO())
		v.deviceTaskQueue.Set(device, queue)
	}
	queue.AddTask(task)
	logger.Warnw(ctx, "Task Added to Controller Task List", log.Fields{"Len": queue.NumPendingTasks(), "Total": queue.TotalTasks()})
}
// AddNewDevice is called when a new device is discovered. It wraps config in
// an add-device task and queues that task on the controller task queue kept
// for config.DeviceID.
func (v *VoltController) AddNewDevice(config *intf.VPClientCfg) {
adt := NewAddDeviceTask(config)
v.AddControllerTask(config.DeviceID, adt)
}
// GetDevice looks up the device registered under id.
//
// Returns the device, or errorCodes.ErrDeviceNotFound when no device with
// that id is known.
func (v *VoltController) GetDevice(id string) (*Device, error) {
	if dev, found := v.devices[id]; found {
		return dev, nil
	}
	return nil, errorCodes.ErrDeviceNotFound
}
// IsRebootInProgressForDevice reports whether the reboot-in-progress flag is
// currently set for device. The lookup is done under rebootLock.
func (v *VoltController) IsRebootInProgressForDevice(device string) bool {
	v.rebootLock.Lock()
	defer v.rebootLock.Unlock()

	_, inProgress := v.rebootInProgressDevices[device]
	return inProgress
}
// SetRebootInProgressForDevice sets the reboot-in-progress flag for device.
// If the flag was not already set, the device's cache is reset as well (when
// the device is known to the controller). The whole operation runs under
// rebootLock.
//
// Always returns true.
func (v *VoltController) SetRebootInProgressForDevice(device string) bool {
	v.rebootLock.Lock()
	defer v.rebootLock.Unlock()

	if _, alreadySet := v.rebootInProgressDevices[device]; alreadySet {
		return true
	}

	v.rebootInProgressDevices[device] = device
	logger.Warnw(ctx, "Setted Reboot-In-Progress flag", log.Fields{"Device": device})

	dev, err := v.GetDevice(device)
	if err != nil {
		logger.Errorw(ctx, "Failed to get device", log.Fields{"Device": device, "Error": err})
		return true
	}
	dev.ResetCache()
	return true
}
// ReSetRebootInProgressForDevice clears the reboot-in-progress flag for
// device, if it is set. The operation runs under rebootLock.
//
// Always returns true.
func (v *VoltController) ReSetRebootInProgressForDevice(device string) bool {
	v.rebootLock.Lock()
	defer v.rebootLock.Unlock()

	if _, isSet := v.rebootInProgressDevices[device]; isSet {
		delete(v.rebootInProgressDevices, device)
		logger.Warnw(ctx, "Resetted Reboot-In-Progress flag", log.Fields{"Device": device})
	}
	return true
}
// DeviceRebootInd handles a device reboot indication.
//
// The indication is forwarded to the application, after which the routes,
// groups, meters and PON counters stored for the device are deleted from the
// database. The clean-up is best-effort: a failing delete is logged and the
// remaining deletes are still attempted.
//
// dID  - device ID
// srNo - serial number, passed through to the application
// sbID - southbound ID, passed through to the application
func (v *VoltController) DeviceRebootInd(dID string, srNo string, sbID string) {
	v.app.DeviceRebootInd(dID, srNo, sbID)
	if err := db.DelAllRoutesForDevice(dID); err != nil {
		logger.Warnw(ctx, "Failed to delete routes for rebooted device", log.Fields{"Device": dID, "Error": err})
	}
	if err := db.DelAllGroup(dID); err != nil {
		logger.Warnw(ctx, "Failed to delete groups for rebooted device", log.Fields{"Device": dID, "Error": err})
	}
	if err := db.DelAllMeter(dID); err != nil {
		logger.Warnw(ctx, "Failed to delete meters for rebooted device", log.Fields{"Device": dID, "Error": err})
	}
	if err := db.DelAllPONCounters(dID); err != nil {
		logger.Warnw(ctx, "Failed to delete PON counters for rebooted device", log.Fields{"Device": dID, "Error": err})
	}
}
// DeviceDisableInd forwards a device deactivation indication for device dID
// to the application.
func (v *VoltController) DeviceDisableInd(dID string) {
v.app.DeviceDisableInd(dID)
}
// TriggerPendingProfileDeleteReq asks the application to trigger the pending
// profile delete requests for device.
func (v *VoltController) TriggerPendingProfileDeleteReq(device string) {
v.app.TriggerPendingProfileDeleteReq(device)
}
// TriggerPendingMigrateServicesReq asks the application to trigger the
// pending services migration requests for device.
func (v *VoltController) TriggerPendingMigrateServicesReq(device string) {
v.app.TriggerPendingMigrateServicesReq(device)
}
// SetAuditFlags sets the application's reboot flag and marks an audit as in
// progress on device.
func (v *VoltController) SetAuditFlags(device *Device) {
v.app.SetRebootFlag(true)
device.auditInProgress = true
}
// ResetAuditFlags clears the application's reboot flag and marks the audit on
// device as no longer in progress.
func (v *VoltController) ResetAuditFlags(device *Device) {
v.app.SetRebootFlag(false)
device.auditInProgress = false
}
// ProcessFlowModResultIndication forwards a flow mod result notification
// (flowStatus) to the application.
func (v *VoltController) ProcessFlowModResultIndication(flowStatus intf.FlowStatus) {
v.app.ProcessFlowModResultIndication(flowStatus)
}
// AddVPAgent registers vpa under the key vep, replacing any agent previously
// registered under the same key.
//
// vep - key under which the agent is stored
// vpa - agent to register
func (v *VoltController) AddVPAgent(vep string, vpa *vpagent.VPAgent) {
	// Allocate on first use so the store below can never hit a nil map (which
	// panics), e.g. on a zero-value VoltController.
	if v.vagent == nil {
		v.vagent = make(map[string]*vpagent.VPAgent)
	}
	v.vagent[vep] = vpa
}
// VPAgent returns the agent registered under vep.
//
// An error is returned when no agent has been registered under that key.
func (v *VoltController) VPAgent(vep string) (*vpagent.VPAgent, error) {
	if agent, registered := v.vagent[vep]; registered {
		return agent, nil
	}
	return nil, errors.New("VPA Not Registered")
}
// PacketOutReq hands a packet-out request to the named device.
//
// device      - ID of the device that should send the packet
// inport      - ingress port, passed through to the device
// outport     - egress port, passed through to the device
// pkt         - raw packet bytes (logged hex-encoded at debug level)
// isCustomPkt - passed through to the device unchanged
//
// Returns the lookup error when the device is unknown, otherwise whatever the
// device's PacketOutReq returns.
func (v *VoltController) PacketOutReq(device string, inport string, outport string, pkt []byte, isCustomPkt bool) error {
	logger.Debugw(ctx, "Packet Out Req", log.Fields{"Device": device, "OutPort": outport})

	dev, lookupErr := v.GetDevice(device)
	if lookupErr != nil {
		return lookupErr
	}

	pktHex := hex.EncodeToString(pkt)
	logger.Debugw(ctx, "Packet Out Pkt", log.Fields{"Pkt": pktHex})
	return dev.PacketOutReq(inport, outport, pkt, isCustomPkt)
}
// AddFlows adds the given flow on a port of a device.
//
// port   - name of the port the flow belongs to
// device - ID of the device
// flow   - flow (with its sub flows) to add
//
// When flow.MigrateCookie is set and at least one sub flow is present on the
// device with its old cookie, every sub flow is migrated locally: the entry
// with the old cookie is deleted and the sub flow is added again. Failures of
// the individual steps are logged and do not abort the migration. Otherwise
// the flow command is set to of.CommandAdd and the flow is passed to the
// device's UpdateFlows.
//
// Returns the lookup error when the device is unknown,
// errorCodes.ErrPortNotFound when the port is unknown,
// errorCodes.ErrInvalidParamInRequest when the device has no context, and nil
// otherwise.
func (v *VoltController) AddFlows(port string, device string, flow *of.VoltFlow) error {
	d, err := v.GetDevice(device)
	if err != nil {
		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
		return err
	}
	devPort := d.GetPortByName(port)
	if devPort == nil {
		// Log the port name as well so the failing lookup can be identified.
		logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device, "Port": port})
		return errorCodes.ErrPortNotFound
	}
	if d.ctx == nil {
		//FIXME: Application should know the context before it could submit task. Handle at application level
		logger.Errorw(ctx, "Context is missing. AddFlow Operation Not added to Task", log.Fields{"Device": device})
		return errorCodes.ErrInvalidParamInRequest
	}

	isMigrationRequired := false
	if flow.MigrateCookie {
		// Flow migration to the new cookie must be done only during the audit.
		// Migration for all sub flows must be done if at least one sub flow
		// with the old cookie is found in the device.
		for _, subFlow := range flow.SubFlows {
			if d.IsFlowPresentWithOldCookie(subFlow) {
				isMigrationRequired = true
				break
			}
		}
	}

	if !isMigrationRequired {
		flow.Command = of.CommandAdd
		d.UpdateFlows(flow, devPort)
		for cookie := range flow.SubFlows {
			logger.Debugw(ctx, "Flow Add added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
		}
		return nil
	}

	// In this case, the flow is updated in local cache and db here.
	// Actual flow deletion and addition at voltha will happen during flow tables audit.
	for _, subFlow := range flow.SubFlows {
		logger.Debugw(ctx, "Cookie Migration Required", log.Fields{"OldCookie": subFlow.OldCookie, "NewCookie": subFlow.Cookie})
		if delErr := d.DelFlowWithOldCookie(subFlow); delErr != nil {
			logger.Errorw(ctx, "Delete flow with old cookie failed", log.Fields{"Error": delErr, "OldCookie": subFlow.OldCookie})
		}
		if addErr := d.AddFlow(subFlow); addErr != nil {
			logger.Errorw(ctx, "Flow Add Failed", log.Fields{"Error": addErr, "Cookie": subFlow.Cookie})
		}
	}
	return nil
}
// DelFlows deletes the given flow from a port of a device.
//
// port   - name of the port the flow belongs to
// device - ID of the device
// flow   - flow (with its sub flows) to delete
//
// When flow.MigrateCookie is set and at least one sub flow is present on the
// device with its old cookie, the old-cookie entry of every sub flow is
// deleted locally; failures are logged and do not abort the loop. Otherwise
// the flow command is set to of.CommandDel and the flow is passed to the
// device's UpdateFlows.
//
// Returns the lookup error when the device is unknown,
// errorCodes.ErrPortNotFound when the port is unknown,
// errorCodes.ErrInvalidParamInRequest when the device has no context, and nil
// otherwise.
func (v *VoltController) DelFlows(port string, device string, flow *of.VoltFlow) error {
	d, err := v.GetDevice(device)
	if err != nil {
		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
		return err
	}
	devPort := d.GetPortByName(port)
	if devPort == nil {
		// Log the port name as well so the failing lookup can be identified.
		logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device, "Port": port})
		return errorCodes.ErrPortNotFound
	}
	if d.ctx == nil {
		//FIXME: Application should know the context before it could submit task. Handle at application level
		logger.Errorw(ctx, "Context is missing. DelFlow Operation Not added to Task", log.Fields{"Device": device})
		return errorCodes.ErrInvalidParamInRequest
	}

	isMigrationRequired := false
	if flow.MigrateCookie {
		// Flow migration to the new cookie must be done only during the audit.
		// Migration for all sub flows must be done if at least one sub flow
		// with the old cookie is found in the device.
		for _, subFlow := range flow.SubFlows {
			if d.IsFlowPresentWithOldCookie(subFlow) {
				isMigrationRequired = true
				break
			}
		}
	}

	if !isMigrationRequired {
		flow.Command = of.CommandDel
		d.UpdateFlows(flow, devPort)
		for cookie := range flow.SubFlows {
			logger.Debugw(ctx, "Flow Del added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
		}
		return nil
	}

	// In this case, the flow is deleted from local cache and db here.
	// Actual flow deletion at voltha will happen during flow tables audit.
	for _, subFlow := range flow.SubFlows {
		logger.Debugw(ctx, "Old Cookie delete Required", log.Fields{"OldCookie": subFlow.OldCookie})
		if delErr := d.DelFlowWithOldCookie(subFlow); delErr != nil {
			logger.Errorw(ctx, "DelFlowWithOldCookie failed", log.Fields{"OldCookie": subFlow.OldCookie, "Error": delErr})
		}
	}
	return nil
}
// GroupUpdate passes a group update for a port of a device to that device.
//
// port   - name of the port the update is associated with
// device - ID of the device
// group  - group to update
//
// Returns the lookup error when the device is unknown,
// errorCodes.ErrPortNotFound when the port is unknown,
// errorCodes.ErrInvalidParamInRequest when the device has no context, and nil
// otherwise.
func (v *VoltController) GroupUpdate(port string, device string, group *of.Group) error {
	d, err := v.GetDevice(device)
	if err != nil {
		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
		return err
	}
	devPort := d.GetPortByName(port)
	if devPort == nil {
		// Log the port name as well so the failing lookup can be identified.
		logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device, "Port": port})
		return errorCodes.ErrPortNotFound
	}
	if d.ctx == nil {
		//FIXME: Application should know the context before it could submit task. Handle at application level
		logger.Errorw(ctx, "Context is missing. GroupMod Operation Not added to task", log.Fields{"Device": device})
		return errorCodes.ErrInvalidParamInRequest
	}
	d.UpdateGroup(group, devPort)
	return nil
}
// ModMeter passes a meter command for a port of a device to that device.
//
// port    - name of the port the command is associated with
// device  - ID of the device
// command - meter command to apply
// meter   - meter the command applies to
//
// Returns the lookup error when the device is unknown,
// errorCodes.ErrPortNotFound when the port is unknown, and nil otherwise.
func (v *VoltController) ModMeter(port string, device string, command of.MeterCommand, meter *of.Meter) error {
	d, err := v.GetDevice(device)
	if err != nil {
		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
		return err
	}
	devPort := d.GetPortByName(port)
	if devPort == nil {
		// Log the port name as well so the failing lookup can be identified.
		logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device, "Port": port})
		return errorCodes.ErrPortNotFound
	}
	d.ModMeter(command, meter, devPort)
	return nil
}
// PortAddInd forwards a port add indication (device, port id and port name)
// to the application.
func (v *VoltController) PortAddInd(device string, id uint32, name string) {
v.app.PortAddInd(device, id, name)
}
// PortDelInd forwards a port delete indication for the given device and port
// to the application.
func (v *VoltController) PortDelInd(device string, port string) {
v.app.PortDelInd(device, port)
}
// PortUpdateInd forwards a port update indication (device, port name and port
// id) to the application.
func (v *VoltController) PortUpdateInd(device string, name string, id uint32) {
v.app.PortUpdateInd(device, name, id)
}
// PortUpInd forwards a port up indication for the given device and port to
// the application.
func (v *VoltController) PortUpInd(device string, port string) {
v.app.PortUpInd(device, port)
}
// PortDownInd forwards a port down indication for the given device and port
// to the application.
func (v *VoltController) PortDownInd(device string, port string) {
v.app.PortDownInd(device, port)
}
// DeviceUpInd forwards a device up indication for device to the application.
func (v *VoltController) DeviceUpInd(device string) {
v.app.DeviceUpInd(device)
}
// DeviceDownInd forwards a device down indication for device to the
// application.
func (v *VoltController) DeviceDownInd(device string) {
v.app.DeviceDownInd(device)
}
// PacketInInd forwards a packet-in indication, i.e. the packet bytes received
// on the given port of the given device, to the application.
func (v *VoltController) PacketInInd(device string, port string, data []byte) {
v.app.PacketInInd(device, port, data)
}
// GetPortState returns the state of the port called name on the given device.
//
// When the device is unknown, PortStateDown is returned together with the
// lookup error; otherwise the result of the device's GetPortState is returned.
func (v *VoltController) GetPortState(device string, name string) (PortState, error) {
	dev, lookupErr := v.GetDevice(device)
	if lookupErr == nil {
		return dev.GetPortState(name)
	}
	logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
	return PortStateDown, lookupErr
}
// UpdateMvlanProfiles asks the application to update the mvlan profiles for
// device.
func (v *VoltController) UpdateMvlanProfiles(device string) {
v.app.UpdateMvlanProfilesForDevice(device)
}
// GetController returns the package-level controller instance, i.e. the one
// recorded by the last call to NewController (nil if there has been none).
func GetController() *VoltController {
return vcontroller
}
/*
// PostIndication to post indication
func (v *VoltController) PostIndication(device string, task interface{}) error {
var srvTask AddServiceIndTask
var portTask AddPortIndTask
var taskCommon tasks.Task
var isSvcTask bool
switch data := task.(type) {
case *AddServiceIndTask:
srvTask = *data
taskCommon = data
isSvcTask = true
case *AddPortIndTask:
portTask = *data
taskCommon = data
}
d, err := v.GetDevice(device)
if err != nil {
logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
//It means the device itself is not present, so just post the indication directly
if isSvcTask {
msgbus.PostAccessConfigInd(srvTask.result, d.SerialNum, srvTask.indicationType, srvTask.serviceName, 0, srvTask.reason, srvTask.trigger, srvTask.portState)
} else {
msgbus.ProcessPortInd(portTask.indicationType, d.SerialNum, portTask.portName, portTask.accessConfig, portTask.serviceList)
}
return err
}
if taskCommon != nil {
d.AddTask(taskCommon)
}
return nil
}
*/
// GetTaskList returns the task list of the given device.
//
// An empty (non-nil) list is returned, and an error is logged, when the
// device is unknown or has no context.
func (v *VoltController) GetTaskList(device string) []tasks.Task {
	d, err := v.GetDevice(device)
	if err == nil && d.ctx != nil {
		return d.GetTaskList()
	}
	logger.Errorw(ctx, "Device Not Connected/Found", log.Fields{"Device": device, "Dev Obj": d})
	return []tasks.Task{}
}
// AddBlockedDevices adds deviceSerialNumber to the blocked device list; the
// serial number is stored as both key and value.
func (v *VoltController) AddBlockedDevices(deviceSerialNumber string) {
v.BlockedDeviceList.Set(deviceSerialNumber, deviceSerialNumber)
}
// DelBlockedDevices removes deviceSerialNumber from the blocked device list.
func (v *VoltController) DelBlockedDevices(deviceSerialNumber string) {
v.BlockedDeviceList.Remove(deviceSerialNumber)
}
// IsBlockedDevice reports whether deviceSerialNumber is present in the
// blocked device list.
func (v *VoltController) IsBlockedDevice(deviceSerialNumber string) bool {
	_, blocked := v.BlockedDeviceList.Get(deviceSerialNumber)
	return blocked
}