blob: 1d744c4b2ff6d40a2201b2fd007ea6cc82d3ae7f [file] [log] [blame]
/*
* Copyright 2019-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package afrouter
import (
"context"
"encoding/hex"
"errors"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
"sync"
)
type request struct {
mutex sync.Mutex
activeResponseStreamOnce sync.Once
setResponseHeaderOnce sync.Once
responseStreamMutex sync.Mutex
streams map[string]grpc.ClientStream
requestFrameBacklog [][]byte
responseErrChan chan error
sendClosed bool
backend *backend
ctx context.Context
serverStream grpc.ServerStream
methodInfo methodDetails
requestFrame *requestFrame
responseFrame *responseFrame
isStreamingRequest bool
isStreamingResponse bool
}
var transactionNotAcquiredErrorString = status.Error(codes.Canceled, "transaction-not-acquired").Error()
// catchupRequestStreamThenForwardResponseStream must be called with request.mutex pre-locked
func (r *request) catchupRequestStreamThenForwardResponseStream(connName string, stream grpc.ClientStream) {
r.streams[connName] = stream
// prime new streams with any traffic they might have missed (non-streaming requests only)
frame := *r.requestFrame // local copy of frame
for _, payload := range r.requestFrameBacklog {
frame.payload = payload
if err := stream.SendMsg(&frame); err != nil {
log.Debugf("Error on SendMsg: %s", err.Error())
break
}
}
if r.sendClosed {
err := stream.CloseSend()
if err != nil {
log.Errorf("%v", err)
}
}
r.mutex.Unlock()
r.forwardResponseStream(connName, stream)
}
// forwardResponseStream forwards the response stream
func (r *request) forwardResponseStream(connName string, stream grpc.ClientStream) {
var queuedFrames [][]byte
frame := *r.responseFrame
var err error
activeStream := false
for {
err = stream.RecvMsg(&frame)
// if this is an inactive responder, ignore everything it sends
if err != nil && err.Error() == transactionNotAcquiredErrorString {
break
}
// the first thread to reach this point (first to receive a response frame) will become the active stream
r.activeResponseStreamOnce.Do(func() { activeStream = true })
if err != nil {
// this can be io.EOF which is the success case
break
}
if r.isStreamingResponse {
// streaming response - send immediately
if err = r.sendResponseFrame(stream, frame); err != nil {
break
}
} else { // !r.isStreamingResponse
if r.isStreamingRequest { // && !r.isStreamingResponse
// queue the frame (only send response when the last stream closes)
queuedFrames = append(queuedFrames, frame.payload)
} else { // !r.isStreamingRequest && !r.isStreamingResponse
// only the active stream will respond
if activeStream { // && !r.isStreamingRequest && !r.isStreamingResponse
// send the response immediately
if err = r.sendResponseFrame(stream, frame); err != nil {
break
}
}
// !activeStream && !r.isStreamingRequest && !r.isStreamingResponse
// just read & discard until the stream dies
}
}
}
log.Debugf("Closing stream to %s", connName)
// io.EOF is the success case
if err == io.EOF {
err = nil
}
// this double-lock sets off alarm bells in my head
r.backend.mutex.Lock()
r.mutex.Lock()
delete(r.streams, connName)
streamsLeft := len(r.streams)
// handle the case where no cores are the active responder. Should never happen, but just in case...
if streamsLeft == 0 {
r.activeResponseStreamOnce.Do(func() { activeStream = true })
}
// if this the active stream (for non-streaming requests), or this is the last stream (for streaming requests)
if (activeStream && !r.isStreamingRequest && !r.isStreamingResponse) || (streamsLeft == 0 && (r.isStreamingRequest || r.isStreamingResponse)) {
// request is complete, cleanup
delete(r.backend.activeRequests, r)
r.mutex.Unlock()
r.backend.mutex.Unlock()
// send any queued frames we have (streaming request & !streaming response only, but no harm trying in other cases)
for _, payload := range queuedFrames {
if err != nil {
// if there's been an error, don't try to send anymore
break
}
frame.payload = payload
err = r.sendResponseFrame(stream, frame)
}
// We may have received Trailers as part of the call.
r.serverStream.SetTrailer(stream.Trailer())
// response stream complete
r.responseErrChan <- err
} else {
r.mutex.Unlock()
r.backend.mutex.Unlock()
}
}
func (r *request) sendResponseFrame(stream grpc.ClientStream, f responseFrame) error {
r.responseStreamMutex.Lock()
defer r.responseStreamMutex.Unlock()
// the header should only be set once, even if multiple streams can respond.
setHeader := false
r.setResponseHeaderOnce.Do(func() { setHeader = true })
if setHeader {
// This is a bit of a hack, but client to server headers are only readable after first client msg is
// received but must be written to server stream before the first msg is flushed.
// This is the only place to do it nicely.
md, err := stream.Header()
if err != nil {
return err
}
// Update the metadata for the response.
if f.metaKey != NoMeta {
if f.metaVal == "" {
// We could also always just do this
md.Set(f.metaKey, f.backend.name)
} else {
md.Set(f.metaKey, f.metaVal)
}
}
if err := r.serverStream.SendHeader(md); err != nil {
return err
}
}
log.Debugf("Response frame %s", hex.EncodeToString(f.payload))
return r.serverStream.SendMsg(&f)
}
func (r *request) sendAll(frame *requestFrame) error {
r.mutex.Lock()
if !r.isStreamingRequest {
// save frames of non-streaming requests, so we can catchup new streams
r.requestFrameBacklog = append(r.requestFrameBacklog, frame.payload)
}
// send to all existing streams
streams := make(map[string]grpc.ClientStream, len(r.streams))
for n, s := range r.streams {
streams[n] = s
}
r.mutex.Unlock()
var rtrn error
atLeastOne := false
atLeastOneSuccess := false
for _, stream := range streams {
if err := stream.SendMsg(frame); err != nil {
log.Debugf("Error on SendMsg: %s", err.Error())
rtrn = err
} else {
atLeastOneSuccess = true
}
atLeastOne = true
}
// If one of the streams succeeded, declare success
// if none did pick an error and return it.
if atLeastOne {
if atLeastOneSuccess {
return nil
} else {
return rtrn
}
} else {
err := errors.New("unable to send, all streams have closed")
log.Error(err)
return err
}
}
func (r *request) forwardRequestStream(src grpc.ServerStream) error {
// The frame buffer already has the results of a first
// RecvMsg in it so the first thing to do is to
// send it to the list of client streams and only
// then read some more.
frame := *r.requestFrame // local copy of frame
var rtrn error
for {
// Send the message to each of the backend streams
if err := r.sendAll(&frame); err != nil {
log.Debugf("SendAll failed %s", err.Error())
rtrn = err
break
}
log.Debugf("Request frame %s", hex.EncodeToString(frame.payload))
if err := src.RecvMsg(&frame); err != nil {
rtrn = err // this can be io.EOF which is happy case
break
}
}
r.mutex.Lock()
log.Debug("Closing southbound streams")
r.sendClosed = true
for _, stream := range r.streams {
err := stream.CloseSend()
if err != nil {
log.Errorf("%v", err)
}
}
r.mutex.Unlock()
if rtrn != io.EOF {
log.Debugf("s2cErr reporting %v", rtrn)
return rtrn
}
log.Debug("s2cErr reporting EOF")
// this is the successful case where the sender has encountered io.EOF, and won't be sending anymore.
// the clientStream>serverStream may continue sending though.
return nil
}