blob: cfe35f0f15d46e57bb1923a31d1564a96dd304a2 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Naveen Sampath04696f72022-06-13 15:19:14 +053015
16package tasks
17
18import (
19 "context"
20 "errors"
21 "sync"
22 "time"
23
Tinoj Joseph1d108322022-07-13 10:07:39 +053024 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053025)
26
27var logger log.CLogger
28
29var (
30 // ErrCxtCancelError error
vinokuma926cb3e2023-03-29 11:41:06 +053031 ErrCxtCancelError = errors.New("Context Canceled")
Naveen Sampath04696f72022-06-13 15:19:14 +053032 // ErrTaskCancelError error
vinokuma926cb3e2023-03-29 11:41:06 +053033 ErrTaskCancelError = errors.New("Task Canceled")
Naveen Sampath04696f72022-06-13 15:19:14 +053034
35 ctx = context.TODO()
36)
37
38// TaskSet implements a set of dependent tasks into a single unit. The
39// tasks are added in the order they are expected to be executed. If any
40// of the tasks fails, the remaining tasks are not executed.
41
42// TaskSet structure
43type TaskSet struct {
44 name string
Naveen Sampath04696f72022-06-13 15:19:14 +053045 timestamp string
46 queued []Task
vinokuma926cb3e2023-03-29 11:41:06 +053047 taskID uint8
Naveen Sampath04696f72022-06-13 15:19:14 +053048}
49
50// NewTaskSet is constructor for TaskSet
51func NewTaskSet(name string) *TaskSet {
52 var ts TaskSet
53 ts.name = name
54 tstamp := (time.Now()).Format(time.RFC3339Nano)
55 ts.timestamp = tstamp
56 return &ts
57}
58
59// Name to return name of the task
60func (ts *TaskSet) Name() string {
61 return ts.name
62}
63
64// TaskID to return task id of the task
65func (ts *TaskSet) TaskID() uint8 {
66 return ts.taskID
67}
68
69// Timestamp to return timestamp for the task
70func (ts *TaskSet) Timestamp() string {
71 return ts.timestamp
72}
73
74// AddTask to add task
75func (ts *TaskSet) AddTask(task Task) {
76 logger.Debugw(ctx, "Adding Task to TaskSet", log.Fields{"SetName": ts.name, "TaskName": task.Name()})
77 ts.queued = append(ts.queued, task)
78}
79
80// Start to start the task
81func (ts *TaskSet) Start(ctx context.Context, taskID uint8) error {
Tinoj Joseph1d108322022-07-13 10:07:39 +053082 logger.Debugw(ctx, "Starting Execution TaskSet", log.Fields{"SetName": ts.name})
Naveen Sampath04696f72022-06-13 15:19:14 +053083 ts.taskID = taskID
84 for len(ts.queued) != 0 {
85 task := ts.queued[0]
86 logger.Infow(ctx, "Starting Execution of task", log.Fields{"TaskName": task.Name()})
87 err := task.Start(ctx, ts.taskID)
88 if err != nil {
89 return err
90 }
91 task = ts.popTask()
92 logger.Infow(ctx, "Execution of task completed", log.Fields{"TaskName": task.Name()})
93 }
94 logger.Debug(ctx, "Exiting Execution of TaskSet")
95 return nil
96}
97
98// popTask is used internally to remove the task that is
99// is just completed.
100func (ts *TaskSet) popTask() Task {
101 var task Task
102 task, ts.queued = ts.queued[0], ts.queued[1:]
103 return task
104}
105
106// Stop is used internally to remove the task that is
107// is just completed.
108func (ts *TaskSet) Stop() {
109 var task Task
110 // Stop all the tasks and clean up
111 for size := len(ts.queued); size > 0; size = len(ts.queued) {
112 // Pop out the first task and clean up resources
113 task, ts.queued = ts.queued[0], ts.queued[1:]
114 task.Stop()
115 }
116}
117
118//***************************************************************************
119// Task Execution Environment
120// ------------------------------
121// The section below helps create an execution environment for tasks
122// of a single ONU. Addition and in sequence execution of tasks is
123// the main goal.
124
125// queued - holds tasks yet to be executed and the current in progress
126// taskID - This variable is used to generate unique task id for each task
127// currentTask - This holds the value of task being executed
128// timout - This variable sets the timeout value for all of the messages
129// stop - This provides a way of stopping the execution of next task
130
131// Tasks structure
132type Tasks struct {
vinokuma926cb3e2023-03-29 11:41:06 +0530133 ctx context.Context
Naveen Sampath04696f72022-06-13 15:19:14 +0530134 queued []Task
vinokuma926cb3e2023-03-29 11:41:06 +0530135 lock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530136 totalTasks uint16
137 failedTasks uint16
vinokuma926cb3e2023-03-29 11:41:06 +0530138 taskID uint8
139 stop bool
Naveen Sampath04696f72022-06-13 15:19:14 +0530140}
141
142// NewTasks is constructor for Tasks
143func NewTasks(ctx context.Context) *Tasks {
144 var ts Tasks
145 ts.taskID = 0xff
146 ts.stop = false
147 ts.queued = []Task{}
148 ts.totalTasks = 0
149 ts.failedTasks = 0
150 ts.ctx = ctx
151 return &ts
152}
153
154// Initialize is used to initialize the embedded tasks structure within
155// each ONU.
156func (ts *Tasks) Initialize(ctx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530157 //Send signal to stop any task which are being executed
158 ts.StopAll()
159 ts.taskID = 0xff
160 ts.ctx = ctx
161}
162
163// CheckAndInitialize is used to initialize the embedded tasks structure within
164// NNI and resets taskID only when there are no pending tasks
165func (ts *Tasks) CheckAndInitialize(ctx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530166 ts.lock.Lock()
167 logger.Infow(ctx, "Queued Tasks", log.Fields{"Count": len(ts.queued)})
168 if len(ts.queued) == 0 {
169 ts.lock.Unlock()
170 ts.Initialize(ctx)
171 return
172 }
173 ts.ctx = ctx
174 ts.lock.Unlock()
175}
176
177// getNewTaskId generates a unique task-id for each new task. The
178// transaction-ids are generated for the task-ids.
179func (ts *Tasks) getNewTaskID() uint8 {
180 ts.taskID++
181 return ts.taskID
182}
183
184// GetContext to get context of the task
185func (ts *Tasks) GetContext() context.Context {
186 return ts.ctx
187}
188
189// AddTask adds a task and executes it if there is no task
vinokuma926cb3e2023-03-29 11:41:06 +0530190// pending execution. The execution happens on a separate thread.
Naveen Sampath04696f72022-06-13 15:19:14 +0530191// The tasks are maintained per ONU. This structure is instantiated
192// one per ONU
193func (ts *Tasks) AddTask(task Task) {
194 ts.lock.Lock()
195 defer ts.lock.Unlock()
196
197 // logger.Infow(ctx, "Adding Task", log.Fields{"TaskName": task.Name()})
198 ts.queued = append(ts.queued, task)
199 if ts.queued[0] == task {
200 go ts.executeTasks()
201 }
202}
203
204// TotalTasks returns the total number of tasks completed by the
205// the execution of the tasks.
206func (ts *Tasks) TotalTasks() uint16 {
207 return ts.totalTasks
208}
209
210// StopAll stops the execution of the tasks and cleans up
211// everything associated with the tasks
212func (ts *Tasks) StopAll() {
213 ts.lock.Lock()
214 defer ts.lock.Unlock()
215
216 ts.stop = true
217 logger.Infow(ctx, "Stopping all tasks in queue", log.Fields{"TaskCount": len(ts.queued)})
218
219 if len(ts.queued) > 0 {
220 ts.queued = ts.queued[:1]
221 logger.Warnw(ctx, "Skipping Current Task", log.Fields{"Task": ts.queued[0].Name()})
222 }
223 ts.stop = false
224}
225
226// popTask is used internally to remove the task that is
227// is just completed.
228func (ts *Tasks) popTask() (Task, int) {
229 ts.lock.Lock()
230 defer ts.lock.Unlock()
231
232 var task Task
233 queueLen := len(ts.queued)
234 if queueLen > 0 {
235 task = ts.queued[0]
236 ts.queued = append(ts.queued[:0], ts.queued[0+1:]...)
237 } else {
238 logger.Errorw(ctx, "Trying to remove task from empty Task List", log.Fields{"#task ": queueLen})
239 }
240
241 return task, len(ts.queued)
242}
243
244// NumPendingTasks returns the count of tasks that are either in progress or
245// yet to be executed. The first in the list is the in progress
246// task.
247func (ts *Tasks) NumPendingTasks() uint16 {
248 return uint16(len(ts.queued))
249}
250
251// GetTaskList returns the list of tasks that are either in progress or
252// yet to be executed. The first in the list is the in progress
253// task.
254func (ts *Tasks) GetTaskList() []Task {
255 taskList := []Task{}
256 return append(taskList, ts.queued...)
257}
258
259// CurrentTask returns the task that is currently running. This can be
vinokuma926cb3e2023-03-29 11:41:06 +0530260// used for verifying upon unforeseen failures for debugging from
Naveen Sampath04696f72022-06-13 15:19:14 +0530261// with the code
262func (ts *Tasks) CurrentTask() Task {
263 return ts.queued[0]
264}
265
266// executeTasks executes the pending tasks one by one. The tasks are attempted
267// one after another to avoid two tasks simultaneously operating on the
268// same ONU.
269func (ts *Tasks) executeTasks() {
270 // logger.Debug(ctx, "Starting Execution of tasks")
271 for (len(ts.queued) != 0) && (!ts.stop) {
272 task := ts.queued[0]
273 taskID := ts.getNewTaskID()
274 // logger.Infow(ctx, "Starting Execution of task", log.Fields{"TaskName": task.Name()})
275 ts.totalTasks++
276
277 err := task.Start(ts.ctx, taskID)
278 if err == ErrTaskCancelError {
vinokuma926cb3e2023-03-29 11:41:06 +0530279 logger.Warnw(ctx, "Previous task canceled. Exiting current task queue execution thread", log.Fields{"TaskCount": len(ts.queued)})
Naveen Sampath04696f72022-06-13 15:19:14 +0530280 return
281 }
282 _, pending := ts.popTask()
283
284 if err != nil {
285 ts.failedTasks++
286 }
287 if err == ErrCxtCancelError {
288 // TODO - This needs correction
289 ts.StopAll()
290 return
291 }
292
293 if pending == 0 {
294 break
295 }
296 }
297 // logger.Debug(ctx, "Exiting Execution of tasks")
298}
299
300func init() {
301 // Setup this package so that it's log level can be modified at run time
302 var err error
Tinoj Joseph1d108322022-07-13 10:07:39 +0530303 logger, err = log.AddPackageWithDefaultParam()
Naveen Sampath04696f72022-06-13 15:19:14 +0530304 if err != nil {
305 panic(err)
306 }
307}