/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package utils
import (
"context"
"os"
"sync"
"time"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// ResponseCallback is the function signature for callbacks to execute after a response is received.
type ResponseCallback func(rpc string, response interface{}, reqArgs ...interface{})
// DeviceID represents a device ID attribute
type DeviceID struct {
ID string
}
// LogicalDeviceID represents a logical device ID attribute
type LogicalDeviceID struct {
ID string
}
// GetHostName returns the host name, as reported by the HOSTNAME environment variable
func GetHostName() string {
return os.Getenv("HOSTNAME")
}
type request struct {
channel chan struct{}
done chan struct{}
}
func newRequest() *request {
return &request{
channel: make(chan struct{}),
done: make(chan struct{}),
}
}
// RequestQueue represents a request processing queue where each request is processed to completion before another
// request is given the green light to proceed.
type RequestQueue struct {
queue chan *request
requestCompleteIndication chan struct{}
queueID string
stopOnce sync.Once
stopped bool
}
// NewRequestQueue creates a new request queue. maxQueueSize is the maximum size of the queue. queueID is used mostly
// for logging.
func NewRequestQueue(queueID string, maxQueueSize int) *RequestQueue {
return &RequestQueue{
queueID: queueID,
queue: make(chan *request, maxQueueSize),
requestCompleteIndication: make(chan struct{}),
}
}
// Start starts the request processing queue in its own goroutine
func (rq *RequestQueue) Start() {
go func() {
for {
req, ok := <-rq.queue
if !ok {
log.Warnw("request-sequencer-queue-closed", log.Fields{"id": rq.queueID})
break
}
// If the request is waiting then closing req.channel will trigger the request to proceed. Otherwise,
// if the request was cancelled then this will just clean up.
close(req.channel)
// Wait for either a request complete indication or a request aborted due to timeout
select {
case <-req.done:
case <-rq.requestCompleteIndication:
}
}
}()
}
// WaitForGreenLight is invoked by a function processing a request to receive the green light before
// proceeding. The caller can also provide a context with a timeout; the wait is aborted if that timeout
// expires (e.g. because previous requests are taking too long to complete).
func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
if rq.stopped {
return status.Errorf(codes.Aborted, "queue-already-stopped-%s", rq.queueID)
}
request := newRequest()
// Queue the request
rq.queue <- request
select {
case <-request.channel:
return nil
case <-ctx.Done():
close(request.done)
return ctx.Err()
}
}
// RequestComplete must be invoked by a process when it completes processing the request. That process must have
// previously invoked WaitForGreenLight().
func (rq *RequestQueue) RequestComplete() {
if !rq.stopped {
rq.requestCompleteIndication <- struct{}{}
}
}
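// A minimal caller-side sketch of the WaitForGreenLight/RequestComplete pattern. The method
// name, receiver and 10-second timeout below are illustrative assumptions, not part of this
// package:
//
//	func (a *agent) updateDevice(rq *RequestQueue) error {
//		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
//		defer cancel()
//		if err := rq.WaitForGreenLight(ctx); err != nil {
//			return err // timed out or the queue was stopped before our turn
//		}
//		defer rq.RequestComplete() // unblock the next queued request
//		// ... perform the serialized work here ...
//		return nil
//	}
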
// Stop must only be invoked by the process that started the request queue. Prior to invoking Stop, WaitForGreenLight
// must be invoked.
func (rq *RequestQueue) Stop() {
rq.stopOnce.Do(func() {
rq.stopped = true
close(rq.requestCompleteIndication)
close(rq.queue)
})
}
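// An owner-side lifecycle sketch for the queue, assuming a hypothetical component that creates
// and eventually stops it; the queue ID and size are arbitrary example values:
//
//	rq := NewRequestQueue("device-1234", 100)
//	rq.Start()
//	// ... hand rq to request handlers, which call WaitForGreenLight/RequestComplete ...
//	rq.Stop() // invoked by the same process that started the queue
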
// Response tracks the completion, and any resulting error, of an asynchronous operation
type Response struct {
*response
}
type response struct {
err error
ch chan struct{}
done bool
}
// NewResponse creates a new, not-yet-completed Response
func NewResponse() Response {
return Response{
&response{
ch: make(chan struct{}),
},
}
}
// DoneResponse returns a Response that is already marked as complete, i.e. a fake completed response.
func DoneResponse() Response {
r := Response{
&response{
err: nil,
ch: make(chan struct{}),
done: true,
},
}
close(r.ch)
return r
}
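// A sketch of where DoneResponse can be useful: a function that normally returns an in-flight
// Response may return an already-completed one when there is no asynchronous work to do, so
// callers can wait on the result uniformly. The surrounding function and predicate are assumed
// examples, not part of this package:
//
//	func (a *agent) maybeAsyncOp() Response {
//		if nothingToDo() { // hypothetical predicate
//			return DoneResponse() // already complete, with a nil error
//		}
//		resp := NewResponse()
//		go func() { resp.Done() }() // real work would happen here before Done/Error
//		return resp
//	}
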
// Error sends a response with the given error. It may only be called once.
func (r Response) Error(err error) {
// if this is called twice, it will panic; this is intentional
r.err = err
r.done = true
close(r.ch)
}
// Done sends a non-error response unless Error has already been called, in which case this is a no-op.
func (r Response) Done() {
if !r.done {
close(r.ch)
}
}
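// A minimal producer sketch for Response, assuming a hypothetical asynchronous task; doWork is
// a placeholder for the real operation:
//
//	resp := NewResponse()
//	go func() {
//		if err := doWork(); err != nil {
//			resp.Error(err) // complete with an error; must not be called twice
//			return
//		}
//		resp.Done() // complete successfully
//	}()
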
// WaitForNilOrErrorResponses waits on a variadic number of responses for either a nil (success) result or an
// error result. If an error is received from a given response then the returned error array will contain that
// error at the index corresponding to the order in which that response appears in the parameter list.
// If no errors are found then nil is returned. This function also takes a timeout; if the timeout expires
// before all responses are received, it stops waiting and reports a timeout error for each remaining response.
func WaitForNilOrErrorResponses(timeout time.Duration, responses ...Response) []error {
timedOut := make(chan struct{})
timer := time.AfterFunc(timeout, func() { close(timedOut) })
defer timer.Stop()
gotError := false
errors := make([]error, 0, len(responses))
for _, response := range responses {
var err error
select {
case <-response.ch:
// if a response is already available, use it
err = response.err
default:
// otherwise, wait for either a response or a timeout
select {
case <-response.ch:
err = response.err
case <-timedOut:
err = status.Error(codes.Aborted, "timeout")
}
}
gotError = gotError || err != nil
errors = append(errors, err)
}
if gotError {
return errors
}
return nil
}
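// A sketch of collecting the outcome of several asynchronous operations; resp1, resp2 and the
// 5-second timeout are assumed example values:
//
//	if errs := WaitForNilOrErrorResponses(5*time.Second, resp1, resp2); errs != nil {
//		// errs[i] is non-nil if the i-th response failed or timed out
//		log.Errorw("requests-failed", log.Fields{"errors": errs})
//	}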