VOL-1025: Implement a Go language library for affinity proxy request/response handling
- Added examples of how to use the library in Voltha core's gRPC API handler
- Two APIs were modified: CreateDevice and EnableDevice
Change-Id: Ib5f30cd1783ea088399ac019a2eb0e51604bd2f7
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index ed30bb7..215f60f 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -29,6 +29,8 @@
"google.golang.org/grpc/status"
)
+const MAX_RESPONSE_TIME = 500 // milliseconds
+
type APIHandler struct {
deviceMgr *DeviceManager
logicalDeviceMgr *LogicalDeviceManager
@@ -48,6 +50,29 @@
return exist
}
+// This function attempts to extract the serial number from the request metadata
+// and create a KV transaction for that serial number for the current core.
+func (handler *APIHandler) createKvTransaction(ctx context.Context) (*KVTransaction, error) {
+ var (
+ err error
+ ok bool
+ md metadata.MD
+ serNum []string
+ )
+ if md, ok = metadata.FromIncomingContext(ctx); !ok {
+ err = errors.New("metadata-not-found")
+ } else if serNum, ok = md["voltha_serial_number"]; !ok {
+ err = errors.New("serial-number-not-found")
+ }
+ if !ok {
+ log.Error(err)
+ return nil, err
+ }
+ // Create KV transaction
+ txn := NewKVTransaction(serNum[0])
+ return txn, nil
+}
+
// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
// response is expected in a successful scenario
func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
@@ -159,6 +184,16 @@
if isTestMode(ctx) {
return &voltha.Device{Id: device.Id}, nil
}
+
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return &voltha.Device{}, err
+ } else if txn.Acquired(MAX_RESPONSE_TIME) {
+ defer txn.Close() // Ensure active core signals "done" to standby
+ } else {
+ return &voltha.Device{}, nil
+ }
+
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.createDevice(ctx, device, ch)
@@ -187,6 +222,16 @@
if isTestMode(ctx) {
return new(empty.Empty), nil
}
+
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return new(empty.Empty), err
+ } else if txn.Acquired(MAX_RESPONSE_TIME) {
+ defer txn.Close() // Ensure active core signals "done" to standby
+ } else {
+ return new(empty.Empty), nil
+ }
+
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.enableDevice(ctx, id, ch)