VOL-1399: The value of the metadata field used by OFAgent & Arouter
          should not be hard-coded
- Added grpc-timeout and core-binding-key options to OFAgent run command
- Added core_binding_key option to rw_core run command

Change-Id: Icf5fe226d17a1a5fcd9459a85e41c434fc7ac8b9
diff --git a/compose/ofagent.yml b/compose/ofagent.yml
new file mode 100644
index 0000000..31c0cad
--- /dev/null
+++ b/compose/ofagent.yml
@@ -0,0 +1,44 @@
+---
+# Copyright 2019 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2'
+services:
+  #
+  # OFAgent server instance
+  #
+  ofagent:
+    image: "${REGISTRY}${REPOSITORY}voltha-ofagent${TAG}"
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "10m"
+        max-file: "3"
+    command: [
+      "/ofagent/ofagent/main.py",
+      "-v",
+      "--consul=${DOCKER_HOST_IP}:8500",
+      "--controller=${DOCKER_HOST_IP}:6653",
+      "--grpc-endpoint=${DOCKER_HOST_IP}:50057",
+#      "--grpc-timeout=15",
+#      "--core-binding-key=non_default_voltha_backend_name",
+      "--instance-id-is-container-name",
+      "--enable-tls",
+      "--key-file=/ofagent/pki/voltha.key",
+      "--cert-file=/ofagent/pki/voltha.crt",
+      "-v"
+    ]
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
+    restart: unless-stopped
diff --git a/python/ofagent/connection_mgr.py b/python/ofagent/connection_mgr.py
index b960b69..fea4910 100755
--- a/python/ofagent/connection_mgr.py
+++ b/python/ofagent/connection_mgr.py
@@ -39,7 +39,8 @@
 # _ = third_party
 
 class ConnectionManager(object):
-    def __init__(self, consul_endpoint, vcore_endpoint, vcore_grpc_timeout,
+    def __init__(self, consul_endpoint,
+                 vcore_endpoint, vcore_grpc_timeout, vcore_binding_key,
                  controller_endpoints, instance_id,
                  enable_tls=False, key_file=None, cert_file=None,
                  vcore_retry_interval=0.5, devices_refresh_interval=5,
@@ -51,6 +52,7 @@
         self.consul_endpoint = consul_endpoint
         self.vcore_endpoint = vcore_endpoint
         self.grpc_timeout = vcore_grpc_timeout
+        self.core_binding_key = vcore_binding_key
         self.instance_id = instance_id
         self.enable_tls = enable_tls
         self.key_file = key_file
@@ -161,7 +163,8 @@
                 # Send subscription request to register the current ofagent instance
                 container_name = self.instance_id
                 if self.grpc_client is None:
-                    self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout)
+                    self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout,
+                                                  self.core_binding_key)
                 subscription = yield self.grpc_client.subscribe(
                     OfAgentSubscriber(ofagent_id=container_name))
 
diff --git a/python/ofagent/grpc_client.py b/python/ofagent/grpc_client.py
index 76b052e..4a6d274 100755
--- a/python/ofagent/grpc_client.py
+++ b/python/ofagent/grpc_client.py
@@ -38,18 +38,18 @@
 
 class GrpcClient(object):
 
-    def __init__(self, connection_manager, channel, grpc_timeout):
+    def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key):
 
         self.connection_manager = connection_manager
         self.channel = channel
         self.grpc_timeout = grpc_timeout
         self.local_stub = VolthaServiceStub(channel)
 
-        # This is the vcore group to which an OFAgent is bound.
+        # This is the rw-core cluster to which an OFAgent is bound.
         # It is the affinity router that forwards all OFAgent
-        # requests to the primary vcore in the group.
+        # requests to a specific rw-core in this back-end cluster.
         self.core_group_id = ''
-        self.CORE_GROUP_ID = 'voltha_backend_name'
+        self.core_group_id_key = core_binding_key
 
         self.stopped = False
 
@@ -58,7 +58,8 @@
         self.change_event_queue = DeferredQueue()  # queue change events
 
     def start(self):
-        log.debug('starting', grpc_timeout=self.grpc_timeout)
+        log.debug('starting', grpc_timeout=self.grpc_timeout,
+                  core_binding_key=self.core_group_id_key)
         self.start_packet_out_stream()
         self.start_packet_in_stream()
         self.start_change_event_in_stream()
@@ -88,7 +89,7 @@
             generator = packet_generator()
             try:
                 self.local_stub.StreamPacketsOut(generator,
-                                                 metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+                                                 metadata=((self.core_group_id_key, self.core_group_id), ))
             except _Rendezvous, e:
                 log.error('grpc-exception', status=e.code())
                 if e.code() == StatusCode.UNAVAILABLE:
@@ -101,7 +102,7 @@
         def receive_packet_in_stream():
             streaming_rpc_method = self.local_stub.ReceivePacketsIn
             iterator = streaming_rpc_method(empty_pb2.Empty(),
-                                            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+                                            metadata=((self.core_group_id_key, self.core_group_id),))
             try:
                 for packet_in in iterator:
                     reactor.callFromThread(self.packet_in_queue.put,
@@ -121,7 +122,7 @@
         def receive_change_events():
             streaming_rpc_method = self.local_stub.ReceiveChangeEvents
             iterator = streaming_rpc_method(empty_pb2.Empty(),
-                                            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+                                            metadata=((self.core_group_id_key, self.core_group_id),))
             try:
                 for event in iterator:
                     reactor.callFromThread(self.change_event_queue.put, event)
@@ -166,7 +167,7 @@
         req = LogicalPortId(id=device_id, port_id=port_id)
         res = yield threads.deferToThread(
             self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -174,7 +175,7 @@
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res.items)
 
     @inlineCallbacks
@@ -185,7 +186,7 @@
         )
         res = yield threads.deferToThread(
             self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -196,7 +197,7 @@
         )
         res = yield threads.deferToThread(
             self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -204,7 +205,7 @@
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -215,7 +216,7 @@
         )
         res = yield threads.deferToThread(
             self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -226,7 +227,7 @@
         )
         res = yield threads.deferToThread(
             self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res)
 
     @inlineCallbacks
@@ -234,7 +235,7 @@
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res.items)
 
     @inlineCallbacks
@@ -242,7 +243,7 @@
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res.items)
 
     @inlineCallbacks
@@ -250,21 +251,21 @@
         req = ID(id=device_id)
         res = yield threads.deferToThread(
             self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+            metadata=((self.core_group_id_key, self.core_group_id),))
         returnValue(res.items)
 
     @inlineCallbacks
     def list_logical_devices(self):
         res = yield threads.deferToThread(
             self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+            metadata=((self.core_group_id_key, self.core_group_id), ))
         returnValue(res.items)
 
     @inlineCallbacks
     def subscribe(self, subscriber):
         res, call = yield threads.deferToThread(
             self.local_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
-            metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+            metadata=((self.core_group_id_key, self.core_group_id), ))
         returned_metadata = call.initial_metadata()
 
         # Update the core_group_id if present in the returned metadata
@@ -273,7 +274,7 @@
         else:
             log.debug('metadata-returned', metadata=returned_metadata)
             for pair in returned_metadata:
-                if pair[0] == self.CORE_GROUP_ID:
+                if pair[0] == self.core_group_id_key:
                     self.core_group_id = pair[1]
-                    log.debug('received-core-group-id', vcore_group=self.core_group_id)
+                    log.debug('core-binding', core_group=self.core_group_id)
         returnValue(res)
diff --git a/python/ofagent/main.py b/python/ofagent/main.py
index 06c2ae3..4f0531c 100755
--- a/python/ofagent/main.py
+++ b/python/ofagent/main.py
@@ -33,6 +33,8 @@
     external_host_address=os.environ.get('EXTERNAL_HOST_ADDRESS',
                                          get_my_primary_local_ipv4()),
     grpc_endpoint=os.environ.get('GRPC_ENDPOINT', 'localhost:50055'),
+    grpc_timeout=os.environ.get('GRPC_TIMEOUT', '10'),
+    core_binding_key=os.environ.get('CORE_BINDING_KEY', 'voltha_backend_name'),
     instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
     internal_host_address=os.environ.get('INTERNAL_HOST_ADDRESS',
                                          get_my_primary_local_ipv4()),
@@ -76,11 +78,11 @@
                         default=defs['external_host_address'],
                         help=_help)
 
-    _help = ('gRPC end-point to connect to. It can either be a direct'
-             'definition in the form of <hostname>:<port>, or it can be an'
-             'indirect definition in the form of @<service-name> where'
-             '<service-name> is the name of the grpc service as registered'
-             'in consul (example: @voltha-grpc). (default: %s'
+    _help = ('gRPC end-point to connect to. It can either be a direct '
+             'definition in the form of <hostname>:<port>, or it can be an '
+             'indirect definition in the form of @<service-name> where '
+             '<service-name> is the name of the grpc service as registered '
+             'in consul (example: @voltha-grpc). (default: %s)'
              % defs['grpc_endpoint'])
     parser.add_argument('-G', '--grpc-endpoint',
                         dest='grpc_endpoint',
@@ -88,7 +90,23 @@
                         default=defs['grpc_endpoint'],
                         help=_help)
 
-    _help = ('<hostname> or <ip> at which ofagent is reachable from inside'
+    _help = 'gRPC timeout in seconds (default: %s)' % defs['grpc_timeout']
+    parser.add_argument('-T', '--grpc-timeout',
+                        dest='grpc_timeout',
+                        action='store',
+                        default=defs['grpc_timeout'],
+                        help=_help)
+
+    _help = ('The name of the meta-key whose value is the rw-core group '
+             'to which the ofagent\'s gRPC client is bound. '
+             '(default: %s)' % defs['core_binding_key'])
+    parser.add_argument('-B', '--core-binding-key',
+                        dest='core_binding_key',
+                        action='store',
+                        default=defs['core_binding_key'],
+                        help=_help)
+
+    _help = ('<hostname> or <ip> at which ofagent is reachable from inside '
              'the cluster (default: %s)' % defs['internal_host_address'])
     parser.add_argument('-H', '--internal-host-address',
                         dest='internal_host_address',
@@ -203,9 +221,7 @@
 
         self.args = args = parse_args()
         self.config = load_config(args)
-        # May want to specify the gRPC timeout as an arg (in future)
-        # Right now, set a default value
-        self.grpc_timeout = 10
+        self.grpc_timeout = int(self.args.grpc_timeout)
 
         verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
         self.log = setup_logging(self.config.get('logging', {}),
@@ -231,7 +247,7 @@
         args = self.args
         self.connection_manager = yield ConnectionManager(
             args.consul, args.grpc_endpoint, self.grpc_timeout,
-            args.controller, args.instance_id,
+            args.core_binding_key, args.controller, args.instance_id,
             args.enable_tls, args.key_file, args.cert_file).start()
         self.log.info('started-internal-services')
 
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index b924376..f977be0 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -50,6 +50,7 @@
 	default_InCompetingMode     = true
 	default_LongRunningRequestTimeout = int64(2000)
 	default_DefaultRequestTimeout = int64(500)
+	default_CoreBindingKey      = "voltha_backend_name"
 )
 
 // RWCoreFlags represents the set of configurations used by the read-write core service
@@ -79,6 +80,7 @@
 	InCompetingMode     bool
 	LongRunningRequestTimeout int64
 	DefaultRequestTimeout int64
+	CoreBindingKey      string
 }
 
 func init() {
@@ -112,6 +114,7 @@
 		InCompetingMode:     default_InCompetingMode,
 		DefaultRequestTimeout:default_DefaultRequestTimeout,
 		LongRunningRequestTimeout:default_LongRunningRequestTimeout,
+		CoreBindingKey:      default_CoreBindingKey,
 	}
 	return &rwCoreFlag
 }
@@ -181,6 +184,9 @@
 	help = fmt.Sprintf("Show startup banner log lines")
 	flag.BoolVar(&cf.Banner, "banner", default_Banner, help)
 
+	help = fmt.Sprintf("The name of the meta-key whose value is the rw-core group to which the ofagent is bound")
+	flag.StringVar(&(cf.CoreBindingKey), "core_binding_key", default_CoreBindingKey, help)
+
 	flag.Parse()
 
 	containerName := getContainerInfo()
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index b3bbc71..13baf84 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -124,6 +124,7 @@
 
 	//core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
 	core.grpcNBIAPIHandler = NewAPIHandler(core)
+	log.Infow("grpc-handler", log.Fields{"core_binding_key": core.config.CoreBindingKey})
 	core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
 	//	Create a function to register the core GRPC service with the GRPC server
 	f := func(gs *grpc.Server) {
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 0bbe5de..7b486bd 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -32,9 +32,6 @@
 	"time"
 )
 
-//TODO:  Move this Tag into the proto file
-const OF_CONTROLLER_TAG= "voltha_backend_name"
-
 const (
 	IMAGE_DOWNLOAD = iota
 	CANCEL_IMAGE_DOWNLOAD     = iota
@@ -124,10 +121,10 @@
 
 // isOFControllerRequest is a helper function to determine if a request was initiated
 // from the OpenFlow controller (or its proxy, e.g. OFAgent)
-func isOFControllerRequest(ctx context.Context) bool {
+func (handler *APIHandler) isOFControllerRequest(ctx context.Context) bool {
 	if md, ok := metadata.FromIncomingContext(ctx); ok {
 		// Metadata in context
-		if _, ok = md[OF_CONTROLLER_TAG]; ok {
+		if _, ok = md[handler.core.config.CoreBindingKey]; ok {
 			// OFAgent field in metadata
 			return true
 		}
@@ -271,7 +268,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
+		if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
 			if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
 				return new(empty.Empty), err
 			} else {
@@ -294,7 +291,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
+		if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
 			if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
 				return new(empty.Empty), err
 			} else {