blob: 34c4c2e191c412b2758d4f2d6ababe5dc2fa1007 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14*/
15
16package tasks
17
18import (
19 "context"
20 "errors"
21 "sync"
22 "time"
23
Tinoj Joseph1d108322022-07-13 10:07:39 +053024 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053025)
26
27var logger log.CLogger
28
29var (
30 // ErrCxtCancelError error
31 ErrCxtCancelError = errors.New("Context Cancelled")
32 // ErrTaskCancelError error
33 ErrTaskCancelError = errors.New("Task Cancelled")
34
35 ctx = context.TODO()
36)
37
38// TaskSet implements a set of dependent tasks into a single unit. The
39// tasks are added in the order they are expected to be executed. If any
40// of the tasks fails, the remaining tasks are not executed.
41
42// TaskSet structure
43type TaskSet struct {
44 name string
45 taskID uint8
46 timestamp string
47 queued []Task
48}
49
50// NewTaskSet is constructor for TaskSet
51func NewTaskSet(name string) *TaskSet {
52 var ts TaskSet
53 ts.name = name
54 tstamp := (time.Now()).Format(time.RFC3339Nano)
55 ts.timestamp = tstamp
56 return &ts
57}
58
59// Name to return name of the task
60func (ts *TaskSet) Name() string {
61 return ts.name
62}
63
64// TaskID to return task id of the task
65func (ts *TaskSet) TaskID() uint8 {
66 return ts.taskID
67}
68
69// Timestamp to return timestamp for the task
70func (ts *TaskSet) Timestamp() string {
71 return ts.timestamp
72}
73
74// AddTask to add task
75func (ts *TaskSet) AddTask(task Task) {
76 logger.Debugw(ctx, "Adding Task to TaskSet", log.Fields{"SetName": ts.name, "TaskName": task.Name()})
77 ts.queued = append(ts.queued, task)
78}
79
80// Start to start the task
81func (ts *TaskSet) Start(ctx context.Context, taskID uint8) error {
Tinoj Joseph1d108322022-07-13 10:07:39 +053082 logger.Debugw(ctx, "Starting Execution TaskSet", log.Fields{"SetName": ts.name})
Naveen Sampath04696f72022-06-13 15:19:14 +053083 ts.taskID = taskID
84 for len(ts.queued) != 0 {
85 task := ts.queued[0]
86 logger.Infow(ctx, "Starting Execution of task", log.Fields{"TaskName": task.Name()})
87 err := task.Start(ctx, ts.taskID)
88 if err != nil {
89 return err
90 }
91 task = ts.popTask()
92 logger.Infow(ctx, "Execution of task completed", log.Fields{"TaskName": task.Name()})
93 }
94 logger.Debug(ctx, "Exiting Execution of TaskSet")
95 return nil
96}
97
98// popTask is used internally to remove the task that is
99// is just completed.
100func (ts *TaskSet) popTask() Task {
101 var task Task
102 task, ts.queued = ts.queued[0], ts.queued[1:]
103 return task
104}
105
106// Stop is used internally to remove the task that is
107// is just completed.
108func (ts *TaskSet) Stop() {
109 var task Task
110 // Stop all the tasks and clean up
111 for size := len(ts.queued); size > 0; size = len(ts.queued) {
112 // Pop out the first task and clean up resources
113 task, ts.queued = ts.queued[0], ts.queued[1:]
114 task.Stop()
115 }
116}
117
//***************************************************************************
// Task Execution Environment
// ------------------------------
// The section below creates an execution environment for the tasks
// of a single ONU. Its main goals are adding tasks and executing them
// strictly in sequence.

// queued - holds the tasks yet to be executed, including the one in progress
// taskID - used to generate a unique task ID for each task
// currentTask - holds the task being executed
// timeout - sets the timeout value for all of the messages
// stop - provides a way of stopping the execution of the next task
// NOTE(review): currentTask and timeout have no matching field in the Tasks
// struct below - confirm whether these two entries are stale.
130
131// Tasks structure
132type Tasks struct {
133 queued []Task
134 taskID uint8
135 stop bool
136 totalTasks uint16
137 failedTasks uint16
138 lock sync.RWMutex
139 ctx context.Context
140}
141
142// NewTasks is constructor for Tasks
143func NewTasks(ctx context.Context) *Tasks {
144 var ts Tasks
145 ts.taskID = 0xff
146 ts.stop = false
147 ts.queued = []Task{}
148 ts.totalTasks = 0
149 ts.failedTasks = 0
150 ts.ctx = ctx
151 return &ts
152}
153
154// Initialize is used to initialize the embedded tasks structure within
155// each ONU.
156func (ts *Tasks) Initialize(ctx context.Context) {
157
158 //Send signal to stop any task which are being executed
159 ts.StopAll()
160 ts.taskID = 0xff
161 ts.ctx = ctx
162}
163
164// CheckAndInitialize is used to initialize the embedded tasks structure within
165// NNI and resets taskID only when there are no pending tasks
166func (ts *Tasks) CheckAndInitialize(ctx context.Context) {
167
168 ts.lock.Lock()
169 logger.Infow(ctx, "Queued Tasks", log.Fields{"Count": len(ts.queued)})
170 if len(ts.queued) == 0 {
171 ts.lock.Unlock()
172 ts.Initialize(ctx)
173 return
174 }
175 ts.ctx = ctx
176 ts.lock.Unlock()
177}
178
179// getNewTaskId generates a unique task-id for each new task. The
180// transaction-ids are generated for the task-ids.
181func (ts *Tasks) getNewTaskID() uint8 {
182 ts.taskID++
183 return ts.taskID
184}
185
186// GetContext to get context of the task
187func (ts *Tasks) GetContext() context.Context {
188 return ts.ctx
189}
190
191// AddTask adds a task and executes it if there is no task
192// pending execution. The execution happens on a seperate thread.
193// The tasks are maintained per ONU. This structure is instantiated
194// one per ONU
195func (ts *Tasks) AddTask(task Task) {
196 ts.lock.Lock()
197 defer ts.lock.Unlock()
198
199 // logger.Infow(ctx, "Adding Task", log.Fields{"TaskName": task.Name()})
200 ts.queued = append(ts.queued, task)
201 if ts.queued[0] == task {
202 go ts.executeTasks()
203 }
204}
205
206// TotalTasks returns the total number of tasks completed by the
207// the execution of the tasks.
208func (ts *Tasks) TotalTasks() uint16 {
209 return ts.totalTasks
210}
211
212// StopAll stops the execution of the tasks and cleans up
213// everything associated with the tasks
214func (ts *Tasks) StopAll() {
215 ts.lock.Lock()
216 defer ts.lock.Unlock()
217
218 ts.stop = true
219 logger.Infow(ctx, "Stopping all tasks in queue", log.Fields{"TaskCount": len(ts.queued)})
220
221 if len(ts.queued) > 0 {
222 ts.queued = ts.queued[:1]
223 logger.Warnw(ctx, "Skipping Current Task", log.Fields{"Task": ts.queued[0].Name()})
224 }
225 ts.stop = false
226}
227
228// popTask is used internally to remove the task that is
229// is just completed.
230func (ts *Tasks) popTask() (Task, int) {
231 ts.lock.Lock()
232 defer ts.lock.Unlock()
233
234 var task Task
235 queueLen := len(ts.queued)
236 if queueLen > 0 {
237 task = ts.queued[0]
238 ts.queued = append(ts.queued[:0], ts.queued[0+1:]...)
239 } else {
240 logger.Errorw(ctx, "Trying to remove task from empty Task List", log.Fields{"#task ": queueLen})
241 }
242
243 return task, len(ts.queued)
244}
245
246// NumPendingTasks returns the count of tasks that are either in progress or
247// yet to be executed. The first in the list is the in progress
248// task.
249func (ts *Tasks) NumPendingTasks() uint16 {
250 return uint16(len(ts.queued))
251}
252
253// GetTaskList returns the list of tasks that are either in progress or
254// yet to be executed. The first in the list is the in progress
255// task.
256func (ts *Tasks) GetTaskList() []Task {
257 taskList := []Task{}
258 return append(taskList, ts.queued...)
259}
260
261// CurrentTask returns the task that is currently running. This can be
262// used for verifying upon unforseen failures for debugging from
263// with the code
264func (ts *Tasks) CurrentTask() Task {
265 return ts.queued[0]
266}
267
268// executeTasks executes the pending tasks one by one. The tasks are attempted
269// one after another to avoid two tasks simultaneously operating on the
270// same ONU.
271func (ts *Tasks) executeTasks() {
272 // logger.Debug(ctx, "Starting Execution of tasks")
273 for (len(ts.queued) != 0) && (!ts.stop) {
274 task := ts.queued[0]
275 taskID := ts.getNewTaskID()
276 // logger.Infow(ctx, "Starting Execution of task", log.Fields{"TaskName": task.Name()})
277 ts.totalTasks++
278
279 err := task.Start(ts.ctx, taskID)
280 if err == ErrTaskCancelError {
281 logger.Warnw(ctx, "Previous task cancelled. Exiting current task queue execution thread", log.Fields{"TaskCount": len(ts.queued)})
282 return
283 }
284 _, pending := ts.popTask()
285
286 if err != nil {
287 ts.failedTasks++
288 }
289 if err == ErrCxtCancelError {
290 // TODO - This needs correction
291 ts.StopAll()
292 return
293 }
294
295 if pending == 0 {
296 break
297 }
298 }
299 // logger.Debug(ctx, "Exiting Execution of tasks")
300}
301
302func init() {
303 // Setup this package so that it's log level can be modified at run time
304 var err error
Tinoj Joseph1d108322022-07-13 10:07:39 +0530305 logger, err = log.AddPackageWithDefaultParam()
Naveen Sampath04696f72022-06-13 15:19:14 +0530306 if err != nil {
307 panic(err)
308 }
309}