First Commit of Voltha-Go-Controller from Radisys
Change-Id: I8e2e908e7ab09a4fe3d86849da18b6d69dcf4ab0
diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go
new file mode 100644
index 0000000..ae34133
--- /dev/null
+++ b/internal/pkg/controller/controller.go
@@ -0,0 +1,523 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package controller
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "time"
+
+ "encoding/hex"
+
+ "voltha-go-controller/database"
+ errorCodes "voltha-go-controller/internal/pkg/errorcodes"
+ "voltha-go-controller/internal/pkg/intf"
+ "voltha-go-controller/internal/pkg/of"
+ "voltha-go-controller/internal/pkg/tasks"
+ "voltha-go-controller/internal/pkg/util"
+ "voltha-go-controller/internal/pkg/vpagent"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+// logger is the package-level logger; it is assigned in init.
+var logger log.CLogger
+// ctx is a placeholder context (context.TODO) passed to the logging calls in
+// this file.
+var ctx = context.TODO()
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+ if err != nil {
+ panic(err)
+ }
+}
+
+// db is the package-level database handle; NewController sets it from
+// database.GetDatabase.
+var db database.DBIntf
+
+// deviceTableSyncDuration is the interval between device table sync-ups. It
+// starts at 15 minutes and is changed by SetDeviceTableSyncDuration.
+var deviceTableSyncDuration = 15 * time.Minute
+
+// SetDeviceTableSyncDuration sets the interval between device table sync-up
+// activities. duration is expressed in minutes.
+//
+// NOTE(review): the value is stored without validation; confirm that callers
+// never pass zero or a negative number.
+func SetDeviceTableSyncDuration(duration int) {
+ deviceTableSyncDuration = time.Duration(duration) * time.Minute
+}
+
+// VoltController holds the controller-wide state: the devices it manages, the
+// registered VP agents, the reboot-in-progress bookkeeping, the blocked device
+// list and the per-device controller task queues.
+type VoltController struct {
+ // rebootLock guards rebootInProgressDevices.
+ rebootLock sync.Mutex
+ // rebootInProgressDevices has one entry, keyed by device id, for every
+ // device whose reboot-in-progress flag is currently set.
+ rebootInProgressDevices map[string]string
+ // devices maps a device id to its Device.
+ devices map[string]*Device
+ // deviceLock is initialised by NewController but not otherwise used in this
+ // file. NOTE(review): presumably intended to guard devices - confirm.
+ deviceLock sync.RWMutex
+ // vagent holds the VP agents registered through AddVPAgent, keyed by the
+ // vep string.
+ vagent map[string]*vpagent.VPAgent
+ // ctx is the context handed to NewController.
+ ctx context.Context
+ // app is the application interface that indications and requests are
+ // forwarded to.
+ app intf.App
+ // RebootFlow is not referenced by the code in this file.
+ RebootFlow bool
+ // BlockedDeviceList holds the blocked devices, keyed by serial number.
+ BlockedDeviceList *util.ConcurrentMap
+ // deviceTaskQueue maps a device id to its controller-level *tasks.Tasks.
+ deviceTaskQueue *util.ConcurrentMap
+}
+
+// vcontroller is the controller instance most recently created by
+// NewController; GetController returns it.
+var vcontroller *VoltController
+
+// NewController is the constructor for VoltController
+func NewController(ctx context.Context, app intf.App) intf.IVPClientAgent {
+ var controller VoltController
+
+ controller.rebootInProgressDevices = make(map[string]string)
+ controller.devices = make(map[string]*Device)
+ controller.deviceLock = sync.RWMutex{}
+ controller.ctx = ctx
+ controller.app = app
+ controller.BlockedDeviceList = util.NewConcurrentMap()
+ controller.deviceTaskQueue = util.NewConcurrentMap()
+ db = database.GetDatabase()
+ vcontroller = &controller
+ return &controller
+}
+
+// AddDevice creates a Device from the supplied client configuration and
+// registers it with the controller and the application.
+//
+// The steps run in this order: the device is stored in the controller's device
+// map under config.DeviceID, the application is informed through app.AddDevice,
+// the device's RestoreMetersFromDb, RestoreGroupsFromDb, RestoreFlowsFromDb and
+// RestorePortsFromDb methods are called, a connect indication is raised with
+// intf.DeviceDisc, and finally the packet-out channel from the configuration is
+// attached to the device.
+//
+// NOTE(review): the device map is written here without taking deviceLock;
+// confirm that callers serialise access to it.
+//
+// It returns the newly created device.
+func (v *VoltController) AddDevice(config *intf.VPClientCfg) intf.IVPClient {
+	dev := NewDevice(config.DeviceID, config.SerialNum, config.VolthaClient, config.SouthBoundID)
+	v.devices[config.DeviceID] = dev
+	v.app.AddDevice(dev.ID, dev.SerialNum, config.SouthBoundID)
+
+	// Restore order is kept as-is: meters, groups, flows, then ports.
+	dev.RestoreMetersFromDb()
+	dev.RestoreGroupsFromDb()
+	dev.RestoreFlowsFromDb()
+	dev.RestorePortsFromDb()
+
+	dev.ConnectInd(context.TODO(), intf.DeviceDisc)
+	dev.packetOutChannel = config.PacketOutChannel
+
+	fields := log.Fields{"Device": config.DeviceID, "SerialNo": dev.SerialNum, "State": dev.State}
+	logger.Warnw(ctx, "Added device", fields)
+	return dev
+}
+
+// DelDevice removes the device with the given id from the controller.
+//
+// When the device is known it is dropped from the device map and its Delete
+// method is called. The application is then notified through app.DelDevice,
+// whether or not the device was known. Finally, for a known device, d.cancel is
+// called to stop the device tables sync routine.
+//
+// An unknown id is logged and otherwise ignored; previously it led to a call on
+// a nil *Device.
+func (v *VoltController) DelDevice(id string) {
+	d, ok := v.devices[id]
+	if ok {
+		delete(v.devices, id)
+		d.Delete()
+	}
+	v.app.DelDevice(id)
+	if !ok {
+		// d is nil here, so there is no sync routine to cancel.
+		logger.Warnw(ctx, "Device not found for deletion", log.Fields{"Device": id})
+		return
+	}
+	d.cancel() // To stop the device tables sync routine
+	logger.Warnw(ctx, "Deleted device", log.Fields{"Device": id})
+}
+
+// AddControllerTask adds task to the controller-level task queue of device.
+//
+// The queue is looked up in deviceTaskQueue; when the device has none yet, a
+// new one is created with tasks.NewTasks and stored before the task is added.
+func (v *VoltController) AddControllerTask(device string, task tasks.Task) {
+	var queue *tasks.Tasks
+	if existing, ok := v.deviceTaskQueue.Get(device); ok {
+		queue = existing.(*tasks.Tasks)
+	} else {
+		queue = tasks.NewTasks(context.TODO())
+		v.deviceTaskQueue.Set(device, queue)
+	}
+
+	queue.AddTask(task)
+	logger.Warnw(ctx, "Task Added to Controller Task List", log.Fields{"Len": queue.NumPendingTasks(), "Total": queue.TotalTasks()})
+}
+
+// AddNewDevice is called when a new device is discovered. It wraps the
+// configuration in an add-device task and places that task on the controller
+// task queue of config.DeviceID, so the device is added as part of that queue.
+func (v *VoltController) AddNewDevice(config *intf.VPClientCfg) {
+	addDeviceTask := NewAddDeviceTask(config)
+	v.AddControllerTask(config.DeviceID, addDeviceTask)
+}
+
+// GetDevice looks up the device registered under id.
+//
+// It returns the device and a nil error when found, otherwise nil and
+// errorCodes.ErrDeviceNotFound.
+//
+// NOTE(review): the device map is read without taking deviceLock; confirm that
+// callers serialise access to it.
+func (v *VoltController) GetDevice(id string) (*Device, error) {
+	dev, found := v.devices[id]
+	if !found {
+		return nil, errorCodes.ErrDeviceNotFound
+	}
+	return dev, nil
+}
+
+// IsRebootInProgressForDevice reports whether the reboot-in-progress flag is
+// set for device. The check is made while holding rebootLock.
+func (v *VoltController) IsRebootInProgressForDevice(device string) bool {
+	v.rebootLock.Lock()
+	defer v.rebootLock.Unlock()
+
+	_, inProgress := v.rebootInProgressDevices[device]
+	return inProgress
+}
+
+// SetRebootInProgressForDevice sets the reboot-in-progress flag for device.
+//
+// If the flag is already set nothing else happens. Otherwise the flag is
+// recorded and the device is looked up: when found its cache is reset through
+// ResetCache, and when not found the lookup error is logged. The whole
+// operation runs while holding rebootLock.
+//
+// It always returns true.
+func (v *VoltController) SetRebootInProgressForDevice(device string) bool {
+	v.rebootLock.Lock()
+	defer v.rebootLock.Unlock()
+
+	if _, alreadySet := v.rebootInProgressDevices[device]; alreadySet {
+		return true
+	}
+	v.rebootInProgressDevices[device] = device
+	logger.Warnw(ctx, "Setted Reboot-In-Progress flag", log.Fields{"Device": device})
+
+	dev, err := v.GetDevice(device)
+	if err != nil {
+		logger.Errorw(ctx, "Failed to get device", log.Fields{"Device": device, "Error": err})
+		return true
+	}
+	dev.ResetCache()
+	return true
+}
+
+// ReSetRebootInProgressForDevice clears the reboot-in-progress flag for device.
+//
+// Clearing a flag that is not set is a no-op. The operation runs while holding
+// rebootLock and always returns true.
+func (v *VoltController) ReSetRebootInProgressForDevice(device string) bool {
+	v.rebootLock.Lock()
+	defer v.rebootLock.Unlock()
+
+	if _, flagged := v.rebootInProgressDevices[device]; flagged {
+		delete(v.rebootInProgressDevices, device)
+		logger.Warnw(ctx, "Resetted Reboot-In-Progress flag", log.Fields{"Device": device})
+	}
+	return true
+}
+
+// DeviceRebootInd handles a device reboot indication.
+//
+// The indication is forwarded to the application first. Afterwards
+// db.DelAllRoutesForDevice, db.DelAllGroup, db.DelAllMeter and
+// db.DelAllPONCounters are called for the device. The clean-up is best effort:
+// a failing delete is logged (it used to be silently discarded) and the
+// remaining deletes are still attempted.
+//
+// dID is the device id; srNo and sbID are passed through to the application
+// unchanged.
+func (v *VoltController) DeviceRebootInd(dID string, srNo string, sbID string) {
+	v.app.DeviceRebootInd(dID, srNo, sbID)
+
+	if err := db.DelAllRoutesForDevice(dID); err != nil {
+		logger.Warnw(ctx, "Failed to delete routes for device", log.Fields{"Device": dID, "Error": err})
+	}
+	if err := db.DelAllGroup(dID); err != nil {
+		logger.Warnw(ctx, "Failed to delete groups for device", log.Fields{"Device": dID, "Error": err})
+	}
+	if err := db.DelAllMeter(dID); err != nil {
+		logger.Warnw(ctx, "Failed to delete meters for device", log.Fields{"Device": dID, "Error": err})
+	}
+	if err := db.DelAllPONCounters(dID); err != nil {
+		logger.Warnw(ctx, "Failed to delete PON counters for device", log.Fields{"Device": dID, "Error": err})
+	}
+}
+
+// DeviceDisableInd forwards a device deactivation indication for device dID to
+// the application.
+func (v *VoltController) DeviceDisableInd(dID string) {
+ v.app.DeviceDisableInd(dID)
+}
+
+// TriggerPendingProfileDeleteReq asks the application to trigger the pending
+// profile delete requests for device.
+func (v *VoltController) TriggerPendingProfileDeleteReq(device string) {
+ v.app.TriggerPendingProfileDeleteReq(device)
+}
+
+// TriggerPendingMigrateServicesReq asks the application to trigger the pending
+// services migration requests for device.
+func (v *VoltController) TriggerPendingMigrateServicesReq(device string) {
+ v.app.TriggerPendingMigrateServicesReq(device)
+}
+
+// SetAuditFlags sets the audit flags: it sets the application's reboot flag to
+// true and marks device.auditInProgress as true.
+func (v *VoltController) SetAuditFlags(device *Device) {
+ v.app.SetRebootFlag(true)
+ device.auditInProgress = true
+}
+
+// ResetAuditFlags resets the audit flags: it sets the application's reboot flag
+// to false and marks device.auditInProgress as false.
+func (v *VoltController) ResetAuditFlags(device *Device) {
+ v.app.SetRebootFlag(false)
+ device.auditInProgress = false
+}
+
+// ProcessFlowModResultIndication passes a flow mod result notification
+// (flowStatus) on to the application.
+func (v *VoltController) ProcessFlowModResultIndication(flowStatus intf.FlowStatus) {
+ v.app.ProcessFlowModResultIndication(flowStatus)
+}
+
+// AddVPAgent registers the VP agent vpa under the key vep, replacing any agent
+// previously stored under the same key.
+//
+// The agent map is allocated on first use, so registering an agent on a
+// controller whose vagent map was never created no longer panics with
+// "assignment to entry in nil map".
+func (v *VoltController) AddVPAgent(vep string, vpa *vpagent.VPAgent) {
+	if v.vagent == nil {
+		v.vagent = make(map[string]*vpagent.VPAgent)
+	}
+	v.vagent[vep] = vpa
+}
+
+// VPAgent returns the VP agent registered under vep.
+//
+// When no agent is registered under that key it returns nil and a
+// "VPA Not Registered" error.
+func (v *VoltController) VPAgent(vep string) (*vpagent.VPAgent, error) {
+	agent, registered := v.vagent[vep]
+	if !registered {
+		return nil, errors.New("VPA Not Registered")
+	}
+	return agent, nil
+}
+
+// PacketOutReq hands a packet-out request to the named device.
+//
+// The device is looked up first; a lookup failure is returned as is. Otherwise
+// the packet is logged hex-encoded at debug level and the request is delegated
+// to the device's PacketOutReq, whose result is returned.
+func (v *VoltController) PacketOutReq(device string, inport string, outport string, pkt []byte, isCustomPkt bool) error {
+	logger.Debugw(ctx, "Packet Out Req", log.Fields{"Device": device, "OutPort": outport})
+
+	dev, err := v.GetDevice(device)
+	if err != nil {
+		return err
+	}
+
+	pktHex := hex.EncodeToString(pkt)
+	logger.Debugw(ctx, "Packet Out Pkt", log.Fields{"Pkt": pktHex})
+	return dev.PacketOutReq(inport, outport, pkt, isCustomPkt)
+}
+
+// AddFlows to add flows
+func (v *VoltController) AddFlows(port string, device string, flow *of.VoltFlow) error {
+ d, err := v.GetDevice(device)
+ if err != nil {
+ logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+ return err
+ }
+ devPort := d.GetPortByName(port)
+ if devPort == nil {
+ logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
+ return errorCodes.ErrPortNotFound
+ }
+ if d.ctx == nil {
+ //FIXME: Application should know the context before it could submit task. Handle at application level
+ logger.Errorw(ctx, "Context is missing. AddFlow Operation Not added to Task", log.Fields{"Device": device})
+ return errorCodes.ErrInvalidParamInRequest
+ }
+
+ var isMigrationRequired bool
+ if flow.MigrateCookie {
+ // flow migration to new cookie must be done only during the audit. Migration for all subflows must be done if
+ // atlease one subflow with old cookie found in the device.
+ for _, subFlow := range flow.SubFlows {
+ if isMigrationRequired = d.IsFlowPresentWithOldCookie(subFlow); isMigrationRequired {
+ break
+ }
+ }
+ }
+
+ if isMigrationRequired {
+ // In this case, the flow is updated in local cache and db here.
+ // Actual flow deletion and addition at voltha will happen during flow tables audit.
+ for _, subFlow := range flow.SubFlows {
+ logger.Debugw(ctx, "Cookie Migration Required", log.Fields{"OldCookie": subFlow.OldCookie, "NewCookie": subFlow.Cookie})
+ if err := d.DelFlowWithOldCookie(subFlow); err != nil {
+ logger.Errorw(ctx, "Delete flow with old cookie failed", log.Fields{"Error": err, "OldCookie": subFlow.OldCookie})
+ }
+ if err := d.AddFlow(subFlow); err != nil {
+ logger.Errorw(ctx, "Flow Add Failed", log.Fields{"Error": err, "Cookie": subFlow.Cookie})
+ }
+ }
+ } else {
+ flow.Command = of.CommandAdd
+ d.UpdateFlows(flow, devPort)
+ for cookie := range flow.SubFlows {
+ logger.Debugw(ctx, "Flow Add added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
+ }
+ }
+ return nil
+}
+
+// DelFlows to delete flows
+func (v *VoltController) DelFlows(port string, device string, flow *of.VoltFlow) error {
+ d, err := v.GetDevice(device)
+ if err != nil {
+ logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+ return err
+ }
+ devPort := d.GetPortByName(port)
+ if devPort == nil {
+ logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
+ return errorCodes.ErrPortNotFound
+ }
+ if d.ctx == nil {
+ //FIXME: Application should know the context before it could submit task. Handle at application level
+ logger.Errorw(ctx, "Context is missing. DelFlow Operation Not added to Task", log.Fields{"Device": device})
+ return errorCodes.ErrInvalidParamInRequest
+ }
+
+ var isMigrationRequired bool
+ if flow.MigrateCookie {
+ // flow migration to new cookie must be done only during the audit. Migration for all subflows must be done if
+ // atlease one subflow with old cookie found in the device.
+ for _, subFlow := range flow.SubFlows {
+ if isMigrationRequired = d.IsFlowPresentWithOldCookie(subFlow); isMigrationRequired {
+ break
+ }
+ }
+ }
+
+ if isMigrationRequired {
+ // In this case, the flow is deleted from local cache and db here.
+ // Actual flow deletion at voltha will happen during flow tables audit.
+ for _, subFlow := range flow.SubFlows {
+ logger.Debugw(ctx, "Old Cookie delete Required", log.Fields{"OldCookie": subFlow.OldCookie})
+ if err := d.DelFlowWithOldCookie(subFlow); err != nil {
+ logger.Errorw(ctx, "DelFlowWithOldCookie failed", log.Fields{"OldCookie": subFlow.OldCookie, "Error": err})
+ }
+ }
+ } else {
+ flow.Command = of.CommandDel
+ d.UpdateFlows(flow, devPort)
+ for cookie := range flow.SubFlows {
+ logger.Debugw(ctx, "Flow Del added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
+ }
+ }
+ return nil
+}
+
+// GroupUpdate applies a group update for port on device.
+//
+// It returns the lookup error when the device is unknown,
+// errorCodes.ErrPortNotFound when the port is unknown, and
+// errorCodes.ErrInvalidParamInRequest when the device has no context. Otherwise
+// the group is handed to the device's UpdateGroup and nil is returned.
+func (v *VoltController) GroupUpdate(port string, device string, group *of.Group) error {
+	dev, err := v.GetDevice(device)
+	if err != nil {
+		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+		return err
+	}
+
+	devPort := dev.GetPortByName(port)
+	switch {
+	case devPort == nil:
+		logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
+		return errorCodes.ErrPortNotFound
+	case dev.ctx == nil:
+		//FIXME: Application should know the context before it could submit task. Handle at application level
+		logger.Errorw(ctx, "Context is missing. GroupMod Operation Not added to task", log.Fields{"Device": device})
+		return errorCodes.ErrInvalidParamInRequest
+	}
+
+	dev.UpdateGroup(group, devPort)
+	return nil
+}
+
+// ModMeter applies the meter command to meter for port on device.
+//
+// It returns the lookup error when the device is unknown and
+// errorCodes.ErrPortNotFound when the port is unknown. Otherwise the request is
+// handed to the device's ModMeter and nil is returned.
+//
+// NOTE(review): unlike AddFlows, DelFlows and GroupUpdate, this does not check
+// the device context for nil - confirm that this is intended.
+func (v *VoltController) ModMeter(port string, device string, command of.MeterCommand, meter *of.Meter) error {
+	dev, err := v.GetDevice(device)
+	if err != nil {
+		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+		return err
+	}
+
+	if devPort := dev.GetPortByName(port); devPort != nil {
+		dev.ModMeter(command, meter, devPort)
+		return nil
+	}
+
+	logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
+	return errorCodes.ErrPortNotFound
+}
+
+// PortAddInd forwards a port add indication (device, port id and port name) to
+// the application.
+func (v *VoltController) PortAddInd(device string, id uint32, name string) {
+ v.app.PortAddInd(device, id, name)
+}
+
+// PortDelInd forwards a port delete indication for port on device to the
+// application.
+func (v *VoltController) PortDelInd(device string, port string) {
+ v.app.PortDelInd(device, port)
+}
+
+// PortUpdateInd forwards a port update indication (device, port name and port
+// id) to the application.
+func (v *VoltController) PortUpdateInd(device string, name string, id uint32) {
+ v.app.PortUpdateInd(device, name, id)
+}
+
+// PortUpInd forwards a port up indication for port on device to the
+// application.
+func (v *VoltController) PortUpInd(device string, port string) {
+ v.app.PortUpInd(device, port)
+}
+
+// PortDownInd forwards a port down indication for port on device to the
+// application.
+func (v *VoltController) PortDownInd(device string, port string) {
+ v.app.PortDownInd(device, port)
+}
+
+// DeviceUpInd forwards a device up indication for device to the application.
+func (v *VoltController) DeviceUpInd(device string) {
+ v.app.DeviceUpInd(device)
+}
+
+// DeviceDownInd forwards a device down indication for device to the
+// application.
+func (v *VoltController) DeviceDownInd(device string) {
+ v.app.DeviceDownInd(device)
+}
+
+// PacketInInd forwards a packet-in indication, together with the packet bytes
+// in data, for port on device to the application.
+func (v *VoltController) PacketInInd(device string, port string, data []byte) {
+ v.app.PacketInInd(device, port, data)
+}
+
+// GetPortState returns the state of the port called name on device.
+//
+// When the device is unknown it returns PortStateDown together with the lookup
+// error; otherwise it returns whatever the device's GetPortState reports.
+func (v *VoltController) GetPortState(device string, name string) (PortState, error) {
+	dev, err := v.GetDevice(device)
+	if err == nil {
+		return dev.GetPortState(name)
+	}
+	logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+	return PortStateDown, err
+}
+
+// UpdateMvlanProfiles asks the application to update the mvlan profiles for
+// device.
+func (v *VoltController) UpdateMvlanProfiles(device string) {
+ v.app.UpdateMvlanProfilesForDevice(device)
+}
+
+// GetController returns the package-level controller instance, i.e. the one
+// most recently created by NewController. It is nil until NewController has
+// been called.
+func GetController() *VoltController {
+ return vcontroller
+}
+
+/*
+// PostIndication to post indication
+func (v *VoltController) PostIndication(device string, task interface{}) error {
+ var srvTask AddServiceIndTask
+ var portTask AddPortIndTask
+ var taskCommon tasks.Task
+ var isSvcTask bool
+
+ switch data := task.(type) {
+ case *AddServiceIndTask:
+ srvTask = *data
+ taskCommon = data
+ isSvcTask = true
+ case *AddPortIndTask:
+ portTask = *data
+ taskCommon = data
+ }
+
+ d, err := v.GetDevice(device)
+ if err != nil {
+ logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+ // It means the device itself is not present, so just post the indication directly
+ if isSvcTask {
+ msgbus.PostAccessConfigInd(srvTask.result, d.SerialNum, srvTask.indicationType, srvTask.serviceName, 0, srvTask.reason, srvTask.trigger, srvTask.portState)
+ } else {
+ msgbus.ProcessPortInd(portTask.indicationType, d.SerialNum, portTask.portName, portTask.accessConfig, portTask.serviceList)
+ }
+ return err
+ }
+ if taskCommon != nil {
+ d.AddTask(taskCommon)
+ }
+ return nil
+}
+*/
+
+// GetTaskList returns the task list of device.
+//
+// When the device is unknown or has no context, the condition is logged and an
+// empty, non-nil slice is returned.
+func (v *VoltController) GetTaskList(device string) []tasks.Task {
+	dev, err := v.GetDevice(device)
+	if err == nil && dev.ctx != nil {
+		return dev.GetTaskList()
+	}
+	logger.Errorw(ctx, "Device Not Connected/Found", log.Fields{"Device": device, "Dev Obj": dev})
+	return []tasks.Task{}
+}
+
+// AddBlockedDevices adds a device to the blocked devices list. The serial
+// number is stored as both the key and the value in BlockedDeviceList.
+func (v *VoltController) AddBlockedDevices(deviceSerialNumber string) {
+ v.BlockedDeviceList.Set(deviceSerialNumber, deviceSerialNumber)
+}
+
+// DelBlockedDevices removes the device with the given serial number from the
+// blocked devices list.
+func (v *VoltController) DelBlockedDevices(deviceSerialNumber string) {
+ v.BlockedDeviceList.Remove(deviceSerialNumber)
+}
+
+// IsBlockedDevice reports whether the device with the given serial number is
+// present in the blocked devices list.
+func (v *VoltController) IsBlockedDevice(deviceSerialNumber string) bool {
+	_, blocked := v.BlockedDeviceList.Get(deviceSerialNumber)
+	return blocked
+}