Add transaction key for ofagent request.

Since the afrouter (api-server) binds an ofagent to two rw_core
forming a core-pair and transparently forward requests/responses,
then the ofagent needs to include a transaction key-value when
sending a request to the rw_core.  This will allow the rw_cores in
the pair to compete for the transaction with the winning one
fulfilling the requests while the other Core monitoring the
transaction in case the winning core fails to process the
transaction.

Change-Id: I231ac3c027d40a475f0c395fc8123e9b54fd35d0
diff --git a/python/ofagent/grpc_client.py b/python/ofagent/grpc_client.py
index 096d56e..508ce5c 100755
--- a/python/ofagent/grpc_client.py
+++ b/python/ofagent/grpc_client.py
@@ -19,6 +19,7 @@
 """
 from Queue import Queue, Empty
 import os
+import uuid
 
 from grpc import StatusCode
 from grpc._channel import _Rendezvous
@@ -39,12 +40,12 @@
 
 class GrpcClient(object):
 
-    def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key):
+    def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key, core_transaction_key):
 
         self.connection_manager = connection_manager
         self.channel = channel
         self.grpc_timeout = grpc_timeout
-        self.local_stub = VolthaServiceStub(channel)
+        self.grpc_stub = VolthaServiceStub(channel)
 
         # This is the rw-core cluster to which an OFAgent is bound.
         # It is the affinity router that forwards all OFAgent
@@ -52,6 +53,15 @@
         self.core_group_id = ''
         self.core_group_id_key = core_binding_key
 
+        # Since the api-router binds an OFAgent to two RW Cores in a pair and
+        # transparently forward requests between the two then the onus is on
+        # the OFAgent to fulfill part of the function of the api-server which
+        # involves sending a transaction key to both RW Cores for the latter
+        # to figure out which Core will handle the transaction. To prevent
+        # collision between the api-server ID and the one from OFAgent then the
+        # OFAgent ID will be prefixed with "O-".
+        self.core_transaction_key = core_transaction_key
+
         self.stopped = False
 
         self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
@@ -60,7 +70,8 @@
 
     def start(self):
         log.debug('starting', grpc_timeout=self.grpc_timeout,
-                  core_binding_key=self.core_group_id_key)
+                  core_binding_key=self.core_group_id_key,
+                  core_transaction_key=self.core_transaction_key)
         self.start_packet_out_stream()
         self.start_packet_in_stream()
         self.start_change_event_in_stream()
@@ -74,6 +85,9 @@
         self.stopped = True
         log.info('stopped')
 
+    def get_core_transaction_metadata(self):
+        return (self.core_transaction_key, "O-" + uuid.uuid4().hex)
+
     def start_packet_out_stream(self):
 
         def packet_generator():
@@ -89,8 +103,9 @@
         def stream_packets_out():
             generator = packet_generator()
             try:
-                self.local_stub.StreamPacketsOut(generator,
-                                                 metadata=((self.core_group_id_key, self.core_group_id), ))
+                self.grpc_stub.StreamPacketsOut(generator,
+                                                metadata=((self.core_group_id_key, self.core_group_id),
+                                                           self.get_core_transaction_metadata(),))
             except _Rendezvous, e:
                 log.error('grpc-exception', status=e.code())
                 if e.code() == StatusCode.UNAVAILABLE:
@@ -101,9 +116,10 @@
     def start_packet_in_stream(self):
 
         def receive_packet_in_stream():
-            streaming_rpc_method = self.local_stub.ReceivePacketsIn
+            streaming_rpc_method = self.grpc_stub.ReceivePacketsIn
             iterator = streaming_rpc_method(empty_pb2.Empty(),
-                                            metadata=((self.core_group_id_key, self.core_group_id),))
+                                            metadata=((self.core_group_id_key, self.core_group_id),
+                                                      self.get_core_transaction_metadata(),))
             try:
                 for packet_in in iterator:
                     reactor.callFromThread(self.packet_in_queue.put,
@@ -121,9 +137,10 @@
     def start_change_event_in_stream(self):
 
         def receive_change_events():
-            streaming_rpc_method = self.local_stub.ReceiveChangeEvents
+            streaming_rpc_method = self.grpc_stub.ReceiveChangeEvents
             iterator = streaming_rpc_method(empty_pb2.Empty(),
-                                            metadata=((self.core_group_id_key, self.core_group_id),))
+                                            metadata=((self.core_group_id_key, self.core_group_id),
+                                                      self.get_core_transaction_metadata(),))
             try:
                 for event in iterator:
                     reactor.callFromThread(self.change_event_queue.put, event)
@@ -167,16 +184,18 @@
     def get_port(self, device_id, port_id):
         req = LogicalPortId(id=device_id, port_id=port_id)
         res = yield threads.deferToThread(
-            self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res)
 
     @inlineCallbacks
     def get_port_list(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res.items)
 
     @inlineCallbacks
@@ -186,8 +205,9 @@
             port_id=port_id
         )
         res = yield threads.deferToThread(
-            self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res)
 
     @inlineCallbacks
@@ -197,16 +217,18 @@
             port_id=port_id
         )
         res = yield threads.deferToThread(
-            self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res)
 
     @inlineCallbacks
     def get_device_info(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res)
 
     @inlineCallbacks
@@ -216,8 +238,9 @@
             flow_mod=flow_mod
         )
         res = yield threads.deferToThread(
-            self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res)
 
     @inlineCallbacks
@@ -227,46 +250,52 @@
             group_mod=group_mod
         )
         res = yield threads.deferToThread(
-            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res)
 
     @inlineCallbacks
     def list_flows(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res.items)
 
     @inlineCallbacks
     def list_groups(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res.items)
 
     @inlineCallbacks
     def list_ports(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res.items)
 
     @inlineCallbacks
     def list_logical_devices(self):
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id), ))
+            self.grpc_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res.items)
 
     @inlineCallbacks
     def subscribe(self, subscriber):
         res, call = yield threads.deferToThread(
-            self.local_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id), ))
+            self.grpc_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returned_metadata = call.initial_metadata()
 
         # Update the core_group_id if present in the returned metadata