[VOL-1913] Replies with errors of the form "transaction-not-acquired" are now ignored.
Also had to add a conditional branch in case no valid responses are received (the last thread to exit must clean up the connection).
The rw cores will now respond with a "transaction-not-acquired" error in place of "failed-to-seize-request" or COMPLETED_BY_OTHER.
Change-Id: I199d4a0091ba4fc1db5b8097adbad951408e5034
diff --git a/afrouter/afrouter/request.go b/afrouter/afrouter/request.go
index 0af20e3..8fc15aa 100644
--- a/afrouter/afrouter/request.go
+++ b/afrouter/afrouter/request.go
@@ -22,6 +22,8 @@
"errors"
"github.com/opencord/voltha-go/common/log"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"io"
"sync"
)
@@ -48,6 +50,8 @@
isStreamingResponse bool
}
+var transactionNotAcquiredErrorString = status.Error(codes.Canceled, "transaction-not-acquired").Error()
+
// catchupRequestStreamThenForwardResponseStream must be called with request.mutex pre-locked
func (r *request) catchupRequestStreamThenForwardResponseStream(connName string, stream grpc.ClientStream) {
r.streams[connName] = stream
@@ -78,6 +82,10 @@
activeStream := false
for {
err = stream.RecvMsg(&frame)
+ // if this is an inactive responder, ignore everything it sends
+ if err != nil && err.Error() == transactionNotAcquiredErrorString {
+ break
+ }
// the first thread to reach this point (first to receive a response frame) will become the active stream
r.activeResponseStreamOnce.Do(func() { activeStream = true })
if err != nil {
@@ -123,6 +131,11 @@
delete(r.streams, connName)
streamsLeft := len(r.streams)
+ // handle the case where no cores are the active responder. Should never happen, but just in case...
+ if streamsLeft == 0 {
+ r.activeResponseStreamOnce.Do(func() { activeStream = true })
+ }
+
// if this the active stream (for non-streaming requests), or this is the last stream (for streaming requests)
if (activeStream && !r.isStreamingRequest && !r.isStreamingResponse) || (streamsLeft == 0 && (r.isStreamingRequest || r.isStreamingResponse)) {
// request is complete, cleanup