blob: c232b6dd0eb160a3d205b4932d4ac2f871d07bb1 [file] [log] [blame]
Kent Hagermana05f4d42020-04-01 15:11:22 -04001/*
Joey Armstrong7a9af442024-01-03 19:26:36 -05002 * Copyright 2020-2024 Open Networking Foundation (ONF) and the ONF Contributors
Kent Hagermana05f4d42020-04-01 15:11:22 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package utils
18
19import (
20 "context"
21 "sync"
22)
23
24type request struct {
25 prev, next *request
26 notifyOnComplete chan<- struct{}
27}
28
29// RequestQueue represents a request processing queue where each request is processed to completion before another
30// request is given the green light to proceed.
31type RequestQueue struct {
Kent Hagermana05f4d42020-04-01 15:11:22 -040032 last, current *request
33 lastCompleteCh <-chan struct{}
Akash Reddy Kankanala929cc002025-04-08 15:05:21 +053034 mutex sync.Mutex
Kent Hagermana05f4d42020-04-01 15:11:22 -040035}
36
37// NewRequestQueue creates a new request queue
38func NewRequestQueue() *RequestQueue {
39 ch := make(chan struct{})
40 close(ch) // assume the "current" request is already complete
41 return &RequestQueue{lastCompleteCh: ch}
42}
43
44// WaitForGreenLight is invoked by a function processing a request to receive the green light before
45// proceeding. The caller can also provide a context with timeout. The timeout will be triggered if the wait is
46// too long (previous requests taking too long)
47func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
48 // add ourselves to the end of the queue
49 rq.mutex.Lock()
50 waitingOn := rq.lastCompleteCh
51
52 ch := make(chan struct{})
53 rq.lastCompleteCh = ch
54 r := &request{notifyOnComplete: ch}
55
56 if rq.last != nil {
57 rq.last.next, r.prev = r, rq.last
58 }
59 rq.last = r
60 rq.mutex.Unlock()
61
62 // wait for our turn
63 select {
64 case <-ctx.Done():
65 // canceled, so cleanup
66 rq.mutex.Lock()
67 defer rq.mutex.Unlock()
68
69 select {
70 case <-waitingOn:
71 // chan has been closed, so the lock has been acquired
72 // context is canceled, so just release the lock immediately
73 rq.current = r
74 rq.releaseWithoutLock()
75 default:
76 // on abort, skip our position in the queue
77 r.prev.notifyOnComplete = r.notifyOnComplete
78 // and remove ourselves from the queue
79 if r.next != nil { // if we are somewhere in the middle of the queue
80 r.prev.next = r.next
81 r.next.prev = r.prev
82 } else { // if we are at the end of the queue
83 rq.last = r.prev
84 r.prev.next = nil
85 }
86 }
87 return ctx.Err()
88
89 case <-waitingOn:
David K. Bainbridge5809b5b2020-08-27 00:07:41 +000090 // Previous request has signaled that it is complete.
91 // This request now can proceed as the active
92 // request
93
94 rq.mutex.Lock()
95 defer rq.mutex.Unlock()
Kent Hagermana05f4d42020-04-01 15:11:22 -040096 rq.current = r
97 return nil
98 }
99}
100
101// RequestComplete must be invoked by a process when it completes processing the request. That process must have
102// invoked WaitForGreenLight() before.
103func (rq *RequestQueue) RequestComplete() {
104 rq.mutex.Lock()
105 defer rq.mutex.Unlock()
106
107 rq.releaseWithoutLock()
108}
109
110func (rq *RequestQueue) releaseWithoutLock() {
111 // Notify the next waiting request. This will panic if the lock is released more than once.
112 close(rq.current.notifyOnComplete)
113
114 if rq.current.next != nil {
115 rq.current.next.prev = nil
116 }
117}