blob: aacb8ae2c11e4d10b24d9db5acdc069b8dea9e1b [file] [log] [blame]
khenaidoo1ce37ad2019-03-24 22:07:24 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
khenaidoo1ce37ad2019-03-24 22:07:24 -040017package utils
18
khenaidoo2c6a0992019-04-29 13:46:56 -040019import (
khenaidoo442e7c72020-03-10 16:13:48 -040020 "context"
khenaidoo631fe542019-05-31 15:44:43 -040021 "os"
khenaidoo442e7c72020-03-10 16:13:48 -040022 "sync"
khenaidoo2c6a0992019-04-29 13:46:56 -040023 "time"
npujar1d86a522019-11-14 17:11:16 +053024
25 "google.golang.org/grpc/codes"
26 "google.golang.org/grpc/status"
khenaidoo2c6a0992019-04-29 13:46:56 -040027)
28
khenaidoo442e7c72020-03-10 16:13:48 -040029// ResponseCallback is the function signature for callbacks to execute after a response is received.
30type ResponseCallback func(rpc string, response interface{}, reqArgs ...interface{})
31
npujar1d86a522019-11-14 17:11:16 +053032// DeviceID represent device id attribute
khenaidoo1ce37ad2019-03-24 22:07:24 -040033type DeviceID struct {
npujar1d86a522019-11-14 17:11:16 +053034 ID string
khenaidoo1ce37ad2019-03-24 22:07:24 -040035}
36
npujar1d86a522019-11-14 17:11:16 +053037// LogicalDeviceID rpresent logical device id attribute
khenaidoo1ce37ad2019-03-24 22:07:24 -040038type LogicalDeviceID struct {
npujar1d86a522019-11-14 17:11:16 +053039 ID string
khenaidoo1ce37ad2019-03-24 22:07:24 -040040}
khenaidoo2c6a0992019-04-29 13:46:56 -040041
npujar1d86a522019-11-14 17:11:16 +053042// GetHostName returns host name
khenaidoo631fe542019-05-31 15:44:43 -040043func GetHostName() string {
44 return os.Getenv("HOSTNAME")
45}
46
khenaidoo442e7c72020-03-10 16:13:48 -040047type request struct {
Kent Hagerman730cbdf2020-03-31 12:22:08 -040048 prev, next *request
49 notifyOnComplete chan<- struct{}
khenaidoo442e7c72020-03-10 16:13:48 -040050}
51
52// RequestQueue represents a request processing queue where each request is processed to completion before another
53// request is given the green light to proceed.
54type RequestQueue struct {
Kent Hagerman730cbdf2020-03-31 12:22:08 -040055 mutex sync.Mutex
56
57 last, current *request
58 lastCompleteCh <-chan struct{}
khenaidoo442e7c72020-03-10 16:13:48 -040059}
60
Kent Hagerman730cbdf2020-03-31 12:22:08 -040061// NewRequestQueue creates a new request queue
62func NewRequestQueue() *RequestQueue {
63 ch := make(chan struct{})
64 close(ch) // assume the "current" request is already complete
65 return &RequestQueue{lastCompleteCh: ch}
khenaidoo442e7c72020-03-10 16:13:48 -040066}
67
68// WaitForGreenLight is invoked by a function processing a request to receive the green light before
69// proceeding. The caller can also provide a context with timeout. The timeout will be triggered if the wait is
70// too long (previous requests taking too long)
71func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
Kent Hagerman730cbdf2020-03-31 12:22:08 -040072 // add ourselves to the end of the queue
73 rq.mutex.Lock()
74 waitingOn := rq.lastCompleteCh
75
76 ch := make(chan struct{})
77 rq.lastCompleteCh = ch
78 r := &request{notifyOnComplete: ch}
79
80 if rq.last != nil {
81 rq.last.next, r.prev = r, rq.last
khenaidoo442e7c72020-03-10 16:13:48 -040082 }
Kent Hagerman730cbdf2020-03-31 12:22:08 -040083 rq.last = r
84 rq.mutex.Unlock()
85
86 // wait for our turn
khenaidoo442e7c72020-03-10 16:13:48 -040087 select {
khenaidoo442e7c72020-03-10 16:13:48 -040088 case <-ctx.Done():
Kent Hagerman730cbdf2020-03-31 12:22:08 -040089 // canceled, so cleanup
90 rq.mutex.Lock()
91 defer rq.mutex.Unlock()
92
Kent Hagerman914d5332020-04-01 12:38:38 -040093 select {
94 case <-waitingOn:
95 // chan has been closed, so the lock has been acquired
96 // context is canceled, so just release the lock immediately
97 rq.current = r
98 rq.releaseWithoutLock()
99 default:
Kent Hagerman730cbdf2020-03-31 12:22:08 -0400100 // on abort, skip our position in the queue
101 r.prev.notifyOnComplete = r.notifyOnComplete
102 // and remove ourselves from the queue
103 if r.next != nil { // if we are somewhere in the middle of the queue
104 r.prev.next = r.next
105 r.next.prev = r.prev
106 } else { // if we are at the end of the queue
107 rq.last = r.prev
108 r.prev.next = nil
109 }
Kent Hagerman730cbdf2020-03-31 12:22:08 -0400110 }
khenaidoo442e7c72020-03-10 16:13:48 -0400111 return ctx.Err()
Kent Hagerman730cbdf2020-03-31 12:22:08 -0400112
113 case <-waitingOn:
114 // lock is acquired
115 rq.current = r
116 return nil
khenaidoo442e7c72020-03-10 16:13:48 -0400117 }
118}
119
120// RequestComplete must be invoked by a process when it completes processing the request. That process must have
121// invoked WaitForGreenLight() before.
122func (rq *RequestQueue) RequestComplete() {
Kent Hagerman730cbdf2020-03-31 12:22:08 -0400123 rq.mutex.Lock()
124 defer rq.mutex.Unlock()
125
126 rq.releaseWithoutLock()
khenaidoo442e7c72020-03-10 16:13:48 -0400127}
128
Kent Hagerman730cbdf2020-03-31 12:22:08 -0400129func (rq *RequestQueue) releaseWithoutLock() {
130 // Notify the next waiting request. This will panic if the lock is released more than once.
131 close(rq.current.notifyOnComplete)
132
133 if rq.current.next != nil {
134 rq.current.next.prev = nil
135 }
khenaidoo442e7c72020-03-10 16:13:48 -0400136}
137
npujar1d86a522019-11-14 17:11:16 +0530138// Response -
Kent Hagerman8da2f1e2019-11-25 17:28:09 -0500139type Response struct {
140 *response
141}
142type response struct {
143 err error
144 ch chan struct{}
145 done bool
146}
147
npujar1d86a522019-11-14 17:11:16 +0530148// NewResponse -
Kent Hagerman8da2f1e2019-11-25 17:28:09 -0500149func NewResponse() Response {
150 return Response{
151 &response{
152 ch: make(chan struct{}),
153 },
154 }
155}
156
A R Karthick5c28f552019-12-11 22:47:44 -0800157// Fake a completed response.
158func DoneResponse() Response {
159 r := Response{
160 &response{
161 err: nil,
162 ch: make(chan struct{}),
163 done: true,
164 },
165 }
166 close(r.ch)
167 return r
168}
169
Kent Hagerman8da2f1e2019-11-25 17:28:09 -0500170// Error sends a response with the given error. It may only be called once.
171func (r Response) Error(err error) {
172 // if this is called twice, it will panic; this is intentional
173 r.err = err
174 r.done = true
175 close(r.ch)
176}
177
178// Done sends a non-error response unless Error has already been called, in which case this is a no-op.
179func (r Response) Done() {
180 if !r.done {
181 close(r.ch)
182 }
183}
184
khenaidoo2c6a0992019-04-29 13:46:56 -0400185//WaitForNilOrErrorResponses waits on a variadic number of channels for either a nil response or an error
186//response. If an error is received from a given channel then the returned error array will contain that error.
187//The error will be at the index corresponding to the order in which the channel appear in the parameter list.
188//If no errors is found then nil is returned. This method also takes in a timeout in milliseconds. If a
189//timeout is obtained then this function will stop waiting for the remaining responses and abort.
khenaidoo442e7c72020-03-10 16:13:48 -0400190func WaitForNilOrErrorResponses(timeout time.Duration, responses ...Response) []error {
Kent Hagerman8da2f1e2019-11-25 17:28:09 -0500191 timedOut := make(chan struct{})
khenaidoo442e7c72020-03-10 16:13:48 -0400192 timer := time.AfterFunc(timeout, func() { close(timedOut) })
Kent Hagerman8da2f1e2019-11-25 17:28:09 -0500193 defer timer.Stop()
khenaidoo2c6a0992019-04-29 13:46:56 -0400194
Kent Hagerman8da2f1e2019-11-25 17:28:09 -0500195 gotError := false
196 errors := make([]error, 0, len(responses))
197 for _, response := range responses {
198 var err error
199 select {
200 case <-response.ch:
201 // if a response is already available, use it
202 err = response.err
203 default:
204 // otherwise, wait for either a response or a timeout
205 select {
206 case <-response.ch:
207 err = response.err
208 case <-timedOut:
209 err = status.Error(codes.Aborted, "timeout")
khenaidoo2c6a0992019-04-29 13:46:56 -0400210 }
khenaidoo2c6a0992019-04-29 13:46:56 -0400211 }
Kent Hagerman8da2f1e2019-11-25 17:28:09 -0500212 gotError = gotError || err != nil
213 errors = append(errors, err)
khenaidoo2c6a0992019-04-29 13:46:56 -0400214 }
215
Kent Hagerman8da2f1e2019-11-25 17:28:09 -0500216 if gotError {
khenaidoo2c6a0992019-04-29 13:46:56 -0400217 return errors
218 }
219 return nil
220}