blob: 129b9358a83db346b59d219e355b14bb1dd4e12c [file] [log] [blame]
/*
* Copyright 2022-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controller
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
infraerror "voltha-go-controller/internal/pkg/errorcodes"
"voltha-go-controller/database"
"voltha-go-controller/internal/pkg/holder"
"voltha-go-controller/internal/pkg/intf"
"voltha-go-controller/internal/pkg/of"
//"voltha-go-controller/internal/pkg/vpagent"
"voltha-go-controller/internal/pkg/tasks"
"voltha-go-controller/internal/pkg/util"
"voltha-go-controller/log"
ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
"github.com/opencord/voltha-protos/v5/go/voltha"
)
// PortState type
type PortState string
const (
// PortStateDown constant
PortStateDown PortState = "DOWN"
// PortStateUp constant
PortStateUp PortState = "UP"
// DefaultMaxFlowQueues constant
DefaultMaxFlowQueues = 67
//ErrDuplicateFlow - indicates flow already exists in DB
ErrDuplicateFlow string = "duplicate flow"
//Unknown_Port_ID - indicates that the port id is unknown
Unknown_Port_ID = "unknown port id"
//Duplicate_Port - indicates the port is already exist in controller
Duplicate_Port = "duplicate port"
)
// DevicePort structure
type DevicePort struct {
Name string
State PortState
Version string
HwAddr string
tasks.Tasks
CurrSpeed uint32
MaxSpeed uint32
ID uint32
}
// NewDevicePort is the constructor for DevicePort
func NewDevicePort(mp *ofp.OfpPort) *DevicePort {
var port DevicePort
port.ID = mp.PortNo
port.Name = mp.Name
//port.HwAddr = strings.Trim(strings.Join(strings.Fields(fmt.Sprint("%02x", mp.HwAddr)), ":"), "[]")
port.HwAddr = strings.Trim(strings.ReplaceAll(fmt.Sprintf("%02x", mp.HwAddr), " ", ":"), "[]")
port.CurrSpeed = mp.CurrSpeed
port.MaxSpeed = mp.MaxSpeed
port.State = PortStateDown
return &port
}
// UniIDFlowQueue structure which maintains flows in queue.
type UniIDFlowQueue struct {
tasks.Tasks
ID uint32
}
// NewUniIDFlowQueue is the constructor for UniIDFlowQueue.
func NewUniIDFlowQueue(id uint32) *UniIDFlowQueue {
var flowQueue UniIDFlowQueue
flowQueue.ID = id
return &flowQueue
}
// DeviceState type
type DeviceState string
const (
// DeviceStateUNKNOWN constant
DeviceStateUNKNOWN DeviceState = "UNKNOWN"
// DeviceStateINIT constant
DeviceStateINIT DeviceState = "INIT"
// DeviceStateUP constant
DeviceStateUP DeviceState = "UP"
// DeviceStateDOWN constant
DeviceStateDOWN DeviceState = "DOWN"
// DeviceStateREBOOTED constant
DeviceStateREBOOTED DeviceState = "REBOOTED"
// DeviceStateDISABLED constant
DeviceStateDISABLED DeviceState = "DISABLED"
// DeviceStateDELETED constant
DeviceStateDELETED DeviceState = "DELETED"
)
type DeviceInterface interface {
SetFlowHash(cntx context.Context, hash uint32)
}
// Device structure
type Device struct {
ctx context.Context
cancel context.CancelFunc
vclientHolder *holder.VolthaServiceClientHolder
packetOutChannel chan *ofp.PacketOut
PortsByName map[string]*DevicePort
flows map[uint64]*of.VoltSubFlow
PortsByID map[uint32]*DevicePort
meters map[uint32]*of.Meter
flowQueue map[uint32]*UniIDFlowQueue // key is hash ID generated and value is UniIDFlowQueue.
SouthBoundID string
MfrDesc string
HwDesc string
SwDesc string
ID string
SerialNum string
State DeviceState
TimeStamp time.Time
groups sync.Map //map[uint32]*of.Group -> [GroupId : Group]
tasks.Tasks
portLock sync.RWMutex
flowLock sync.RWMutex
meterLock sync.RWMutex
flowQueueLock sync.RWMutex
flowHash uint32
auditInProgress bool
deviceAuditInProgress bool
}
// NewDevice is the constructor for Device
func NewDevice(cntx context.Context, id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID, mfr, hwDesc, swDesc string) *Device {
var device Device
device.ID = id
device.SerialNum = slno
device.State = DeviceStateDOWN
device.PortsByID = make(map[uint32]*DevicePort)
device.PortsByName = make(map[string]*DevicePort)
device.vclientHolder = vclientHldr
device.flows = make(map[uint64]*of.VoltSubFlow)
device.meters = make(map[uint32]*of.Meter)
device.flowQueue = make(map[uint32]*UniIDFlowQueue)
// Get the flowhash from db and update the flowhash variable in the device.
device.SouthBoundID = southBoundID
device.MfrDesc = mfr
device.HwDesc = hwDesc
device.SwDesc = swDesc
device.TimeStamp = time.Now()
flowHash, err := db.GetFlowHash(cntx, id)
if err != nil {
device.flowHash = DefaultMaxFlowQueues
} else {
var hash uint32
err = json.Unmarshal([]byte(flowHash), &hash)
if err != nil {
logger.Errorw(ctx, "Failed to unmarshall flowhash", log.Fields{"data": flowHash})
} else {
device.flowHash = hash
}
}
logger.Infow(ctx, "Flow hash for device", log.Fields{"Deviceid": id, "hash": device.flowHash})
return &device
}
// ResetCache to reset cache
func (d *Device) ResetCache() {
logger.Warnw(ctx, "Resetting flows, meters and groups cache", log.Fields{"Device": d.ID})
d.flows = make(map[uint64]*of.VoltSubFlow)
d.meters = make(map[uint32]*of.Meter)
d.groups = sync.Map{}
}
// GetFlow - Get the flow from device obj
func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
d.flowLock.RLock()
defer d.flowLock.RUnlock()
logger.Debugw(ctx, "Get Flow", log.Fields{"Cookie": cookie})
flow, ok := d.flows[cookie]
return flow, ok
}
// GetAllFlows - Get the flow from device obj
func (d *Device) GetAllFlows() []*of.VoltSubFlow {
d.flowLock.RLock()
defer d.flowLock.RUnlock()
var flows []*of.VoltSubFlow
logger.Debugw(ctx, "Get All Flows", log.Fields{"deviceID": d.ID})
for _, f := range d.flows {
flows = append(flows, f)
}
return flows
}
// GetAllPendingFlows - Get the flow from device obj
func (d *Device) GetAllPendingFlows() []*of.VoltSubFlow {
d.flowLock.RLock()
defer d.flowLock.RUnlock()
var flows []*of.VoltSubFlow
logger.Debugw(ctx, "Get All Pending Flows", log.Fields{"deviceID": d.ID})
for _, f := range d.flows {
if f.State == of.FlowAddPending {
flows = append(flows, f)
}
}
return flows
}
// AddFlow - Adds the flow to the device and also to the database
func (d *Device) AddFlow(cntx context.Context, flow *of.VoltSubFlow) error {
d.flowLock.Lock()
defer d.flowLock.Unlock()
logger.Debugw(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
if _, ok := d.flows[flow.Cookie]; ok {
return errors.New(ErrDuplicateFlow)
}
d.flows[flow.Cookie] = flow
d.AddFlowToDb(cntx, flow)
return nil
}
// AddFlowToDb is the utility to add the flow to the device
func (d *Device) AddFlowToDb(cntx context.Context, flow *of.VoltSubFlow) {
if b, err := json.Marshal(flow); err == nil {
if err = db.PutFlow(cntx, d.ID, flow.Cookie, string(b)); err != nil {
logger.Errorw(ctx, "Write Flow to DB failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
}
}
}
// DelFlow - Deletes the flow from the device and the database
func (d *Device) DelFlow(cntx context.Context, flow *of.VoltSubFlow) error {
d.flowLock.Lock()
defer d.flowLock.Unlock()
if _, ok := d.flows[flow.Cookie]; ok {
delete(d.flows, flow.Cookie)
d.DelFlowFromDb(cntx, flow.Cookie)
return nil
}
return errors.New("flow does not exist")
}
// DelFlowFromDb is utility to delete the flow from the device
func (d *Device) DelFlowFromDb(cntx context.Context, flowID uint64) {
_ = db.DelFlow(cntx, d.ID, flowID)
}
// IsFlowPresentWithOldCookie is to check whether there is any flow with old cookie.
func (d *Device) IsFlowPresentWithOldCookie(flow *of.VoltSubFlow) bool {
d.flowLock.RLock()
defer d.flowLock.RUnlock()
if _, ok := d.flows[flow.Cookie]; ok {
return false
} else if flow.OldCookie != 0 && flow.Cookie != flow.OldCookie {
if _, ok := d.flows[flow.OldCookie]; ok {
logger.Debugw(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
return true
}
}
return false
}
// DelFlowWithOldCookie is to delete flow with old cookie.
func (d *Device) DelFlowWithOldCookie(cntx context.Context, flow *of.VoltSubFlow) error {
d.flowLock.Lock()
defer d.flowLock.Unlock()
if _, ok := d.flows[flow.OldCookie]; ok {
logger.Debugw(ctx, "Flow was added before vgc upgrade. Trying to delete with old cookie",
log.Fields{"OldCookie": flow.OldCookie})
delete(d.flows, flow.OldCookie)
d.DelFlowFromDb(cntx, flow.OldCookie)
return nil
}
return errors.New("flow does not exist")
}
// RestoreFlowsFromDb to restore flows from database
func (d *Device) RestoreFlowsFromDb(cntx context.Context) {
flows, _ := db.GetFlows(cntx, d.ID)
for _, flow := range flows {
b, ok := flow.Value.([]byte)
if !ok {
logger.Warn(ctx, "The value type is not []byte")
continue
}
d.CreateFlowFromString(b)
}
}
// CreateFlowFromString to create flow from string
func (d *Device) CreateFlowFromString(b []byte) {
var flow of.VoltSubFlow
if err := json.Unmarshal(b, &flow); err == nil {
if _, ok := d.flows[flow.Cookie]; !ok {
logger.Debugw(ctx, "Adding Flow From Db", log.Fields{"Cookie": flow.Cookie})
d.flows[flow.Cookie] = &flow
} else {
logger.Warnw(ctx, "Duplicate Flow", log.Fields{"Cookie": flow.Cookie})
}
} else {
logger.Warn(ctx, "Unmarshal failed")
}
}
// ----------------------------------------------------------
// Database related functionality
// Group operations at the device which include update and delete
// UpdateGroupEntry - Adds/Updates the group to the device and also to the database
func (d *Device) UpdateGroupEntry(cntx context.Context, group *of.Group) {
logger.Debugw(ctx, "Update Group to device", log.Fields{"ID": group.GroupID})
d.groups.Store(group.GroupID, group)
d.AddGroupToDb(cntx, group)
}
// AddGroupToDb - Utility to add the group to the device DB
func (d *Device) AddGroupToDb(cntx context.Context, group *of.Group) {
if b, err := json.Marshal(group); err == nil {
logger.Debugw(ctx, "Adding Group to DB", log.Fields{"grp": group, "Json": string(b)})
if err = db.PutGroup(cntx, d.ID, group.GroupID, string(b)); err != nil {
logger.Errorw(ctx, "Write Group to DB failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
}
}
}
// DelGroupEntry - Deletes the group from the device and the database
func (d *Device) DelGroupEntry(cntx context.Context, group *of.Group) {
if _, ok := d.groups.Load(group.GroupID); ok {
d.groups.Delete(group.GroupID)
d.DelGroupFromDb(cntx, group.GroupID)
}
}
// DelGroupFromDb - Utility to delete the Group from the device
func (d *Device) DelGroupFromDb(cntx context.Context, groupID uint32) {
_ = db.DelGroup(cntx, d.ID, groupID)
}
// RestoreGroupsFromDb - restores all groups from DB
func (d *Device) RestoreGroupsFromDb(cntx context.Context) {
logger.Info(ctx, "Restoring Groups")
groups, _ := db.GetGroups(cntx, d.ID)
for _, group := range groups {
b, ok := group.Value.([]byte)
if !ok {
logger.Warn(ctx, "The value type is not []byte")
continue
}
d.CreateGroupFromString(b)
}
}
// CreateGroupFromString - Forms group struct from json string
func (d *Device) CreateGroupFromString(b []byte) {
var group of.Group
if err := json.Unmarshal(b, &group); err == nil {
if _, ok := d.groups.Load(group.GroupID); !ok {
logger.Debugw(ctx, "Adding Group From Db", log.Fields{"GroupId": group.GroupID})
d.groups.Store(group.GroupID, &group)
} else {
logger.Warnw(ctx, "Duplicate Group", log.Fields{"GroupId": group.GroupID})
}
} else {
logger.Warn(ctx, "Unmarshal failed")
}
}
// AddMeter to add meter
func (d *Device) AddMeter(cntx context.Context, meter *of.Meter) error {
d.meterLock.Lock()
defer d.meterLock.Unlock()
if _, ok := d.meters[meter.ID]; ok {
return errors.New("duplicate meter")
}
d.meters[meter.ID] = meter
go d.AddMeterToDb(cntx, meter)
return nil
}
// UpdateMeter to update meter
func (d *Device) UpdateMeter(cntx context.Context, meter *of.Meter) error {
d.meterLock.Lock()
defer d.meterLock.Unlock()
if _, ok := d.meters[meter.ID]; ok {
d.meters[meter.ID] = meter
d.AddMeterToDb(cntx, meter)
} else {
return errors.New("meter not found for updation")
}
return nil
}
// GetMeter to get meter
func (d *Device) GetMeter(id uint32) (*of.Meter, error) {
d.meterLock.RLock()
defer d.meterLock.RUnlock()
if m, ok := d.meters[id]; ok {
return m, nil
}
return nil, errors.New("meter not found")
}
// DelMeter to delete meter
func (d *Device) DelMeter(cntx context.Context, meter *of.Meter) bool {
d.meterLock.Lock()
defer d.meterLock.Unlock()
if _, ok := d.meters[meter.ID]; ok {
delete(d.meters, meter.ID)
go d.DelMeterFromDb(cntx, meter.ID)
return true
}
return false
}
// AddMeterToDb is utility to add the Group to the device
func (d *Device) AddMeterToDb(cntx context.Context, meter *of.Meter) {
if b, err := json.Marshal(meter); err == nil {
if err = db.PutDeviceMeter(cntx, d.ID, meter.ID, string(b)); err != nil {
logger.Errorw(ctx, "Write Meter to DB failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
}
}
}
// DelMeterFromDb to delete meter from db
func (d *Device) DelMeterFromDb(cntx context.Context, id uint32) {
_ = db.DelDeviceMeter(cntx, d.ID, id)
}
// RestoreMetersFromDb to restore meters from db
func (d *Device) RestoreMetersFromDb(cntx context.Context) {
meters, _ := db.GetDeviceMeters(cntx, d.ID)
for _, meter := range meters {
b, ok := meter.Value.([]byte)
if !ok {
logger.Warn(ctx, "The value type is not []byte")
continue
}
d.CreateMeterFromString(b)
}
}
// CreateMeterFromString to create meter from string
func (d *Device) CreateMeterFromString(b []byte) {
var meter of.Meter
if err := json.Unmarshal(b, &meter); err == nil {
if _, ok := d.meters[meter.ID]; !ok {
logger.Debugw(ctx, "Adding Meter From Db", log.Fields{"ID": meter.ID})
d.meters[meter.ID] = &meter
} else {
logger.Warnw(ctx, "Duplicate Meter", log.Fields{"ID": meter.ID})
}
} else {
logger.Warnw(ctx, "Unmarshal failed", log.Fields{"error": err, "meter": string(b)})
}
}
// VolthaClient to get voltha client
func (d *Device) VolthaClient() voltha.VolthaServiceClient {
return d.vclientHolder.Get()
}
// AddPort to add the port as requested by the device/VOLTHA
// Inform the application if the port is successfully added
func (d *Device) AddPort(cntx context.Context, mp *ofp.OfpPort) error {
d.portLock.Lock()
defer d.portLock.Unlock()
id := mp.PortNo
name := mp.Name
if _, ok := d.PortsByID[id]; ok {
return errors.New(Duplicate_Port)
}
if _, ok := d.PortsByName[name]; ok {
return errors.New(Duplicate_Port)
}
p := NewDevicePort(mp)
d.PortsByID[id] = p
d.PortsByName[name] = p
d.WritePortToDb(cntx, p)
GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
return nil
}
// DelPort to delete the port as requested by the device/VOLTHA
// Inform the application if the port is successfully deleted
func (d *Device) DelPort(cntx context.Context, id uint32) error {
p := d.GetPortByID(id)
if p == nil {
return errors.New("unknown port")
}
if p.State == PortStateUp {
GetController().PortDownInd(cntx, d.ID, p.Name)
}
GetController().PortDelInd(cntx, d.ID, p.Name)
d.portLock.Lock()
defer d.portLock.Unlock()
delete(d.PortsByID, p.ID)
delete(d.PortsByName, p.Name)
d.DelPortFromDb(cntx, p.ID)
logger.Infow(ctx, "Deleted Port", log.Fields{"Device": d.ID, "Port": id})
return nil
}
// UpdatePortByName is utility to update the port by Name
func (d *Device) UpdatePortByName(cntx context.Context, name string, port uint32) {
d.portLock.Lock()
defer d.portLock.Unlock()
p, ok := d.PortsByName[name]
if !ok {
return
}
delete(d.PortsByID, p.ID)
p.ID = port
d.PortsByID[port] = p
d.WritePortToDb(cntx, p)
GetController().PortUpdateInd(d.ID, p.Name, p.ID)
logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
}
// GetPortName to get the name of the port by its id
func (d *Device) GetPortName(id uint32) (string, error) {
d.portLock.RLock()
defer d.portLock.RUnlock()
if p, ok := d.PortsByID[id]; ok {
return p.Name, nil
}
logger.Errorw(ctx, "Port not found", log.Fields{"port": id})
return "", errors.New(Unknown_Port_ID)
}
// GetPortByID is utility to retrieve the port by ID
func (d *Device) GetPortByID(id uint32) *DevicePort {
d.portLock.RLock()
defer d.portLock.RUnlock()
p, ok := d.PortsByID[id]
if ok {
return p
}
return nil
}
// GetPortByName is utility to retrieve the port by Name
func (d *Device) GetPortByName(name string) *DevicePort {
d.portLock.RLock()
defer d.portLock.RUnlock()
p, ok := d.PortsByName[name]
if ok {
return p
}
return nil
}
// GetPortState to get the state of the port by name
func (d *Device) GetPortState(name string) (PortState, error) {
d.portLock.RLock()
defer d.portLock.RUnlock()
if p, ok := d.PortsByName[name]; ok {
return p.State, nil
}
return PortStateDown, errors.New(Unknown_Port_ID)
}
// GetPortID to get the port-id by the port name
func (d *Device) GetPortID(name string) (uint32, error) {
d.portLock.RLock()
defer d.portLock.RUnlock()
if p, ok := d.PortsByName[name]; ok {
return p.ID, nil
}
return 0, errors.New(Unknown_Port_ID)
}
// WritePortToDb to add the port to the database
func (d *Device) WritePortToDb(ctx context.Context, port *DevicePort) {
port.Version = database.PresentVersionMap[database.DevicePortPath]
if b, err := json.Marshal(port); err == nil {
if err = db.PutPort(ctx, d.ID, port.ID, string(b)); err != nil {
logger.Errorw(ctx, "Write port to DB failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
}
}
}
// DelPortFromDb to delete port from database
func (d *Device) DelPortFromDb(cntx context.Context, id uint32) {
_ = db.DelPort(cntx, d.ID, id)
}
// RestorePortsFromDb to restore ports from database
func (d *Device) RestorePortsFromDb(cntx context.Context) {
ports, _ := db.GetPorts(cntx, d.ID)
for _, port := range ports {
b, ok := port.Value.([]byte)
if !ok {
logger.Warn(ctx, "The value type is not []byte")
continue
}
d.CreatePortFromString(cntx, b)
}
}
// CreatePortFromString to create port from string
func (d *Device) CreatePortFromString(cntx context.Context, b []byte) {
var port DevicePort
if err := json.Unmarshal(b, &port); err == nil {
if _, ok := d.PortsByID[port.ID]; !ok {
logger.Debugw(ctx, "Adding Port From Db", log.Fields{"ID": port.ID})
d.PortsByID[port.ID] = &port
d.PortsByName[port.Name] = &port
GetController().PortAddInd(cntx, d.ID, port.ID, port.Name)
} else {
logger.Warnw(ctx, Duplicate_Port, log.Fields{"ID": port.ID})
}
} else {
logger.Warnw(ctx, "Unmarshal failed", log.Fields{"port": string(b)})
}
}
// Delete : OLT Delete functionality yet to be implemented. IDeally all of the
// resources should have been removed by this time. It is an error
// scenario if the OLT has resources associated with it.
func (d *Device) Delete() {
d.StopAll()
}
// Stop to stop the task
func (d *Device) Stop() {
}
// ConnectInd is called when the connection between VGC and the VOLTHA is
// restored. This will perform audit of the device post reconnection
func (d *Device) ConnectInd(ctx context.Context, discType intf.DiscoveryType) {
logger.Warnw(ctx, "Audit Upon Connection Establishment", log.Fields{"Device": d.ID, "State": d.State})
ctx1, cancel := context.WithCancel(ctx)
d.cancel = cancel
d.ctx = ctx1
d.Tasks.Initialize(ctx1)
logger.Debugw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
d.State = DeviceStateUP
d.TimeStamp = time.Now()
GetController().DeviceUpInd(d.ID)
logger.Debugw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
t := NewAuditDevice(d, AuditEventDeviceDisc)
d.Tasks.AddTask(t)
t1 := NewAuditTablesTask(d)
d.Tasks.AddTask(t1)
t2 := NewPendingProfilesTask(d)
d.Tasks.AddTask(t2)
go d.synchronizeDeviceTables()
}
func (d *Device) synchronizeDeviceTables() {
tick := time.NewTicker(GetController().GetDeviceTableSyncDuration())
loop:
for {
select {
case <-d.ctx.Done():
logger.Warnw(d.ctx, "Context Done. Canceling Periodic Audit", log.Fields{"Context": ctx, "Device": d.ID, "DeviceSerialNum": d.SerialNum})
break loop
case <-tick.C:
t1 := NewAuditTablesTask(d)
d.Tasks.AddTask(t1)
}
}
tick.Stop()
}
// DeviceUpInd is called when the logical device state changes to UP. This will perform audit of the device post reconnection
func (d *Device) DeviceUpInd() {
logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
d.State = DeviceStateUP
d.TimeStamp = time.Now()
GetController().DeviceUpInd(d.ID)
logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
t := NewAuditDevice(d, AuditEventDeviceDisc)
d.Tasks.AddTask(t)
t1 := NewAuditTablesTask(d)
d.Tasks.AddTask(t1)
t2 := NewPendingProfilesTask(d)
d.Tasks.AddTask(t2)
}
// DeviceDownInd is called when the logical device state changes to Down.
func (d *Device) DeviceDownInd() {
logger.Warnw(ctx, "Device State change Ind: Down", log.Fields{"Device": d.ID})
d.State = DeviceStateDOWN
d.TimeStamp = time.Now()
GetController().DeviceDownInd(d.ID)
}
// DeviceRebootInd is called when the logical device is rebooted.
func (d *Device) DeviceRebootInd(cntx context.Context) {
logger.Warnw(ctx, "Device State change Ind: Rebooted", log.Fields{"Device": d.ID})
if d.State == DeviceStateREBOOTED {
d.State = DeviceStateREBOOTED
logger.Warnw(ctx, "Ignoring Device State change Ind: REBOOT, Device Already in REBOOT state", log.Fields{"Device": d.ID, "SeralNo": d.SerialNum})
return
}
d.State = DeviceStateREBOOTED
d.TimeStamp = time.Now()
GetController().SetRebootInProgressForDevice(d.ID)
GetController().DeviceRebootInd(cntx, d.ID, d.SerialNum, d.SouthBoundID)
d.ReSetAllPortStates(cntx)
}
// DeviceDisabledInd is called when the logical device is disabled
func (d *Device) DeviceDisabledInd(cntx context.Context) {
logger.Warnw(ctx, "Device State change Ind: Disabled", log.Fields{"Device": d.ID})
d.State = DeviceStateDISABLED
d.TimeStamp = time.Now()
GetController().DeviceDisableInd(cntx, d.ID)
}
// ReSetAllPortStates - Set all logical device port status to DOWN
func (d *Device) ReSetAllPortStates(cntx context.Context) {
logger.Warnw(ctx, "Resetting all Ports State to DOWN", log.Fields{"Device": d.ID, "State": d.State})
d.portLock.Lock()
defer d.portLock.Unlock()
for _, port := range d.PortsByID {
if port.State != PortStateDown {
logger.Infow(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
GetController().PortDownInd(cntx, d.ID, port.Name)
port.State = PortStateDown
d.WritePortToDb(cntx, port)
}
}
}
// ReSetAllPortStatesInDb - Set all logical device port status to DOWN in DB and skip indication to application
func (d *Device) ReSetAllPortStatesInDb(cntx context.Context) {
logger.Warnw(ctx, "Resetting all Ports State to DOWN In DB", log.Fields{"Device": d.ID, "State": d.State})
d.portLock.Lock()
defer d.portLock.Unlock()
for _, port := range d.PortsByID {
if port.State != PortStateDown {
logger.Debugw(ctx, "Resetting Port State to DOWN and Write to DB", log.Fields{"Device": d.ID, "Port": port})
port.State = PortStateDown
d.WritePortToDb(cntx, port)
}
}
}
// ProcessPortUpdate deals with the change in port id (ONU movement) and taking action
// to update only when the port state is DOWN
func (d *Device) ProcessPortUpdate(cntx context.Context, portName string, port uint32, state uint32) {
if p := d.GetPortByName(portName); p != nil {
if p.ID != port {
logger.Infow(ctx, "Port ID update indication", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
if p.State != PortStateDown {
logger.Errorw(ctx, "Port ID update failed. Port State UP", log.Fields{"Port": p})
return
}
d.UpdatePortByName(cntx, portName, port)
logger.Errorw(ctx, "Port ID Updated", log.Fields{"Port": p})
}
d.ProcessPortState(cntx, port, state)
}
}
// ***Operations Performed on Port state Transitions***
//
// |-----------------------------------------------------------------------------|
// | State | Action |
// |--------------------|--------------------------------------------------------|
// | UP | UNI - Trigger Flow addition for service configured |
// | | NNI - Trigger Flow addition for vnets & mvlan profiles |
// | | |
// | DOWN | UNI - Trigger Flow deletion for service configured |
// | | NNI - Trigger Flow deletion for vnets & mvlan profiles |
// | | |
// |-----------------------------------------------------------------------------|
//
// ProcessPortState deals with the change in port status and taking action
// based on the new state and the old state
func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32) {
if d.State != DeviceStateUP && !util.IsNniPort(port) {
logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
return
}
if p := d.GetPortByID(port); p != nil {
logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State})
// Avoid blind initialization as the current tasks in the queue will be lost
// Eg: Service Del followed by Port Down - The flows will be dangling
// Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
p.Tasks.CheckAndInitialize(d.ctx)
if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
// Transition from DOWN to UP
logger.Infow(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
GetController().PortUpInd(cntx, d.ID, p.Name)
p.State = PortStateUp
d.WritePortToDb(cntx, p)
} else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
// Transition from UP to Down
logger.Infow(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
GetController().PortDownInd(cntx, d.ID, p.Name)
p.State = PortStateDown
d.WritePortToDb(cntx, p)
} else {
logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
}
}
}
// ProcessPortStateAfterReboot - triggers the port state indication to sort out configu mismatch due to reboot
func (d *Device) ProcessPortStateAfterReboot(cntx context.Context, port uint32, state uint32) {
if d.State != DeviceStateUP && !util.IsNniPort(port) {
logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
return
}
if p := d.GetPortByID(port); p != nil {
logger.Infow(ctx, "Port State Processing after Reboot", log.Fields{"Received": state, "Current": p.State})
p.Tasks.Initialize(d.ctx)
if p.State == PortStateUp {
logger.Infow(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
GetController().PortUpInd(cntx, d.ID, p.Name)
} else if p.State == PortStateDown {
logger.Infow(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
GetController().PortDownInd(cntx, d.ID, p.Name)
}
}
}
// ChangeEvent : Change event brings in ports related changes such as addition/deletion
// or modification where the port status change up/down is indicated to the
// controller
func (d *Device) ChangeEvent(event *ofp.ChangeEvent) error {
cet := NewChangeEventTask(d.ctx, event, d)
d.AddTask(cet)
return nil
}
// PacketIn handle the incoming packet-in and deliver to the application for the
// actual processing
func (d *Device) PacketIn(cntx context.Context, pkt *ofp.PacketIn) {
logger.Debugw(ctx, "Received a Packet-In", log.Fields{"Device": d.ID})
if pkt.PacketIn.Reason != ofp.OfpPacketInReason_OFPR_ACTION {
logger.Warnw(ctx, "Unsupported PacketIn Reason", log.Fields{"Reason": pkt.PacketIn.Reason})
return
}
data := pkt.PacketIn.Data
port := PacketInGetPort(pkt.PacketIn)
if pName, err := d.GetPortName(port); err == nil {
GetController().PacketInInd(cntx, d.ID, pName, data)
} else {
logger.Warnw(ctx, "Unknown Port", log.Fields{"Reason": err.Error()})
}
}
// PacketInGetPort to get the port on which the packet-in is reported
func PacketInGetPort(pkt *ofp.OfpPacketIn) uint32 {
for _, field := range pkt.Match.OxmFields {
if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
if ofbField, ok := field.Field.(*ofp.OfpOxmField_OfbField); ok {
if ofbField.OfbField.Type == ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT {
if port, ok := ofbField.OfbField.Value.(*ofp.OfpOxmOfbField_Port); ok {
return port.Port
}
}
}
}
}
return 0
}
// PacketOutReq receives the packet out request from the application via the
// controller. The interface from the application uses name as the identity.
func (d *Device) PacketOutReq(outport string, inport string, data []byte, isCustomPkt bool) error {
inp, err := d.GetPortID(inport)
if err != nil {
return errors.New("unknown inport")
}
outp, err1 := d.GetPortID(outport)
if err1 != nil {
return errors.New("unknown outport")
}
logger.Debugw(ctx, "Sending packet out", log.Fields{"Device": d.ID, "Inport": inport, "Outport": outport})
return d.SendPacketOut(outp, inp, data, isCustomPkt)
}
// SendPacketOut is responsible for building the OF structure and send the
// packet-out to the VOLTHA
func (d *Device) SendPacketOut(outport uint32, inport uint32, data []byte, isCustomPkt bool) error {
pout := &ofp.PacketOut{}
pout.Id = d.ID
opout := &ofp.OfpPacketOut{}
pout.PacketOut = opout
opout.InPort = inport
opout.Data = data
opout.Actions = []*ofp.OfpAction{
{
Type: ofp.OfpActionType_OFPAT_OUTPUT,
Action: &ofp.OfpAction_Output{
Output: &ofp.OfpActionOutput{
Port: outport,
MaxLen: 65535,
},
},
},
}
d.packetOutChannel <- pout
return nil
}
// UpdateFlows receives the flows in the form that is implemented
// in the VGC and transforms them to the OF format. This is handled
// as a port of the task that is enqueued to do the same.
func (d *Device) UpdateFlows(flow *of.VoltFlow, devPort *DevicePort) {
t := NewAddFlowsTask(d.ctx, flow, d)
logger.Debugw(ctx, "Port Context", log.Fields{"Ctx": devPort.GetContext()})
// check if port isNni , if yes flows will be added to device port queues.
if util.IsNniPort(devPort.ID) {
// Adding the flows to device port queues.
devPort.AddTask(t)
return
}
// If the flowHash is enabled then add the flows to the flowhash generated queues.
flowQueue := d.getAndAddFlowQueueForUniID(uint32(devPort.ID))
if flowQueue != nil {
logger.Debugw(ctx, "flowHashQId", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID})
flowQueue.AddTask(t)
logger.Debugw(ctx, "Tasks Info", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID, "Total": flowQueue.TotalTasks(), "Pending": flowQueue.NumPendingTasks()})
} else {
//FlowThrotling disabled, add to the device port queue
devPort.AddTask(t)
return
}
}
// UpdateGroup to update group info
func (d *Device) UpdateGroup(group *of.Group, devPort *DevicePort) {
task := NewModGroupTask(d.ctx, group, d)
logger.Debugw(ctx, "NNI Port Context", log.Fields{"Ctx": devPort.GetContext()})
devPort.AddTask(task)
}
// ModMeter for mod meter task
func (d *Device) ModMeter(command of.MeterCommand, meter *of.Meter, devPort *DevicePort) {
if command == of.MeterCommandAdd {
if _, err := d.GetMeter(meter.ID); err == nil {
logger.Debugw(ctx, "Meter already added", log.Fields{"ID": meter.ID})
return
}
}
t := NewModMeterTask(d.ctx, command, meter, d)
devPort.AddTask(t)
}
func (d *Device) getAndAddFlowQueueForUniID(id uint32) *UniIDFlowQueue {
d.flowQueueLock.RLock()
// If flowhash is 0 that means flowhash throttling is disabled, return nil
if d.flowHash == 0 {
d.flowQueueLock.RUnlock()
return nil
}
flowHashID := id % uint32(d.flowHash)
if value, found := d.flowQueue[uint32(flowHashID)]; found {
d.flowQueueLock.RUnlock()
return value
}
d.flowQueueLock.RUnlock()
logger.Debugw(ctx, "Flow queue not found creating one", log.Fields{"uniid": id, "hash": flowHashID})
return d.addFlowQueueForUniID(id)
}
func (d *Device) addFlowQueueForUniID(id uint32) *UniIDFlowQueue {
d.flowQueueLock.Lock()
defer d.flowQueueLock.Unlock()
flowHashID := id % uint32(d.flowHash)
flowQueue := NewUniIDFlowQueue(uint32(flowHashID))
flowQueue.Tasks.Initialize(d.ctx)
d.flowQueue[flowHashID] = flowQueue
return flowQueue
}
// SetFlowHash sets the device flow hash and writes to the DB.
func (d *Device) SetFlowHash(cntx context.Context, hash uint32) {
d.flowQueueLock.Lock()
defer d.flowQueueLock.Unlock()
d.flowHash = hash
d.writeFlowHashToDB(cntx)
}
func (d *Device) writeFlowHashToDB(cntx context.Context) {
hash, err := json.Marshal(d.flowHash)
if err != nil {
logger.Errorw(ctx, "failed to marshal flow hash", log.Fields{"hash": d.flowHash, "error": err})
return
}
if err := db.PutFlowHash(cntx, d.ID, string(hash)); err != nil {
logger.Errorw(ctx, "Failed to add flow hash to DB", log.Fields{"device": d.ID, "hash": d.flowHash, "error": err})
}
}
// isSBOperAllowed - determines if the SB operation is allowed based on device state & force flag
func (d *Device) isSBOperAllowed(forceAction bool) bool {
if d.State == DeviceStateUP {
return true
}
if d.State == DeviceStateDISABLED && forceAction {
return true
}
return false
}
func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
flow, _ := d.GetFlow(cookie)
d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
}
func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
statusCode, statusMsg := infraerror.GetErrorInfo(err)
success := isFlowOperSuccess(statusCode, oper)
updateFlow := func(cookie uint64, state int, reason string) {
if dbFlow, ok := d.GetFlow(cookie); ok {
dbFlow.State = uint8(state)
dbFlow.ErrorReason = reason
d.AddFlowToDb(cntx, dbFlow)
}
}
// Update flow results
// Add - Update Success or Failure status with reason
// Del - Delete entry from DB on success else update error reason
if oper == of.CommandAdd {
state := of.FlowAddSuccess
reason := ""
if !success {
state = of.FlowAddFailure
reason = statusMsg
}
updateFlow(cookie, state, reason)
logger.Debugw(ctx, "Updated Flow to DB", log.Fields{"Cookie": cookie, "State": state})
} else {
if success && flow != nil {
if err := d.DelFlow(cntx, flow); err != nil {
logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
}
} else if !success {
updateFlow(cookie, of.FlowDelFailure, statusMsg)
}
}
flowResult := intf.FlowStatus{
Cookie: strconv.FormatUint(cookie, 10),
Device: d.ID,
FlowModType: oper,
Flow: flow,
Status: statusCode,
Reason: statusMsg,
AdditionalData: bwDetails,
}
logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
GetController().ProcessFlowModResultIndication(cntx, flowResult)
}