blob: cfe35f0f15d46e57bb1923a31d1564a96dd304a2 [file] [log] [blame]
/*
* Copyright 2022-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tasks
import (
"context"
"errors"
"sync"
"time"
"voltha-go-controller/log"
)
var logger log.CLogger
var (
// ErrCxtCancelError error
ErrCxtCancelError = errors.New("Context Canceled")
// ErrTaskCancelError error
ErrTaskCancelError = errors.New("Task Canceled")
ctx = context.TODO()
)
// TaskSet implements a set of dependent tasks into a single unit. The
// tasks are added in the order they are expected to be executed. If any
// of the tasks fails, the remaining tasks are not executed.
// TaskSet structure
type TaskSet struct {
name string
timestamp string
queued []Task
taskID uint8
}
// NewTaskSet is constructor for TaskSet
func NewTaskSet(name string) *TaskSet {
var ts TaskSet
ts.name = name
tstamp := (time.Now()).Format(time.RFC3339Nano)
ts.timestamp = tstamp
return &ts
}
// Name to return name of the task
func (ts *TaskSet) Name() string {
return ts.name
}
// TaskID to return task id of the task
func (ts *TaskSet) TaskID() uint8 {
return ts.taskID
}
// Timestamp to return timestamp for the task
func (ts *TaskSet) Timestamp() string {
return ts.timestamp
}
// AddTask to add task
func (ts *TaskSet) AddTask(task Task) {
logger.Debugw(ctx, "Adding Task to TaskSet", log.Fields{"SetName": ts.name, "TaskName": task.Name()})
ts.queued = append(ts.queued, task)
}
// Start to start the task
func (ts *TaskSet) Start(ctx context.Context, taskID uint8) error {
logger.Debugw(ctx, "Starting Execution TaskSet", log.Fields{"SetName": ts.name})
ts.taskID = taskID
for len(ts.queued) != 0 {
task := ts.queued[0]
logger.Infow(ctx, "Starting Execution of task", log.Fields{"TaskName": task.Name()})
err := task.Start(ctx, ts.taskID)
if err != nil {
return err
}
task = ts.popTask()
logger.Infow(ctx, "Execution of task completed", log.Fields{"TaskName": task.Name()})
}
logger.Debug(ctx, "Exiting Execution of TaskSet")
return nil
}
// popTask is used internally to remove the task that is
// is just completed.
func (ts *TaskSet) popTask() Task {
var task Task
task, ts.queued = ts.queued[0], ts.queued[1:]
return task
}
// Stop is used internally to remove the task that is
// is just completed.
func (ts *TaskSet) Stop() {
var task Task
// Stop all the tasks and clean up
for size := len(ts.queued); size > 0; size = len(ts.queued) {
// Pop out the first task and clean up resources
task, ts.queued = ts.queued[0], ts.queued[1:]
task.Stop()
}
}
//***************************************************************************
// Task Execution Environment
// ------------------------------
// The section below helps create an execution environment for tasks
// of a single ONU. Addition and in sequence execution of tasks is
// the main goal.
// queued - holds tasks yet to be executed and the current in progress
// taskID - This variable is used to generate unique task id for each task
// currentTask - This holds the value of task being executed
// timout - This variable sets the timeout value for all of the messages
// stop - This provides a way of stopping the execution of next task
// Tasks structure
type Tasks struct {
ctx context.Context
queued []Task
lock sync.RWMutex
totalTasks uint16
failedTasks uint16
taskID uint8
stop bool
}
// NewTasks is constructor for Tasks
func NewTasks(ctx context.Context) *Tasks {
var ts Tasks
ts.taskID = 0xff
ts.stop = false
ts.queued = []Task{}
ts.totalTasks = 0
ts.failedTasks = 0
ts.ctx = ctx
return &ts
}
// Initialize is used to initialize the embedded tasks structure within
// each ONU.
func (ts *Tasks) Initialize(ctx context.Context) {
//Send signal to stop any task which are being executed
ts.StopAll()
ts.taskID = 0xff
ts.ctx = ctx
}
// CheckAndInitialize is used to initialize the embedded tasks structure within
// NNI and resets taskID only when there are no pending tasks
func (ts *Tasks) CheckAndInitialize(ctx context.Context) {
ts.lock.Lock()
logger.Infow(ctx, "Queued Tasks", log.Fields{"Count": len(ts.queued)})
if len(ts.queued) == 0 {
ts.lock.Unlock()
ts.Initialize(ctx)
return
}
ts.ctx = ctx
ts.lock.Unlock()
}
// getNewTaskId generates a unique task-id for each new task. The
// transaction-ids are generated for the task-ids.
func (ts *Tasks) getNewTaskID() uint8 {
ts.taskID++
return ts.taskID
}
// GetContext to get context of the task
func (ts *Tasks) GetContext() context.Context {
return ts.ctx
}
// AddTask adds a task and executes it if there is no task
// pending execution. The execution happens on a separate thread.
// The tasks are maintained per ONU. This structure is instantiated
// one per ONU
func (ts *Tasks) AddTask(task Task) {
ts.lock.Lock()
defer ts.lock.Unlock()
// logger.Infow(ctx, "Adding Task", log.Fields{"TaskName": task.Name()})
ts.queued = append(ts.queued, task)
if ts.queued[0] == task {
go ts.executeTasks()
}
}
// TotalTasks returns the total number of tasks completed by the
// the execution of the tasks.
func (ts *Tasks) TotalTasks() uint16 {
return ts.totalTasks
}
// StopAll stops the execution of the tasks and cleans up
// everything associated with the tasks
func (ts *Tasks) StopAll() {
ts.lock.Lock()
defer ts.lock.Unlock()
ts.stop = true
logger.Infow(ctx, "Stopping all tasks in queue", log.Fields{"TaskCount": len(ts.queued)})
if len(ts.queued) > 0 {
ts.queued = ts.queued[:1]
logger.Warnw(ctx, "Skipping Current Task", log.Fields{"Task": ts.queued[0].Name()})
}
ts.stop = false
}
// popTask is used internally to remove the task that is
// is just completed.
func (ts *Tasks) popTask() (Task, int) {
ts.lock.Lock()
defer ts.lock.Unlock()
var task Task
queueLen := len(ts.queued)
if queueLen > 0 {
task = ts.queued[0]
ts.queued = append(ts.queued[:0], ts.queued[0+1:]...)
} else {
logger.Errorw(ctx, "Trying to remove task from empty Task List", log.Fields{"#task ": queueLen})
}
return task, len(ts.queued)
}
// NumPendingTasks returns the count of tasks that are either in progress or
// yet to be executed. The first in the list is the in progress
// task.
func (ts *Tasks) NumPendingTasks() uint16 {
return uint16(len(ts.queued))
}
// GetTaskList returns the list of tasks that are either in progress or
// yet to be executed. The first in the list is the in progress
// task.
func (ts *Tasks) GetTaskList() []Task {
taskList := []Task{}
return append(taskList, ts.queued...)
}
// CurrentTask returns the task that is currently running. This can be
// used for verifying upon unforeseen failures for debugging from
// with the code
func (ts *Tasks) CurrentTask() Task {
return ts.queued[0]
}
// executeTasks executes the pending tasks one by one. The tasks are attempted
// one after another to avoid two tasks simultaneously operating on the
// same ONU.
func (ts *Tasks) executeTasks() {
// logger.Debug(ctx, "Starting Execution of tasks")
for (len(ts.queued) != 0) && (!ts.stop) {
task := ts.queued[0]
taskID := ts.getNewTaskID()
// logger.Infow(ctx, "Starting Execution of task", log.Fields{"TaskName": task.Name()})
ts.totalTasks++
err := task.Start(ts.ctx, taskID)
if err == ErrTaskCancelError {
logger.Warnw(ctx, "Previous task canceled. Exiting current task queue execution thread", log.Fields{"TaskCount": len(ts.queued)})
return
}
_, pending := ts.popTask()
if err != nil {
ts.failedTasks++
}
if err == ErrCxtCancelError {
// TODO - This needs correction
ts.StopAll()
return
}
if pending == 0 {
break
}
}
// logger.Debug(ctx, "Exiting Execution of tasks")
}
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
logger, err = log.AddPackageWithDefaultParam()
if err != nil {
panic(err)
}
}