Naveen Sampath | 04696f7 | 2022-06-13 15:19:14 +0530 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2022-present Open Networking Foundation |
| 3 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | * you may not use this file except in compliance with the License. |
| 5 | * You may obtain a copy of the License at |
| 6 | * |
| 7 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | * |
| 9 | * Unless required by applicable law or agreed to in writing, software |
| 10 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | * See the License for the specific language governing permissions and |
| 13 | * limitations under the License. |
| 14 | */ |
| 15 | |
| 16 | package tasks |
| 17 | |
| 18 | import ( |
| 19 | "context" |
| 20 | "errors" |
| 21 | "sync" |
| 22 | "time" |
| 23 | |
Tinoj Joseph | 1d10832 | 2022-07-13 10:07:39 +0530 | [diff] [blame] | 24 | "voltha-go-controller/log" |
Naveen Sampath | 04696f7 | 2022-06-13 15:19:14 +0530 | [diff] [blame] | 25 | ) |
| 26 | |
// logger is the package-level logger. It is assigned once, in init.
var logger log.CLogger

var (
	// ErrCxtCancelError is the sentinel a task returns to report a cancelled
	// context. executeTasks reacts to it by calling StopAll and ending the
	// execution loop.
	ErrCxtCancelError = errors.New("Context Cancelled")
	// ErrTaskCancelError is the sentinel a task returns to report that it was
	// cancelled. executeTasks reacts to it by exiting immediately without
	// popping the task from the queue.
	ErrTaskCancelError = errors.New("Task Cancelled")

	// ctx is a placeholder context handed to logger calls made from functions
	// that have no caller-supplied context in scope.
	ctx = context.TODO()
)
| 37 | |
// TaskSet implements a set of dependent tasks into a single unit. The
// tasks are added in the order they are expected to be executed. If any
// of the tasks fails, the remaining tasks are not executed.
type TaskSet struct {
	// name is the label supplied to NewTaskSet and returned by Name.
	name string
	// taskID is the ID received by Start and passed on to every task.
	taskID uint8
	// timestamp is the creation time, formatted as RFC3339Nano.
	timestamp string
	// queued holds the tasks that have not completed yet, in execution order.
	queued []Task
}
| 49 | |
| 50 | // NewTaskSet is constructor for TaskSet |
| 51 | func NewTaskSet(name string) *TaskSet { |
| 52 | var ts TaskSet |
| 53 | ts.name = name |
| 54 | tstamp := (time.Now()).Format(time.RFC3339Nano) |
| 55 | ts.timestamp = tstamp |
| 56 | return &ts |
| 57 | } |
| 58 | |
// Name returns the name that was given to the TaskSet at construction.
func (ts *TaskSet) Name() string {
	return ts.name
}
| 63 | |
// TaskID returns the ID most recently passed to Start. It is zero until
// Start has been called.
func (ts *TaskSet) TaskID() uint8 {
	return ts.taskID
}
| 68 | |
// Timestamp returns the creation time recorded by NewTaskSet, formatted as
// an RFC3339Nano string.
func (ts *TaskSet) Timestamp() string {
	return ts.timestamp
}
| 73 | |
| 74 | // AddTask to add task |
| 75 | func (ts *TaskSet) AddTask(task Task) { |
| 76 | logger.Debugw(ctx, "Adding Task to TaskSet", log.Fields{"SetName": ts.name, "TaskName": task.Name()}) |
| 77 | ts.queued = append(ts.queued, task) |
| 78 | } |
| 79 | |
| 80 | // Start to start the task |
| 81 | func (ts *TaskSet) Start(ctx context.Context, taskID uint8) error { |
Tinoj Joseph | 1d10832 | 2022-07-13 10:07:39 +0530 | [diff] [blame] | 82 | logger.Debugw(ctx, "Starting Execution TaskSet", log.Fields{"SetName": ts.name}) |
Naveen Sampath | 04696f7 | 2022-06-13 15:19:14 +0530 | [diff] [blame] | 83 | ts.taskID = taskID |
| 84 | for len(ts.queued) != 0 { |
| 85 | task := ts.queued[0] |
| 86 | logger.Infow(ctx, "Starting Execution of task", log.Fields{"TaskName": task.Name()}) |
| 87 | err := task.Start(ctx, ts.taskID) |
| 88 | if err != nil { |
| 89 | return err |
| 90 | } |
| 91 | task = ts.popTask() |
| 92 | logger.Infow(ctx, "Execution of task completed", log.Fields{"TaskName": task.Name()}) |
| 93 | } |
| 94 | logger.Debug(ctx, "Exiting Execution of TaskSet") |
| 95 | return nil |
| 96 | } |
| 97 | |
| 98 | // popTask is used internally to remove the task that is |
| 99 | // is just completed. |
| 100 | func (ts *TaskSet) popTask() Task { |
| 101 | var task Task |
| 102 | task, ts.queued = ts.queued[0], ts.queued[1:] |
| 103 | return task |
| 104 | } |
| 105 | |
| 106 | // Stop is used internally to remove the task that is |
| 107 | // is just completed. |
| 108 | func (ts *TaskSet) Stop() { |
| 109 | var task Task |
| 110 | // Stop all the tasks and clean up |
| 111 | for size := len(ts.queued); size > 0; size = len(ts.queued) { |
| 112 | // Pop out the first task and clean up resources |
| 113 | task, ts.queued = ts.queued[0], ts.queued[1:] |
| 114 | task.Stop() |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | //*************************************************************************** |
| 119 | // Task Execution Environment |
| 120 | // ------------------------------ |
| 121 | // The section below helps create an execution environment for tasks |
| 122 | // of a single ONU. Addition and in sequence execution of tasks is |
| 123 | // the main goal. |
| 124 | |
| 125 | // queued - holds tasks yet to be executed and the current in progress |
| 126 | // taskID - This variable is used to generate unique task id for each task |
| 127 | // currentTask - This holds the value of task being executed |
| 128 | // timeout - This variable sets the timeout value for all of the messages |
| 129 | // stop - This provides a way of stopping the execution of next task |
| 130 | |
// Tasks is a queue of tasks that are executed one at a time by a background
// goroutine (see AddTask and executeTasks).
type Tasks struct {
	// queued holds the tasks yet to be executed; the first element is the
	// one currently in progress.
	queued []Task
	// taskID is the most recently issued task ID (see getNewTaskID).
	taskID uint8
	// stop, while true, makes the executeTasks loop terminate.
	stop bool
	// totalTasks counts the tasks that executeTasks has started.
	totalTasks uint16
	// failedTasks counts the tasks whose Start returned an error other than
	// ErrTaskCancelError.
	failedTasks uint16
	// lock is held by AddTask, StopAll, popTask and CheckAndInitialize while
	// they inspect or modify the queue.
	lock sync.RWMutex
	// ctx is the context passed to each task's Start.
	ctx context.Context
}
| 141 | |
| 142 | // NewTasks is constructor for Tasks |
| 143 | func NewTasks(ctx context.Context) *Tasks { |
| 144 | var ts Tasks |
| 145 | ts.taskID = 0xff |
| 146 | ts.stop = false |
| 147 | ts.queued = []Task{} |
| 148 | ts.totalTasks = 0 |
| 149 | ts.failedTasks = 0 |
| 150 | ts.ctx = ctx |
| 151 | return &ts |
| 152 | } |
| 153 | |
| 154 | // Initialize is used to initialize the embedded tasks structure within |
| 155 | // each ONU. |
| 156 | func (ts *Tasks) Initialize(ctx context.Context) { |
| 157 | |
| 158 | //Send signal to stop any task which are being executed |
| 159 | ts.StopAll() |
| 160 | ts.taskID = 0xff |
| 161 | ts.ctx = ctx |
| 162 | } |
| 163 | |
| 164 | // CheckAndInitialize is used to initialize the embedded tasks structure within |
| 165 | // NNI and resets taskID only when there are no pending tasks |
| 166 | func (ts *Tasks) CheckAndInitialize(ctx context.Context) { |
| 167 | |
| 168 | ts.lock.Lock() |
| 169 | logger.Infow(ctx, "Queued Tasks", log.Fields{"Count": len(ts.queued)}) |
| 170 | if len(ts.queued) == 0 { |
| 171 | ts.lock.Unlock() |
| 172 | ts.Initialize(ctx) |
| 173 | return |
| 174 | } |
| 175 | ts.ctx = ctx |
| 176 | ts.lock.Unlock() |
| 177 | } |
| 178 | |
// getNewTaskID advances the task ID counter and returns the new value. The
// counter is a uint8, so it wraps from 0xff to 0; because NewTasks and
// Initialize seed it with 0xff, the first ID issued after either is 0.
func (ts *Tasks) getNewTaskID() uint8 {
	ts.taskID++
	return ts.taskID
}
| 185 | |
// GetContext returns the context currently stored in the queue, i.e. the one
// most recently supplied through NewTasks, Initialize or CheckAndInitialize.
// It is the context that executeTasks passes to each task's Start.
func (ts *Tasks) GetContext() context.Context {
	return ts.ctx
}
| 190 | |
| 191 | // AddTask adds a task and executes it if there is no task |
| 192 | // pending execution. The execution happens on a seperate thread. |
| 193 | // The tasks are maintained per ONU. This structure is instantiated |
| 194 | // one per ONU |
| 195 | func (ts *Tasks) AddTask(task Task) { |
| 196 | ts.lock.Lock() |
| 197 | defer ts.lock.Unlock() |
| 198 | |
| 199 | // logger.Infow(ctx, "Adding Task", log.Fields{"TaskName": task.Name()}) |
| 200 | ts.queued = append(ts.queued, task) |
| 201 | if ts.queued[0] == task { |
| 202 | go ts.executeTasks() |
| 203 | } |
| 204 | } |
| 205 | |
// TotalTasks returns the number of tasks whose execution has been started.
// executeTasks increments the counter before calling a task's Start, so the
// figure includes the task in progress as well as tasks that later failed.
func (ts *Tasks) TotalTasks() uint16 {
	return ts.totalTasks
}
| 211 | |
| 212 | // StopAll stops the execution of the tasks and cleans up |
| 213 | // everything associated with the tasks |
| 214 | func (ts *Tasks) StopAll() { |
| 215 | ts.lock.Lock() |
| 216 | defer ts.lock.Unlock() |
| 217 | |
| 218 | ts.stop = true |
| 219 | logger.Infow(ctx, "Stopping all tasks in queue", log.Fields{"TaskCount": len(ts.queued)}) |
| 220 | |
| 221 | if len(ts.queued) > 0 { |
| 222 | ts.queued = ts.queued[:1] |
| 223 | logger.Warnw(ctx, "Skipping Current Task", log.Fields{"Task": ts.queued[0].Name()}) |
| 224 | } |
| 225 | ts.stop = false |
| 226 | } |
| 227 | |
| 228 | // popTask is used internally to remove the task that is |
| 229 | // is just completed. |
| 230 | func (ts *Tasks) popTask() (Task, int) { |
| 231 | ts.lock.Lock() |
| 232 | defer ts.lock.Unlock() |
| 233 | |
| 234 | var task Task |
| 235 | queueLen := len(ts.queued) |
| 236 | if queueLen > 0 { |
| 237 | task = ts.queued[0] |
| 238 | ts.queued = append(ts.queued[:0], ts.queued[0+1:]...) |
| 239 | } else { |
| 240 | logger.Errorw(ctx, "Trying to remove task from empty Task List", log.Fields{"#task ": queueLen}) |
| 241 | } |
| 242 | |
| 243 | return task, len(ts.queued) |
| 244 | } |
| 245 | |
| 246 | // NumPendingTasks returns the count of tasks that are either in progress or |
| 247 | // yet to be executed. The first in the list is the in progress |
| 248 | // task. |
| 249 | func (ts *Tasks) NumPendingTasks() uint16 { |
| 250 | return uint16(len(ts.queued)) |
| 251 | } |
| 252 | |
| 253 | // GetTaskList returns the list of tasks that are either in progress or |
| 254 | // yet to be executed. The first in the list is the in progress |
| 255 | // task. |
| 256 | func (ts *Tasks) GetTaskList() []Task { |
| 257 | taskList := []Task{} |
| 258 | return append(taskList, ts.queued...) |
| 259 | } |
| 260 | |
| 261 | // CurrentTask returns the task that is currently running. This can be |
| 262 | // used for verifying upon unforseen failures for debugging from |
| 263 | // with the code |
| 264 | func (ts *Tasks) CurrentTask() Task { |
| 265 | return ts.queued[0] |
| 266 | } |
| 267 | |
| 268 | // executeTasks executes the pending tasks one by one. The tasks are attempted |
| 269 | // one after another to avoid two tasks simultaneously operating on the |
| 270 | // same ONU. |
| 271 | func (ts *Tasks) executeTasks() { |
| 272 | // logger.Debug(ctx, "Starting Execution of tasks") |
| 273 | for (len(ts.queued) != 0) && (!ts.stop) { |
| 274 | task := ts.queued[0] |
| 275 | taskID := ts.getNewTaskID() |
| 276 | // logger.Infow(ctx, "Starting Execution of task", log.Fields{"TaskName": task.Name()}) |
| 277 | ts.totalTasks++ |
| 278 | |
| 279 | err := task.Start(ts.ctx, taskID) |
| 280 | if err == ErrTaskCancelError { |
| 281 | logger.Warnw(ctx, "Previous task cancelled. Exiting current task queue execution thread", log.Fields{"TaskCount": len(ts.queued)}) |
| 282 | return |
| 283 | } |
| 284 | _, pending := ts.popTask() |
| 285 | |
| 286 | if err != nil { |
| 287 | ts.failedTasks++ |
| 288 | } |
| 289 | if err == ErrCxtCancelError { |
| 290 | // TODO - This needs correction |
| 291 | ts.StopAll() |
| 292 | return |
| 293 | } |
| 294 | |
| 295 | if pending == 0 { |
| 296 | break |
| 297 | } |
| 298 | } |
| 299 | // logger.Debug(ctx, "Exiting Execution of tasks") |
| 300 | } |
| 301 | |
| 302 | func init() { |
| 303 | // Setup this package so that it's log level can be modified at run time |
| 304 | var err error |
Tinoj Joseph | 1d10832 | 2022-07-13 10:07:39 +0530 | [diff] [blame] | 305 | logger, err = log.AddPackageWithDefaultParam() |
Naveen Sampath | 04696f7 | 2022-06-13 15:19:14 +0530 | [diff] [blame] | 306 | if err != nil { |
| 307 | panic(err) |
| 308 | } |
| 309 | } |