- Updated all OFAgent RPCs destined toward a Voltha core to contain
  a metadata field that specifies which Voltha core grouping should
  service the request.
- Added transaction handler code to all Voltha core APIs
  that modify the model.

Change-Id: I8dafc95f0a1b33d99409d73ee00d8294f09a2782
diff --git a/python/ofagent/grpc_client.py b/python/ofagent/grpc_client.py
index 5a05696..b58612f 100755
--- a/python/ofagent/grpc_client.py
+++ b/python/ofagent/grpc_client.py
@@ -45,6 +45,12 @@
         self.grpc_timeout = grpc_timeout
         self.local_stub = VolthaServiceStub(channel)
 
+        # This is the vcore group to which an OFAgent is bound.
+        # It is the affinity router that forwards all OFAgent
+        # requests to the primary vcore in the group.
+        self.core_group_id = ''
+        self.CORE_GROUP_ID = 'voltha_backend_name'
+
         self.stopped = False
 
         self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
@@ -81,7 +87,8 @@
         def stream_packets_out():
             generator = packet_generator()
             try:
-                self.local_stub.StreamPacketsOut(generator)
+                self.local_stub.StreamPacketsOut(generator,
+                                                 metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
             except _Rendezvous, e:
                 log.error('grpc-exception', status=e.code())
                 if e.code() == StatusCode.UNAVAILABLE:
@@ -156,14 +163,16 @@
     def get_port(self, device_id, port_id):
         req = LogicalPortId(id=device_id, port_id=port_id)
         res = yield threads.deferToThread(
-            self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout)
+            self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
+            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
     def get_port_list(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout)
+            self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
+            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
         returnValue(res.items)
 
     @inlineCallbacks
@@ -173,7 +182,8 @@
             port_id=port_id
         )
         res = yield threads.deferToThread(
-            self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout)
+            self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
+            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -183,14 +193,16 @@
             port_id=port_id
         )
         res = yield threads.deferToThread(
-            self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout)
+            self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
+            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
     def get_device_info(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout)
+            self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
+            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -200,7 +212,8 @@
             flow_mod=flow_mod
         )
         res = yield threads.deferToThread(
-            self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout)
+            self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
+            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -210,38 +223,55 @@
             group_mod=group_mod
         )
         res = yield threads.deferToThread(
-            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout)
+            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
+            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
     def list_flows(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout)
+            self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
+            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
         returnValue(res.items)
 
     @inlineCallbacks
     def list_groups(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout)
+            self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
+            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
         returnValue(res.items)
 
     @inlineCallbacks
     def list_ports(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout)
+            self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
+            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
         returnValue(res.items)
 
     @inlineCallbacks
     def list_logical_devices(self):
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout)
+            self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
+            metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
         returnValue(res.items)
 
     @inlineCallbacks
     def subscribe(self, subscriber):
-        res = yield threads.deferToThread(
-            self.local_stub.Subscribe, subscriber, timeout=self.grpc_timeout)
+        res, call = yield threads.deferToThread(
+            self.local_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
+            metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+        returned_metadata = call.initial_metadata()
+
+        # Update the core_group_id if present in the returned metadata
+        if returned_metadata is None:
+            log.debug('header-metadata-missing')
+        else:
+            log.debug('metadata-returned', metadata=returned_metadata)
+            for pair in returned_metadata:
+                if pair[0] == self.CORE_GROUP_ID:
+                    self.core_group_id = pair[1]
+                    log.debug('received-core-group-id', vcore_group=self.core_group_id)
         returnValue(res)
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index fa6d0ca..0cb9a14 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -32,6 +32,9 @@
 	"time"
 )
 
+//TODO:  Move this Tag into the proto file
+const OF_CONTROLLER_TAG= "voltha_backend_name"
+
 const MAX_RESPONSE_TIME = 500 // milliseconds
 
 type APIHandler struct {
@@ -81,6 +84,29 @@
 	return txn, nil
 }
 
+// isOFControllerRequest is a helper function to determine if a request was initiated
+// from the OpenFlow controller (or its proxy, e.g. OFAgent)
+func isOFControllerRequest(ctx context.Context) bool {
+	var (
+		ok    bool
+		md    metadata.MD
+		value []string
+	)
+	if md, ok = metadata.FromIncomingContext(ctx); !ok {
+		// No metadata
+		return false
+	}
+	if value, ok = md[OF_CONTROLLER_TAG]; !ok {
+		// No OFAgent field in metadata
+		return false
+	}
+	if value[0] == "" {
+		// OFAgent has not set a field value
+		return false
+	}
+	return true
+}
+
 // waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
 // response is expected in a successful scenario
 func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
@@ -114,6 +140,17 @@
 		out := new(empty.Empty)
 		return out, nil
 	}
+
+	if isOFControllerRequest(ctx) {
+		txn, err := handler.createKvTransaction(ctx)
+		if txn == nil {
+			return new(empty.Empty), err
+		} else if txn.Acquired(MAX_RESPONSE_TIME) {
+			defer txn.Close() // Ensure active core signals "done" to standby
+		} else {
+			return new(empty.Empty), errors.New("failed-to-seize-request")
+		}
+	}
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.logicalDeviceMgr.enableLogicalPort(ctx, id, ch)
@@ -126,6 +163,17 @@
 		out := new(empty.Empty)
 		return out, nil
 	}
+
+	if isOFControllerRequest(ctx) {
+		txn, err := handler.createKvTransaction(ctx)
+		if txn == nil {
+			return new(empty.Empty), err
+		} else if txn.Acquired(MAX_RESPONSE_TIME) {
+			defer txn.Close() // Ensure active core signals "done" to standby
+		} else {
+			return new(empty.Empty), errors.New("failed-to-seize-request")
+		}
+	}
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.logicalDeviceMgr.disableLogicalPort(ctx, id, ch)
@@ -138,6 +186,17 @@
 		out := new(empty.Empty)
 		return out, nil
 	}
+
+	if isOFControllerRequest(ctx) {
+		txn, err := handler.createKvTransaction(ctx)
+		if txn == nil {
+			return new(empty.Empty), err
+		} else if txn.Acquired(MAX_RESPONSE_TIME) {
+			defer txn.Close() // Ensure active core signals "done" to standby
+		} else {
+			return new(empty.Empty), errors.New("failed-to-seize-request")
+		}
+	}
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.logicalDeviceMgr.updateFlowTable(ctx, flow.Id, flow.FlowMod, ch)
@@ -150,6 +209,17 @@
 		out := new(empty.Empty)
 		return out, nil
 	}
+
+	if isOFControllerRequest(ctx) {
+		txn, err := handler.createKvTransaction(ctx)
+		if txn == nil {
+			return new(empty.Empty), err
+		} else if txn.Acquired(MAX_RESPONSE_TIME) {
+			defer txn.Close() // Ensure active core signals "done" to standby
+		} else {
+			return new(empty.Empty), errors.New("failed-to-seize-request")
+		}
+	}
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.logicalDeviceMgr.updateGroupTable(ctx, flow.Id, flow.GroupMod, ch)
@@ -185,6 +255,17 @@
 		out := new(empty.Empty)
 		return out, nil
 	}
+
+	if isOFControllerRequest(ctx) {
+		txn, err := handler.createKvTransaction(ctx)
+		if txn == nil {
+			return new(empty.Empty), err
+		} else if txn.Acquired(MAX_RESPONSE_TIME) {
+			defer txn.Close() // Ensure active core signals "done" to standby
+		} else {
+			return new(empty.Empty), errors.New("failed-to-seize-request")
+		}
+	}
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.deviceMgr.ReconcileDevices(ctx, ids, ch)
@@ -216,15 +297,16 @@
 		return &voltha.Device{Id: device.Id}, nil
 	}
 
-	//txn, err := handler.createKvTransaction(ctx)
-	//if txn == nil {
-	//	return &voltha.Device{}, err
-	//} else if txn.Acquired(MAX_RESPONSE_TIME) {
-	//	defer txn.Close()   // Ensure active core signals "done" to standby
-	//} else {
-	//	return &voltha.Device{}, nil
-	//}
-
+	if isOFControllerRequest(ctx) {
+		txn, err := handler.createKvTransaction(ctx)
+		if txn == nil {
+			return &voltha.Device{}, err
+		} else if txn.Acquired(MAX_RESPONSE_TIME) {
+			defer txn.Close() // Ensure active core signals "done" to standby
+		} else {
+			return &voltha.Device{}, errors.New("failed-to-seize-request")
+		}
+	}
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.deviceMgr.createDevice(ctx, device, ch)
@@ -254,15 +336,16 @@
 		return new(empty.Empty), nil
 	}
 
-	//txn, err := handler.createKvTransaction(ctx)
-	//if txn == nil {
-	//	return new(empty.Empty), err
-	//} else if txn.Acquired(MAX_RESPONSE_TIME) {
-	//	defer txn.Close()   // Ensure active core signals "done" to standby
-	//} else {
-	//	return new(empty.Empty), nil
-	//}
-
+	if isOFControllerRequest(ctx) {
+		txn, err := handler.createKvTransaction(ctx)
+		if txn == nil {
+			return new(empty.Empty), err
+		} else if txn.Acquired(MAX_RESPONSE_TIME) {
+			defer txn.Close() // Ensure active core signals "done" to standby
+		} else {
+			return new(empty.Empty), errors.New("failed-to-seize-request")
+		}
+	}
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.deviceMgr.enableDevice(ctx, id, ch)
@@ -275,6 +358,17 @@
 	if isTestMode(ctx) {
 		return new(empty.Empty), nil
 	}
+
+	if isOFControllerRequest(ctx) {
+		txn, err := handler.createKvTransaction(ctx)
+		if txn == nil {
+			return new(empty.Empty), err
+		} else if txn.Acquired(MAX_RESPONSE_TIME) {
+			defer txn.Close() // Ensure active core signals "done" to standby
+		} else {
+			return new(empty.Empty), errors.New("failed-to-seize-request")
+		}
+	}
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.deviceMgr.disableDevice(ctx, id, ch)
@@ -287,6 +381,17 @@
 	if isTestMode(ctx) {
 		return new(empty.Empty), nil
 	}
+
+	if isOFControllerRequest(ctx) {
+		txn, err := handler.createKvTransaction(ctx)
+		if txn == nil {
+			return new(empty.Empty), err
+		} else if txn.Acquired(MAX_RESPONSE_TIME) {
+			defer txn.Close() // Ensure active core signals "done" to standby
+		} else {
+			return new(empty.Empty), errors.New("failed-to-seize-request")
+		}
+	}
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.deviceMgr.rebootDevice(ctx, id, ch)
@@ -299,6 +404,17 @@
 	if isTestMode(ctx) {
 		return new(empty.Empty), nil
 	}
+
+	if isOFControllerRequest(ctx) {
+		txn, err := handler.createKvTransaction(ctx)
+		if txn == nil {
+			return new(empty.Empty), err
+		} else if txn.Acquired(MAX_RESPONSE_TIME) {
+			defer txn.Close() // Ensure active core signals "done" to standby
+		} else {
+			return new(empty.Empty), errors.New("failed-to-seize-request")
+		}
+	}
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.deviceMgr.deleteDevice(ctx, id, ch)
@@ -389,8 +505,8 @@
 
 func (handler *APIHandler) forwardPacketOut(packet *openflow_13.PacketOut) {
 	log.Debugw("forwardPacketOut-request", log.Fields{"packet": packet})
-	//agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id)
-	//agent.packetOut(packet.PacketOut)
+	agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id)
+	agent.packetOut(packet.PacketOut)
 }
 func (handler *APIHandler) StreamPacketsOut(
 	packets voltha.VolthaService_StreamPacketsOutServer,