Add transaction key for ofagent requests.

Since the afrouter (api-server) binds an ofagent to two rw_core
instances forming a core-pair and transparently forwards
requests/responses, the ofagent needs to include a transaction
key-value pair when sending a request to the rw_core.  This allows
the rw_cores in the pair to compete for the transaction, with the
winning one fulfilling the request while the other Core monitors
the transaction in case the winning core fails to process it.

Change-Id: I231ac3c027d40a475f0c395fc8123e9b54fd35d0
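
For illustration, here is a minimal sketch (not part of the patch) of the
metadata pair each OFAgent request carries after this change, assuming the
default key names defined in ofagent/main.py:

    import uuid

    CORE_BINDING_KEY = 'voltha_backend_name'       # binds the agent to a core-pair
    CORE_TRANSACTION_KEY = 'voltha_serial_number'  # identifies a single transaction

    def build_metadata(core_group_id):
        # Generate a fresh transaction ID per request; the "O-" prefix marks
        # it as OFAgent-generated rather than api-server-generated.
        return ((CORE_BINDING_KEY, core_group_id),
                (CORE_TRANSACTION_KEY, 'O-' + uuid.uuid4().hex))

    print(build_metadata('vcore-pair-1'))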
diff --git a/python/ofagent/connection_mgr.py b/python/ofagent/connection_mgr.py
index 11a1334..7e17613 100755
--- a/python/ofagent/connection_mgr.py
+++ b/python/ofagent/connection_mgr.py
@@ -39,7 +39,7 @@
 class ConnectionManager(object):
     def __init__(self, consul_endpoint,
                  vcore_endpoint, vcore_grpc_timeout, vcore_binding_key,
-                 controller_endpoints, instance_id,
+                 vcore_transaction_key, controller_endpoints, instance_id,
                  enable_tls=False, key_file=None, cert_file=None,
                  vcore_retry_interval=0.5, devices_refresh_interval=5,
                  subscription_refresh_interval=5):
@@ -51,6 +51,7 @@
         self.vcore_endpoint = vcore_endpoint
         self.grpc_timeout = vcore_grpc_timeout
         self.core_binding_key = vcore_binding_key
+        self.core_transaction_key = vcore_transaction_key
         self.instance_id = instance_id
         self.enable_tls = enable_tls
         self.key_file = key_file
@@ -162,7 +163,7 @@
                 container_name = self.instance_id
                 if self.grpc_client is None:
                     self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout,
-                                                  self.core_binding_key)
+                                                  self.core_binding_key, self.core_transaction_key)
                 subscription = yield self.grpc_client.subscribe(
                     OfAgentSubscriber(ofagent_id=container_name))
 
diff --git a/python/ofagent/grpc_client.py b/python/ofagent/grpc_client.py
index 096d56e..508ce5c 100755
--- a/python/ofagent/grpc_client.py
+++ b/python/ofagent/grpc_client.py
@@ -19,6 +19,7 @@
 """
 from Queue import Queue, Empty
 import os
+import uuid
 
 from grpc import StatusCode
 from grpc._channel import _Rendezvous
@@ -39,12 +40,12 @@
 
 class GrpcClient(object):
 
-    def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key):
+    def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key, core_transaction_key):
 
         self.connection_manager = connection_manager
         self.channel = channel
         self.grpc_timeout = grpc_timeout
-        self.local_stub = VolthaServiceStub(channel)
+        self.grpc_stub = VolthaServiceStub(channel)
 
         # This is the rw-core cluster to which an OFAgent is bound.
         # It is the affinity router that forwards all OFAgent
@@ -52,6 +53,15 @@
         self.core_group_id = ''
         self.core_group_id_key = core_binding_key
 
+        # Since the api-router binds an OFAgent to two RW Cores in a pair and
+        # transparently forwards requests between the two, the onus is on
+        # the OFAgent to fulfill part of the api-server's function: sending
+        # a transaction key to both RW Cores so that they can determine
+        # which Core will handle the transaction. To prevent collisions
+        # between api-server IDs and OFAgent IDs, the OFAgent IDs are
+        # prefixed with "O-".
+        self.core_transaction_key = core_transaction_key
+
         self.stopped = False
 
         self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
@@ -60,7 +70,8 @@
 
     def start(self):
         log.debug('starting', grpc_timeout=self.grpc_timeout,
-                  core_binding_key=self.core_group_id_key)
+                  core_binding_key=self.core_group_id_key,
+                  core_transaction_key=self.core_transaction_key)
         self.start_packet_out_stream()
         self.start_packet_in_stream()
         self.start_change_event_in_stream()
@@ -74,6 +85,9 @@
         self.stopped = True
         log.info('stopped')
 
+    def get_core_transaction_metadata(self):
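+        # A fresh UUID is generated on every call, so each request carries a
+        # unique transaction ID; the "O-" prefix avoids collisions with IDs
+        # generated by the api-server.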
+        return (self.core_transaction_key, "O-" + uuid.uuid4().hex)
+
     def start_packet_out_stream(self):
 
         def packet_generator():
@@ -89,8 +103,9 @@
         def stream_packets_out():
             generator = packet_generator()
             try:
-                self.local_stub.StreamPacketsOut(generator,
-                                                 metadata=((self.core_group_id_key, self.core_group_id), ))
+                self.grpc_stub.StreamPacketsOut(generator,
+                                                metadata=((self.core_group_id_key, self.core_group_id),
+                                                           self.get_core_transaction_metadata(),))
             except _Rendezvous, e:
                 log.error('grpc-exception', status=e.code())
                 if e.code() == StatusCode.UNAVAILABLE:
@@ -101,9 +116,10 @@
     def start_packet_in_stream(self):
 
         def receive_packet_in_stream():
-            streaming_rpc_method = self.local_stub.ReceivePacketsIn
+            streaming_rpc_method = self.grpc_stub.ReceivePacketsIn
             iterator = streaming_rpc_method(empty_pb2.Empty(),
-                                            metadata=((self.core_group_id_key, self.core_group_id),))
+                                            metadata=((self.core_group_id_key, self.core_group_id),
+                                                      self.get_core_transaction_metadata(),))
             try:
                 for packet_in in iterator:
                     reactor.callFromThread(self.packet_in_queue.put,
@@ -121,9 +137,10 @@
     def start_change_event_in_stream(self):
 
         def receive_change_events():
-            streaming_rpc_method = self.local_stub.ReceiveChangeEvents
+            streaming_rpc_method = self.grpc_stub.ReceiveChangeEvents
             iterator = streaming_rpc_method(empty_pb2.Empty(),
-                                            metadata=((self.core_group_id_key, self.core_group_id),))
+                                            metadata=((self.core_group_id_key, self.core_group_id),
+                                                      self.get_core_transaction_metadata(),))
             try:
                 for event in iterator:
                     reactor.callFromThread(self.change_event_queue.put, event)
@@ -167,16 +184,18 @@
     def get_port(self, device_id, port_id):
         req = LogicalPortId(id=device_id, port_id=port_id)
         res = yield threads.deferToThread(
-            self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res)
 
     @inlineCallbacks
     def get_port_list(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res.items)
 
     @inlineCallbacks
@@ -186,8 +205,9 @@
             port_id=port_id
         )
         res = yield threads.deferToThread(
-            self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res)
 
     @inlineCallbacks
@@ -197,16 +217,18 @@
             port_id=port_id
         )
         res = yield threads.deferToThread(
-            self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res)
 
     @inlineCallbacks
     def get_device_info(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res)
 
     @inlineCallbacks
@@ -216,8 +238,9 @@
             flow_mod=flow_mod
         )
         res = yield threads.deferToThread(
-            self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res)
 
     @inlineCallbacks
@@ -227,46 +250,52 @@
             group_mod=group_mod
         )
         res = yield threads.deferToThread(
-            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res)
 
     @inlineCallbacks
     def list_flows(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res.items)
 
     @inlineCallbacks
     def list_groups(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res.items)
 
     @inlineCallbacks
     def list_ports(self, device_id):
         req = ID(id=device_id)
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id),))
+            self.grpc_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res.items)
 
     @inlineCallbacks
     def list_logical_devices(self):
         res = yield threads.deferToThread(
-            self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id), ))
+            self.grpc_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returnValue(res.items)
 
     @inlineCallbacks
     def subscribe(self, subscriber):
         res, call = yield threads.deferToThread(
-            self.local_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
-            metadata=((self.core_group_id_key, self.core_group_id), ))
+            self.grpc_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
+            metadata=((self.core_group_id_key, self.core_group_id),
+                      self.get_core_transaction_metadata(),))
         returned_metadata = call.initial_metadata()
 
         # Update the core_group_id if present in the returned metadata
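
Note that get_core_transaction_metadata() is invoked anew for every RPC, so
each request carries its own transaction ID and the two Cores compete per
request rather than per session.
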
diff --git a/python/ofagent/main.py b/python/ofagent/main.py
index e18e480..fa2b5c0 100755
--- a/python/ofagent/main.py
+++ b/python/ofagent/main.py
@@ -35,6 +35,7 @@
     grpc_endpoint=os.environ.get('GRPC_ENDPOINT', 'localhost:50055'),
     grpc_timeout=os.environ.get('GRPC_TIMEOUT', '10'),
     core_binding_key=os.environ.get('CORE_BINDING_KEY', 'voltha_backend_name'),
+    core_transaction_key=os.environ.get('CORE_TRANSACTION_KEY', 'voltha_serial_number'),
     instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
     internal_host_address=os.environ.get('INTERNAL_HOST_ADDRESS',
                                          get_my_primary_local_ipv4()),
@@ -106,6 +107,15 @@
                         default=defs['core_binding_key'],
                         help=_help)
 
+    _help = ('The name of the meta-key whose value is the transaction ID '
+             'the OFAgent includes in its requests to the Voltha RW Core. '
+             '(default: %s)' % defs['core_transaction_key'])
+    parser.add_argument('-ctk', '--core_transaction_key',
+                        dest='core_transaction_key',
+                        action='store',
+                        default=defs['core_transaction_key'],
+                        help=_help)
+
     _help = ('<hostname> or <ip> at which ofagent is reachable from inside '
              'the cluster (default: %s)' % defs['internal_host_address'])
     parser.add_argument('-H', '--internal-host-address',
@@ -246,9 +256,16 @@
         self.log.info('starting-internal-components')
         args = self.args
         self.connection_manager = yield ConnectionManager(
-            args.consul, args.grpc_endpoint, self.grpc_timeout,
-            args.core_binding_key, args.controller, args.instance_id,
-            args.enable_tls, args.key_file, args.cert_file).start()
+            args.consul,
+            args.grpc_endpoint,
+            self.grpc_timeout,
+            args.core_binding_key,
+            args.core_transaction_key,
+            args.controller,
+            args.instance_id,
+            args.enable_tls,
+            args.key_file,
+            args.cert_file).start()
         self.log.info('started-internal-services')
 
     @inlineCallbacks
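
Usage note: the meta-key name is configurable at startup via the new
-ctk/--core_transaction_key flag or the CORE_TRANSACTION_KEY environment
variable, and defaults to 'voltha_serial_number'.
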
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 1ad0766..8f7b328 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -121,7 +121,24 @@
 	return handler.coreInCompetingMode
 }
 
-// This function handles the creation of new devices
+// acquireRequestForList handles transaction processing for list requests, i.e. when no specific ID is requested.
+func (handler *APIHandler) acquireRequestForList(ctx context.Context, maxTimeout ...int64) (*KVTransaction, error) {
+	timeout := handler.defaultRequestTimeout
+	if len(maxTimeout) > 0 {
+		timeout = maxTimeout[0]
+	}
+	log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
+	txn, err := handler.createKvTransaction(ctx)
+	if txn == nil {
+		return nil, err
+	} else if txn.Acquired(timeout) {
+		return txn, nil
+	} else {
+		return nil, errors.New("failed-to-seize-request")
+	}
+}
+
+// acquireRequest handles transaction processing for creation of new devices
 func (handler *APIHandler) acquireRequest(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
 	timeout := handler.defaultRequestTimeout
 	if len(maxTimeout) > 0 {
@@ -237,6 +254,19 @@
 	return &voltha.Membership{}, nil
 }
 
+func (handler *APIHandler) GetLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+	log.Debugw("GetLogicalDevicePort-request", log.Fields{"id": *id})
+
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+			return &voltha.LogicalPort{}, err
+		} else {
+			defer txn.Close()
+		}
+	}
+	return handler.logicalDeviceMgr.getLogicalPort(id)
+}
+
 func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
 	log.Debugw("EnableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
 	if isTestMode(ctx) {
@@ -286,17 +316,11 @@
 		return out, nil
 	}
 
-	// TODO: Update this logic when the OF Controller (OFAgent in this case) is able to send a transaction Id in its
-	// request (the api-router binds the OfAgent to two Cores in a pair and let the traffic flows transparently)
 	if handler.competeForTransaction() {
-		if !handler.isOFControllerRequest(ctx) {
-			if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
-				return new(empty.Empty), err
-			} else {
-				defer txn.Close()
-			}
-		} else if !handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: flow.Id}) {
-			return new(empty.Empty), nil
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
+			return new(empty.Empty), err
+		} else {
+			defer txn.Close()
 		}
 	}
 
@@ -313,17 +337,11 @@
 		return out, nil
 	}
 
-	// TODO: Update this logic when the OF Controller (OFAgent in this case) is able to send a transaction Id in its
-	// request (the api-router binds the OfAgent to two Cores in a pair and let the traffic flows transparently)
 	if handler.competeForTransaction() {
-		if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
-			if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
-				return new(empty.Empty), err
-			} else {
-				defer txn.Close()
-			}
-		} else if !handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: flow.Id}) {
-			return new(empty.Empty), nil
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
+			return new(empty.Empty), err
+		} else {
+			defer txn.Close()
 		}
 	}
 
@@ -371,15 +389,32 @@
 	return waitForNilResponseOnSuccess(ctx, ch)
 }
 
-// GetLogicalDevice must be implemented in the read-only containers - should it also be implemented here?
 func (handler *APIHandler) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
 	log.Debugw("GetLogicalDevice-request", log.Fields{"id": id})
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+			return &voltha.LogicalDevice{}, err
+		} else {
+			defer txn.Close()
+		}
+	}
 	return handler.logicalDeviceMgr.getLogicalDevice(id.Id)
 }
 
-// ListLogicalDevices must be implemented in the read-only containers - should it also be implemented here?
 func (handler *APIHandler) ListLogicalDevices(ctx context.Context, empty *empty.Empty) (*voltha.LogicalDevices, error) {
-	log.Debug("ListLogicalDevices")
+	log.Debug("ListLogicalDevices-request")
+	if handler.competeForTransaction() {
+		if txn, err := handler.acquireRequestForList(ctx); err != nil {
+			return &voltha.LogicalDevices{}, err
+		} else {
+			defer txn.Close()
+		}
+	}
+	if handler.isOFControllerRequest(ctx) {
+		// Since an OF controller is only interested in the set of logical
+		// devices managed by this Core, return only the logical devices
+		// managed/monitored by this Core.
+		return handler.logicalDeviceMgr.listManagedLogicalDevices()
+	}
 	return handler.logicalDeviceMgr.listLogicalDevices()
 }
 
@@ -391,17 +426,37 @@
 
 func (handler *APIHandler) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
 	log.Debugw("ListLogicalDeviceFlows", log.Fields{"id": *id})
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+			return &openflow_13.Flows{}, err
+		} else {
+			defer txn.Close()
+		}
+	}
 	return handler.logicalDeviceMgr.ListLogicalDeviceFlows(ctx, id.Id)
 }
 
 func (handler *APIHandler) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
 	log.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"id": *id})
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+			return &openflow_13.FlowGroups{}, err
+		} else {
+			defer txn.Close()
+		}
+	}
 	return handler.logicalDeviceMgr.ListLogicalDeviceFlowGroups(ctx, id.Id)
 }
 
-// ListLogicalDevicePorts must be implemented in the read-only containers - should it also be implemented here?
 func (handler *APIHandler) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
 	log.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+			return &voltha.LogicalPorts{}, err
+		} else {
+			defer txn.Close()
+		}
+	}
 	return handler.logicalDeviceMgr.ListLogicalDevicePorts(ctx, id.Id)
 }
 
@@ -413,7 +468,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireRequest(ctx, nil); err != nil {
+		if txn, err := handler.acquireRequest(ctx, &utils.DeviceID{Id: device.Id}); err != nil {
 			return &voltha.Device{}, err
 		} else {
 			defer txn.Close()
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 6dfcb86..96e3541 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -134,6 +134,19 @@
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
 
+func (ldMgr *LogicalDeviceManager) listManagedLogicalDevices() (*voltha.LogicalDevices, error) {
+	log.Debug("listManagedLogicalDevices")
+	result := &voltha.LogicalDevices{}
+	ldMgr.lockLogicalDeviceAgentsMap.RLock()
+	defer ldMgr.lockLogicalDeviceAgentsMap.RUnlock()
+	for _, agent := range ldMgr.logicalDeviceAgents {
+		if ld, _ := agent.GetLogicalDevice(); ld != nil {
+			result.Items = append(result.Items, ld)
+		}
+	}
+	return result, nil
+}
+
 func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
 	log.Debug("ListAllLogicalDevices")
 	result := &voltha.LogicalDevices{}