VOL-1399: The value of the metadata field used by OFAgent & Arouter
          should not be hard-coded
- Added grpc-timeout and core-binding-key options to OFAgent run command
- Added core_binding_key option to rw_core run command

Change-Id: Icf5fe226d17a1a5fcd9459a85e41c434fc7ac8b9
diff --git a/python/ofagent/connection_mgr.py b/python/ofagent/connection_mgr.py
index b960b69..fea4910 100755
--- a/python/ofagent/connection_mgr.py
+++ b/python/ofagent/connection_mgr.py
@@ -39,7 +39,8 @@
 # _ = third_party
 
 class ConnectionManager(object):
-    def __init__(self, consul_endpoint, vcore_endpoint, vcore_grpc_timeout,
+    def __init__(self, consul_endpoint,
+                 vcore_endpoint, vcore_grpc_timeout, vcore_binding_key,
                  controller_endpoints, instance_id,
                  enable_tls=False, key_file=None, cert_file=None,
                  vcore_retry_interval=0.5, devices_refresh_interval=5,
@@ -51,6 +52,7 @@
         self.consul_endpoint = consul_endpoint
         self.vcore_endpoint = vcore_endpoint
         self.grpc_timeout = vcore_grpc_timeout
+        self.core_binding_key = vcore_binding_key
         self.instance_id = instance_id
         self.enable_tls = enable_tls
         self.key_file = key_file
@@ -161,7 +163,8 @@
                 # Send subscription request to register the current ofagent instance
                 container_name = self.instance_id
                 if self.grpc_client is None:
-                    self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout)
+                    self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout,
+                                                  self.core_binding_key)
                 subscription = yield self.grpc_client.subscribe(
                     OfAgentSubscriber(ofagent_id=container_name))
 
diff --git a/python/ofagent/grpc_client.py b/python/ofagent/grpc_client.py
index 76b052e..4a6d274 100755
--- a/python/ofagent/grpc_client.py
+++ b/python/ofagent/grpc_client.py
@@ -38,18 +38,18 @@
 
 class GrpcClient(object):
 
-    def __init__(self, connection_manager, channel, grpc_timeout):
+    def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key):
 
         self.connection_manager = connection_manager
         self.channel = channel
         self.grpc_timeout = grpc_timeout
         self.local_stub = VolthaServiceStub(channel)
 
-        # This is the vcore group to which an OFAgent is bound.
+        # This is the rw-core cluster to which an OFAgent is bound.
         # It is the affinity router that forwards all OFAgent
-        # requests to the primary vcore in the group.
+        # requests to a specific rw-core in this back-end cluster.
         self.core_group_id = ''
-        self.CORE_GROUP_ID = 'voltha_backend_name'
+        self.core_group_id_key = core_binding_key
 
         self.stopped = False
 
@@ -58,7 +58,8 @@
         self.change_event_queue = DeferredQueue()  # queue change events
 
     def start(self):
-        log.debug('starting', grpc_timeout=self.grpc_timeout)
+        log.debug('starting', grpc_timeout=self.grpc_timeout,
+                  core_binding_key=self.core_group_id_key)
         self.start_packet_out_stream()
         self.start_packet_in_stream()
         self.start_change_event_in_stream()
@@ -88,7 +89,7 @@
             generator = packet_generator()
             try:
                 self.local_stub.StreamPacketsOut(generator,
-                                                 metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+                                                 metadata=((self.core_group_id_key, self.core_group_id), ))
             except _Rendezvous, e:
                 log.error('grpc-exception', status=e.code())
                 if e.code() == StatusCode.UNAVAILABLE:
@@ -101,7 +102,7 @@
         def receive_packet_in_stream():
             streaming_rpc_method = self.local_stub.ReceivePacketsIn
             iterator = streaming_rpc_method(empty_pb2.Empty(),
-                                            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+                                            metadata=((self.core_group_id_key, self.core_group_id),))
             try:
                 for packet_in in iterator:
                     reactor.callFromThread(self.packet_in_queue.put,
@@ -121,7 +122,7 @@
         def receive_change_events():
             streaming_rpc_method = self.local_stub.ReceiveChangeEvents
             iterator = streaming_rpc_method(empty_pb2.Empty(),
-                                            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+                                            metadata=((self.core_group_id_key, self.core_group_id),))
             try:
                 for event in iterator:
                     reactor.callFromThread(self.change_event_queue.put, event)
@@ -166,7 +167,7 @@
         req = LogicalPortId(id=device_id, port_id=port_id)
         res = yield threads.deferToThread(
             self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -174,7 +175,7 @@
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res.items)
 
     @inlineCallbacks
@@ -185,7 +186,7 @@
         )
         res = yield threads.deferToThread(
             self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -196,7 +197,7 @@
         )
         res = yield threads.deferToThread(
             self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -204,7 +205,7 @@
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -215,7 +216,7 @@
         )
         res = yield threads.deferToThread(
             self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -226,7 +227,7 @@
         )
         res = yield threads.deferToThread(
             self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -234,7 +235,7 @@
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res.items)
 
     @inlineCallbacks
@@ -242,7 +243,7 @@
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res.items)
 
     @inlineCallbacks
@@ -250,21 +251,21 @@
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res.items)
 
     @inlineCallbacks
     def list_logical_devices(self):
         res = yield threads.deferToThread(
             self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+            metadata=((self.core_group_id_key, self.core_group_id), ))
         returnValue(res.items)
 
     @inlineCallbacks
     def subscribe(self, subscriber):
         res, call = yield threads.deferToThread(
             self.local_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+            metadata=((self.core_group_id_key, self.core_group_id), ))
         returned_metadata = call.initial_metadata()
 
         # Update the core_group_id if present in the returned metadata
@@ -273,7 +274,7 @@
         else:
             log.debug('metadata-returned', metadata=returned_metadata)
             for pair in returned_metadata:
-                if pair[0] == self.CORE_GROUP_ID:
+                if pair[0] == self.core_group_id_key:
                     self.core_group_id = pair[1]
-                    log.debug('received-core-group-id', vcore_group=self.core_group_id)
+                    log.debug('core-binding', core_group=self.core_group_id)
         returnValue(res)
diff --git a/python/ofagent/main.py b/python/ofagent/main.py
index 06c2ae3..4f0531c 100755
--- a/python/ofagent/main.py
+++ b/python/ofagent/main.py
@@ -33,6 +33,8 @@
     external_host_address=os.environ.get('EXTERNAL_HOST_ADDRESS',
                                          get_my_primary_local_ipv4()),
     grpc_endpoint=os.environ.get('GRPC_ENDPOINT', 'localhost:50055'),
+    grpc_timeout=os.environ.get('GRPC_TIMEOUT', '10'),
+    core_binding_key=os.environ.get('CORE_BINDING_KEY', 'voltha_backend_name'),
     instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
     internal_host_address=os.environ.get('INTERNAL_HOST_ADDRESS',
                                          get_my_primary_local_ipv4()),
@@ -76,11 +78,11 @@
                         default=defs['external_host_address'],
                         help=_help)
 
-    _help = ('gRPC end-point to connect to. It can either be a direct'
-             'definition in the form of <hostname>:<port>, or it can be an'
-             'indirect definition in the form of @<service-name> where'
-             '<service-name> is the name of the grpc service as registered'
-             'in consul (example: @voltha-grpc). (default: %s'
+    _help = ('gRPC end-point to connect to. It can either be a direct '
+             'definition in the form of <hostname>:<port>, or it can be an '
+             'indirect definition in the form of @<service-name> where '
+             '<service-name> is the name of the grpc service as registered '
+             'in consul (example: @voltha-grpc). (default: %s)'
              % defs['grpc_endpoint'])
     parser.add_argument('-G', '--grpc-endpoint',
                         dest='grpc_endpoint',
@@ -88,7 +90,23 @@
                         default=defs['grpc_endpoint'],
                         help=_help)
 
-    _help = ('<hostname> or <ip> at which ofagent is reachable from inside'
+    _help = 'gRPC timeout in seconds (default: %s)' % defs['grpc_timeout']
+    parser.add_argument('-T', '--grpc-timeout',
+                        dest='grpc_timeout',
+                        action='store',
+                        default=defs['grpc_timeout'],
+                        help=_help)
+
+    _help = ('The name of the meta-key whose value is the rw-core group '
+             'to which the ofagent\'s gRPC client is bound. '
+             '(default: %s)' % defs['core_binding_key'])
+    parser.add_argument('-B', '--core-binding-key',
+                        dest='core_binding_key',
+                        action='store',
+                        default=defs['core_binding_key'],
+                        help=_help)
+
+    _help = ('<hostname> or <ip> at which ofagent is reachable from inside '
              'the cluster (default: %s)' % defs['internal_host_address'])
     parser.add_argument('-H', '--internal-host-address',
                         dest='internal_host_address',
@@ -203,9 +221,7 @@
 
         self.args = args = parse_args()
         self.config = load_config(args)
-        # May want to specify the gRPC timeout as an arg (in future)
-        # Right now, set a default value
-        self.grpc_timeout = 10
+        self.grpc_timeout = int(self.args.grpc_timeout)
 
         verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
         self.log = setup_logging(self.config.get('logging', {}),
@@ -231,7 +247,7 @@
         args = self.args
         self.connection_manager = yield ConnectionManager(
             args.consul, args.grpc_endpoint, self.grpc_timeout,
-            args.controller, args.instance_id,
+            args.core_binding_key, args.controller, args.instance_id,
             args.enable_tls, args.key_file, args.cert_file).start()
         self.log.info('started-internal-services')