[VOL-1913] Replies with errors of the form "transaction-not-acquired" are now ignored.
Also had to add a conditional branch in case no valid responses are received (the last thread to exit must clean up the connection).
The rw cores will now respond with a "transaction-not-acquired" error in place of "failed-to-seize-request" or COMPLETED_BY_OTHER.
Change-Id: I199d4a0091ba4fc1db5b8097adbad951408e5034
diff --git a/afrouter/afrouter/request.go b/afrouter/afrouter/request.go
index 0af20e3..8fc15aa 100644
--- a/afrouter/afrouter/request.go
+++ b/afrouter/afrouter/request.go
@@ -22,6 +22,8 @@
"errors"
"github.com/opencord/voltha-go/common/log"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"io"
"sync"
)
@@ -48,6 +50,8 @@
isStreamingResponse bool
}
+var transactionNotAcquiredErrorString = status.Error(codes.Canceled, "transaction-not-acquired").Error()
+
// catchupRequestStreamThenForwardResponseStream must be called with request.mutex pre-locked
func (r *request) catchupRequestStreamThenForwardResponseStream(connName string, stream grpc.ClientStream) {
r.streams[connName] = stream
@@ -78,6 +82,10 @@
activeStream := false
for {
err = stream.RecvMsg(&frame)
+ // if this is an inactive responder, ignore everything it sends
+ if err != nil && err.Error() == transactionNotAcquiredErrorString {
+ break
+ }
// the first thread to reach this point (first to receive a response frame) will become the active stream
r.activeResponseStreamOnce.Do(func() { activeStream = true })
if err != nil {
@@ -123,6 +131,11 @@
delete(r.streams, connName)
streamsLeft := len(r.streams)
+ // handle the case where no cores are the active responder. Should never happen, but just in case...
+ if streamsLeft == 0 {
+ r.activeResponseStreamOnce.Do(func() { activeStream = true })
+ }
+
// if this the active stream (for non-streaming requests), or this is the last stream (for streaming requests)
if (activeStream && !r.isStreamingRequest && !r.isStreamingResponse) || (streamsLeft == 0 && (r.isStreamingRequest || r.isStreamingResponse)) {
// request is complete, cleanup
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 2b28fa5..bd84bdd 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -72,7 +72,7 @@
log.Debugw("acquired-request", log.Fields{"xtrnsId": transactionId})
return txn, nil
} else {
- return nil, errors.New("failed-to-seize-request")
+ return nil, errorTransactionNotAcquired
}
}
@@ -93,7 +93,7 @@
log.Debugw("processing-request", log.Fields{"Id": devId})
return txn, nil
} else {
- return nil, errors.New("failed-to-seize-request")
+ return nil, errorTransactionNotAcquired
}
} else {
log.Debugw("not-owned-by-me", log.Fields{"Id": devId})
@@ -101,7 +101,7 @@
log.Debugw("timeout-processing-request", log.Fields{"Id": devId})
return txn, nil
} else {
- return nil, errors.New("device-not-owned")
+ return nil, errorTransactionNotAcquired
}
}
}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 7f45875..987a3f6 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -138,7 +138,7 @@
} else if txn.Acquired(timeout) {
return txn, nil
} else {
- return nil, errors.New("failed-to-seize-request")
+ return nil, errorTransactionNotAcquired
}
}
@@ -166,7 +166,7 @@
log.Debugw("acquired-transaction", log.Fields{"transaction-timeout": timeout})
return txn, nil
} else {
- return nil, errors.New("failed-to-seize-request")
+ return nil, errorTransactionNotAcquired
}
} else {
if txn.Monitor(timeout) {
@@ -174,7 +174,7 @@
return txn, nil
} else {
log.Debugw("transaction-completed-by-other", log.Fields{"timeout": timeout, "waited-time": time.Since(t)})
- return nil, errors.New(string(COMPLETED_BY_OTHER))
+ return nil, errorTransactionNotAcquired
}
}
}
@@ -557,8 +557,8 @@
if handler.competeForTransaction() {
if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}); err != nil {
- // Remove the device in memory
- if err.Error() == (errors.New(string(COMPLETED_BY_OTHER)).Error()) {
+ if err == errorTransactionNotAcquired && !handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: id.Id}) {
+ // Remove the device in memory
handler.deviceMgr.stopManagingDevice(id.Id)
}
return new(empty.Empty), err
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 0d30340..b2d01a4 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -37,6 +37,8 @@
import (
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"time"
)
@@ -50,6 +52,8 @@
STOPPED_WAITING_FOR_KEY
)
+var errorTransactionNotAcquired = status.Error(codes.Canceled, "transaction-not-acquired")
+
const (
TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
)