- Updated all OFAgent RPCs destined toward a Voltha core to contain
a metadata field that specifies which Voltha core grouping should
service the request.
- Added transaction handler code to all Voltha core APIs
that modify the model.
Change-Id: I8dafc95f0a1b33d99409d73ee00d8294f09a2782
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index fa6d0ca..0cb9a14 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -32,6 +32,9 @@
"time"
)
+//TODO: Move this Tag into the proto file
+const OF_CONTROLLER_TAG= "voltha_backend_name"
+
const MAX_RESPONSE_TIME = 500 // milliseconds
type APIHandler struct {
@@ -81,6 +84,29 @@
return txn, nil
}
+// isOFControllerRequest is a helper function to determine if a request was initiated
+// from the OpenFlow controller (or its proxy, e.g. OFAgent)
+func isOFControllerRequest(ctx context.Context) bool {
+ var (
+ ok bool
+ md metadata.MD
+ value []string
+ )
+ if md, ok = metadata.FromIncomingContext(ctx); !ok {
+ // No metadata
+ return false
+ }
+ if value, ok = md[OF_CONTROLLER_TAG]; !ok {
+ // No OFAgent field in metadata
+ return false
+ }
+ if value[0] == "" {
+ // OFAgent has not set a field value
+ return false
+ }
+ return true
+}
+
// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
// response is expected in a successful scenario
func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
@@ -114,6 +140,17 @@
out := new(empty.Empty)
return out, nil
}
+
+ if isOFControllerRequest(ctx) {
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return new(empty.Empty), err
+ } else if txn.Acquired(MAX_RESPONSE_TIME) {
+ defer txn.Close() // Ensure active core signals "done" to standby
+ } else {
+ return new(empty.Empty), errors.New("failed-to-seize-request")
+ }
+ }
ch := make(chan interface{})
defer close(ch)
go handler.logicalDeviceMgr.enableLogicalPort(ctx, id, ch)
@@ -126,6 +163,17 @@
out := new(empty.Empty)
return out, nil
}
+
+ if isOFControllerRequest(ctx) {
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return new(empty.Empty), err
+ } else if txn.Acquired(MAX_RESPONSE_TIME) {
+ defer txn.Close() // Ensure active core signals "done" to standby
+ } else {
+ return new(empty.Empty), errors.New("failed-to-seize-request")
+ }
+ }
ch := make(chan interface{})
defer close(ch)
go handler.logicalDeviceMgr.disableLogicalPort(ctx, id, ch)
@@ -138,6 +186,17 @@
out := new(empty.Empty)
return out, nil
}
+
+ if isOFControllerRequest(ctx) {
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return new(empty.Empty), err
+ } else if txn.Acquired(MAX_RESPONSE_TIME) {
+ defer txn.Close() // Ensure active core signals "done" to standby
+ } else {
+ return new(empty.Empty), errors.New("failed-to-seize-request")
+ }
+ }
ch := make(chan interface{})
defer close(ch)
go handler.logicalDeviceMgr.updateFlowTable(ctx, flow.Id, flow.FlowMod, ch)
@@ -150,6 +209,17 @@
out := new(empty.Empty)
return out, nil
}
+
+ if isOFControllerRequest(ctx) {
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return new(empty.Empty), err
+ } else if txn.Acquired(MAX_RESPONSE_TIME) {
+ defer txn.Close() // Ensure active core signals "done" to standby
+ } else {
+ return new(empty.Empty), errors.New("failed-to-seize-request")
+ }
+ }
ch := make(chan interface{})
defer close(ch)
go handler.logicalDeviceMgr.updateGroupTable(ctx, flow.Id, flow.GroupMod, ch)
@@ -185,6 +255,17 @@
out := new(empty.Empty)
return out, nil
}
+
+ if isOFControllerRequest(ctx) {
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return new(empty.Empty), err
+ } else if txn.Acquired(MAX_RESPONSE_TIME) {
+ defer txn.Close() // Ensure active core signals "done" to standby
+ } else {
+ return new(empty.Empty), errors.New("failed-to-seize-request")
+ }
+ }
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.ReconcileDevices(ctx, ids, ch)
@@ -216,15 +297,16 @@
return &voltha.Device{Id: device.Id}, nil
}
- //txn, err := handler.createKvTransaction(ctx)
- //if txn == nil {
- // return &voltha.Device{}, err
- //} else if txn.Acquired(MAX_RESPONSE_TIME) {
- // defer txn.Close() // Ensure active core signals "done" to standby
- //} else {
- // return &voltha.Device{}, nil
- //}
-
+ if isOFControllerRequest(ctx) {
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return &voltha.Device{}, err
+ } else if txn.Acquired(MAX_RESPONSE_TIME) {
+ defer txn.Close() // Ensure active core signals "done" to standby
+ } else {
+ return &voltha.Device{}, errors.New("failed-to-seize-request")
+ }
+ }
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.createDevice(ctx, device, ch)
@@ -254,15 +336,16 @@
return new(empty.Empty), nil
}
- //txn, err := handler.createKvTransaction(ctx)
- //if txn == nil {
- // return new(empty.Empty), err
- //} else if txn.Acquired(MAX_RESPONSE_TIME) {
- // defer txn.Close() // Ensure active core signals "done" to standby
- //} else {
- // return new(empty.Empty), nil
- //}
-
+ if isOFControllerRequest(ctx) {
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return new(empty.Empty), err
+ } else if txn.Acquired(MAX_RESPONSE_TIME) {
+ defer txn.Close() // Ensure active core signals "done" to standby
+ } else {
+ return new(empty.Empty), errors.New("failed-to-seize-request")
+ }
+ }
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.enableDevice(ctx, id, ch)
@@ -275,6 +358,17 @@
if isTestMode(ctx) {
return new(empty.Empty), nil
}
+
+ if isOFControllerRequest(ctx) {
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return new(empty.Empty), err
+ } else if txn.Acquired(MAX_RESPONSE_TIME) {
+ defer txn.Close() // Ensure active core signals "done" to standby
+ } else {
+ return new(empty.Empty), errors.New("failed-to-seize-request")
+ }
+ }
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.disableDevice(ctx, id, ch)
@@ -287,6 +381,17 @@
if isTestMode(ctx) {
return new(empty.Empty), nil
}
+
+ if isOFControllerRequest(ctx) {
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return new(empty.Empty), err
+ } else if txn.Acquired(MAX_RESPONSE_TIME) {
+ defer txn.Close() // Ensure active core signals "done" to standby
+ } else {
+ return new(empty.Empty), errors.New("failed-to-seize-request")
+ }
+ }
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.rebootDevice(ctx, id, ch)
@@ -299,6 +404,17 @@
if isTestMode(ctx) {
return new(empty.Empty), nil
}
+
+ if isOFControllerRequest(ctx) {
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return new(empty.Empty), err
+ } else if txn.Acquired(MAX_RESPONSE_TIME) {
+ defer txn.Close() // Ensure active core signals "done" to standby
+ } else {
+ return new(empty.Empty), errors.New("failed-to-seize-request")
+ }
+ }
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.deleteDevice(ctx, id, ch)
@@ -389,8 +505,8 @@
func (handler *APIHandler) forwardPacketOut(packet *openflow_13.PacketOut) {
log.Debugw("forwardPacketOut-request", log.Fields{"packet": packet})
- //agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id)
- //agent.packetOut(packet.PacketOut)
+ agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id)
+ agent.packetOut(packet.PacketOut)
}
func (handler *APIHandler) StreamPacketsOut(
packets voltha.VolthaService_StreamPacketsOutServer,