First Commit of Voltha-Go-Controller from Radisys
Change-Id: I8e2e908e7ab09a4fe3d86849da18b6d69dcf4ab0
diff --git a/internal/pkg/tasks/task_intf.go b/internal/pkg/tasks/task_intf.go
new file mode 100644
index 0000000..bd9b256
--- /dev/null
+++ b/internal/pkg/tasks/task_intf.go
@@ -0,0 +1,33 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package tasks
+
+import (
+ "context"
+)
+
+// Each task must support this interface to be exercised
+// by the implementation to execute the tasks similarly
+// across all tasks
+
+// Task interface
+type Task interface {
+ TaskID() uint8
+ Name() string
+ Timestamp() string
+ Start(context.Context, uint8) error
+ Stop()
+}
diff --git a/internal/pkg/tasks/tasks.go b/internal/pkg/tasks/tasks.go
new file mode 100644
index 0000000..414c4f2
--- /dev/null
+++ b/internal/pkg/tasks/tasks.go
@@ -0,0 +1,309 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package tasks
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "time"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+var logger log.CLogger
+
+var (
+ // ErrCxtCancelError error
+ ErrCxtCancelError = errors.New("Context Cancelled")
+ // ErrTaskCancelError error
+ ErrTaskCancelError = errors.New("Task Cancelled")
+
+ ctx = context.TODO()
+)
+
+// TaskSet implements a set of dependent tasks into a single unit. The
+// tasks are added in the order they are expected to be executed. If any
+// of the tasks fails, the remaining tasks are not executed.
+
+// TaskSet structure
+type TaskSet struct {
+ name string
+ taskID uint8
+ timestamp string
+ queued []Task
+}
+
+// NewTaskSet is constructor for TaskSet
+func NewTaskSet(name string) *TaskSet {
+ var ts TaskSet
+ ts.name = name
+ tstamp := (time.Now()).Format(time.RFC3339Nano)
+ ts.timestamp = tstamp
+ return &ts
+}
+
+// Name to return name of the task
+func (ts *TaskSet) Name() string {
+ return ts.name
+}
+
+// TaskID to return task id of the task
+func (ts *TaskSet) TaskID() uint8 {
+ return ts.taskID
+}
+
+// Timestamp to return timestamp for the task
+func (ts *TaskSet) Timestamp() string {
+ return ts.timestamp
+}
+
+// AddTask to add task
+func (ts *TaskSet) AddTask(task Task) {
+ logger.Debugw(ctx, "Adding Task to TaskSet", log.Fields{"SetName": ts.name, "TaskName": task.Name()})
+ ts.queued = append(ts.queued, task)
+}
+
+// Start to start the task
+func (ts *TaskSet) Start(ctx context.Context, taskID uint8) error {
+ logger.Debug(ctx, "Starting Execution TaskSet", log.Fields{"SetName": ts.name})
+ ts.taskID = taskID
+ for len(ts.queued) != 0 {
+ task := ts.queued[0]
+ logger.Infow(ctx, "Starting Execution of task", log.Fields{"TaskName": task.Name()})
+ err := task.Start(ctx, ts.taskID)
+ if err != nil {
+ return err
+ }
+ task = ts.popTask()
+ logger.Infow(ctx, "Execution of task completed", log.Fields{"TaskName": task.Name()})
+ }
+ logger.Debug(ctx, "Exiting Execution of TaskSet")
+ return nil
+}
+
+// popTask is used internally to remove the task that is
+// is just completed.
+func (ts *TaskSet) popTask() Task {
+ var task Task
+ task, ts.queued = ts.queued[0], ts.queued[1:]
+ return task
+}
+
+// Stop is used internally to remove the task that is
+// is just completed.
+func (ts *TaskSet) Stop() {
+ var task Task
+ // Stop all the tasks and clean up
+ for size := len(ts.queued); size > 0; size = len(ts.queued) {
+ // Pop out the first task and clean up resources
+ task, ts.queued = ts.queued[0], ts.queued[1:]
+ task.Stop()
+ }
+}
+
+//***************************************************************************
+// Task Execution Environment
+// ------------------------------
+// The section below helps create an execution environment for tasks
+// of a single ONU. Addition and in sequence execution of tasks is
+// the main goal.
+
+// queued - holds tasks yet to be executed and the current in progress
+// taskID - This variable is used to generate unique task id for each task
+// currentTask - This holds the value of task being executed
+// timout - This variable sets the timeout value for all of the messages
+// stop - This provides a way of stopping the execution of next task
+
+// Tasks structure
+type Tasks struct {
+ queued []Task
+ taskID uint8
+ stop bool
+ totalTasks uint16
+ failedTasks uint16
+ lock sync.RWMutex
+ ctx context.Context
+}
+
+// NewTasks is constructor for Tasks
+func NewTasks(ctx context.Context) *Tasks {
+ var ts Tasks
+ ts.taskID = 0xff
+ ts.stop = false
+ ts.queued = []Task{}
+ ts.totalTasks = 0
+ ts.failedTasks = 0
+ ts.ctx = ctx
+ return &ts
+}
+
+// Initialize is used to initialize the embedded tasks structure within
+// each ONU.
+func (ts *Tasks) Initialize(ctx context.Context) {
+
+ //Send signal to stop any task which are being executed
+ ts.StopAll()
+ ts.taskID = 0xff
+ ts.ctx = ctx
+}
+
+// CheckAndInitialize is used to initialize the embedded tasks structure within
+// NNI and resets taskID only when there are no pending tasks
+func (ts *Tasks) CheckAndInitialize(ctx context.Context) {
+
+ ts.lock.Lock()
+ logger.Infow(ctx, "Queued Tasks", log.Fields{"Count": len(ts.queued)})
+ if len(ts.queued) == 0 {
+ ts.lock.Unlock()
+ ts.Initialize(ctx)
+ return
+ }
+ ts.ctx = ctx
+ ts.lock.Unlock()
+}
+
+// getNewTaskId generates a unique task-id for each new task. The
+// transaction-ids are generated for the task-ids.
+func (ts *Tasks) getNewTaskID() uint8 {
+ ts.taskID++
+ return ts.taskID
+}
+
+// GetContext to get context of the task
+func (ts *Tasks) GetContext() context.Context {
+ return ts.ctx
+}
+
+// AddTask adds a task and executes it if there is no task
+// pending execution. The execution happens on a seperate thread.
+// The tasks are maintained per ONU. This structure is instantiated
+// one per ONU
+func (ts *Tasks) AddTask(task Task) {
+ ts.lock.Lock()
+ defer ts.lock.Unlock()
+
+ // logger.Infow(ctx, "Adding Task", log.Fields{"TaskName": task.Name()})
+ ts.queued = append(ts.queued, task)
+ if ts.queued[0] == task {
+ go ts.executeTasks()
+ }
+}
+
+// TotalTasks returns the total number of tasks completed by the
+// the execution of the tasks.
+func (ts *Tasks) TotalTasks() uint16 {
+ return ts.totalTasks
+}
+
+// StopAll stops the execution of the tasks and cleans up
+// everything associated with the tasks
+func (ts *Tasks) StopAll() {
+ ts.lock.Lock()
+ defer ts.lock.Unlock()
+
+ ts.stop = true
+ logger.Infow(ctx, "Stopping all tasks in queue", log.Fields{"TaskCount": len(ts.queued)})
+
+ if len(ts.queued) > 0 {
+ ts.queued = ts.queued[:1]
+ logger.Warnw(ctx, "Skipping Current Task", log.Fields{"Task": ts.queued[0].Name()})
+ }
+ ts.stop = false
+}
+
+// popTask is used internally to remove the task that is
+// is just completed.
+func (ts *Tasks) popTask() (Task, int) {
+ ts.lock.Lock()
+ defer ts.lock.Unlock()
+
+ var task Task
+ queueLen := len(ts.queued)
+ if queueLen > 0 {
+ task = ts.queued[0]
+ ts.queued = append(ts.queued[:0], ts.queued[0+1:]...)
+ } else {
+ logger.Errorw(ctx, "Trying to remove task from empty Task List", log.Fields{"#task ": queueLen})
+ }
+
+ return task, len(ts.queued)
+}
+
+// NumPendingTasks returns the count of tasks that are either in progress or
+// yet to be executed. The first in the list is the in progress
+// task.
+func (ts *Tasks) NumPendingTasks() uint16 {
+ return uint16(len(ts.queued))
+}
+
+// GetTaskList returns the list of tasks that are either in progress or
+// yet to be executed. The first in the list is the in progress
+// task.
+func (ts *Tasks) GetTaskList() []Task {
+ taskList := []Task{}
+ return append(taskList, ts.queued...)
+}
+
+// CurrentTask returns the task that is currently running. This can be
+// used for verifying upon unforseen failures for debugging from
+// with the code
+func (ts *Tasks) CurrentTask() Task {
+ return ts.queued[0]
+}
+
+// executeTasks executes the pending tasks one by one. The tasks are attempted
+// one after another to avoid two tasks simultaneously operating on the
+// same ONU.
+func (ts *Tasks) executeTasks() {
+ // logger.Debug(ctx, "Starting Execution of tasks")
+ for (len(ts.queued) != 0) && (!ts.stop) {
+ task := ts.queued[0]
+ taskID := ts.getNewTaskID()
+ // logger.Infow(ctx, "Starting Execution of task", log.Fields{"TaskName": task.Name()})
+ ts.totalTasks++
+
+ err := task.Start(ts.ctx, taskID)
+ if err == ErrTaskCancelError {
+ logger.Warnw(ctx, "Previous task cancelled. Exiting current task queue execution thread", log.Fields{"TaskCount": len(ts.queued)})
+ return
+ }
+ _, pending := ts.popTask()
+
+ if err != nil {
+ ts.failedTasks++
+ }
+ if err == ErrCxtCancelError {
+ // TODO - This needs correction
+ ts.StopAll()
+ return
+ }
+
+ if pending == 0 {
+ break
+ }
+ }
+ // logger.Debug(ctx, "Exiting Execution of tasks")
+}
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+ if err != nil {
+ panic(err)
+ }
+}