VOL-1399: The value of the metadata field used by OFAgent & Arouter
should not be hard-coded

- Added grpc-timeout and core-binding-key options to OFAgent run command
- Added core_binding_key option to rw_core run command

Change-Id: Icf5fe226d17a1a5fcd9459a85e41c434fc7ac8b9
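The idea in one place: instead of baking the string 'voltha_backend_name' into both the OFAgent and the core, the metadata key is now a run-time option on each side. A minimal sketch of how the OFAgent attaches that key to every gRPC call (the import path and endpoint are assumptions; the names mirror grpc_client.py below):

```python
import grpc
from google.protobuf import empty_pb2
from voltha_protos import voltha_pb2_grpc   # hypothetical binding module

binding_key = 'voltha_backend_name'   # now supplied via --core-binding-key
core_group_id = ''                    # learned from Subscribe (see below)

channel = grpc.insecure_channel('localhost:50057')
stub = voltha_pb2_grpc.VolthaServiceStub(channel)

# Every request carries the (key, value) pair; the affinity router keys
# on it to pin this OFAgent to one rw-core back-end.
res = stub.ListLogicalDevices(
    empty_pb2.Empty(), timeout=10,
    metadata=((binding_key, core_group_id),))
```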
diff --git a/compose/ofagent.yml b/compose/ofagent.yml
new file mode 100644
index 0000000..31c0cad
--- /dev/null
+++ b/compose/ofagent.yml
@@ -0,0 +1,44 @@
+---
+# Copyright 2019 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2'
+services:
+ #
+ # OFAgent server instance
+ #
+ ofagent:
+ image: "${REGISTRY}${REPOSITORY}voltha-ofagent${TAG}"
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "10m"
+ max-file: "3"
+ command: [
+ "/ofagent/ofagent/main.py",
+ "-v",
+ "--consul=${DOCKER_HOST_IP}:8500",
+ "--controller=${DOCKER_HOST_IP}:6653",
+ "--grpc-endpoint=${DOCKER_HOST_IP}:50057",
+# "--grpc-timeout=15",
+# "--core-binding-key=non_default_voltha_backend_name",
+ "--instance-id-is-container-name",
+ "--enable-tls",
+ "--key-file=/ofagent/pki/voltha.key",
+ "--cert-file=/ofagent/pki/voltha.crt",
+ "-v"
+ ]
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+ restart: unless-stopped
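The two commented-out options in the compose command above illustrate non-default values; left commented, the agent falls back to its built-in defaults (a 10-second gRPC timeout and the voltha_backend_name key, per main.py below).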
diff --git a/python/ofagent/connection_mgr.py b/python/ofagent/connection_mgr.py
index b960b69..fea4910 100755
--- a/python/ofagent/connection_mgr.py
+++ b/python/ofagent/connection_mgr.py
@@ -39,7 +39,8 @@
# _ = third_party
class ConnectionManager(object):
- def __init__(self, consul_endpoint, vcore_endpoint, vcore_grpc_timeout,
+ def __init__(self, consul_endpoint,
+ vcore_endpoint, vcore_grpc_timeout, vcore_binding_key,
controller_endpoints, instance_id,
enable_tls=False, key_file=None, cert_file=None,
vcore_retry_interval=0.5, devices_refresh_interval=5,
@@ -51,6 +52,7 @@
self.consul_endpoint = consul_endpoint
self.vcore_endpoint = vcore_endpoint
self.grpc_timeout = vcore_grpc_timeout
+ self.core_binding_key = vcore_binding_key
self.instance_id = instance_id
self.enable_tls = enable_tls
self.key_file = key_file
@@ -161,7 +163,8 @@
# Send subscription request to register the current ofagent instance
container_name = self.instance_id
if self.grpc_client is None:
- self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout)
+ self.grpc_client = GrpcClient(self, self.channel, self.grpc_timeout,
+ self.core_binding_key)
subscription = yield self.grpc_client.subscribe(
OfAgentSubscriber(ofagent_id=container_name))
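For reference, constructing the updated ConnectionManager now looks like this (the endpoint values are illustrative; the argument order follows the new signature above):

```python
from connection_mgr import ConnectionManager  # module shown above

cm = ConnectionManager(
    consul_endpoint='localhost:8500',
    vcore_endpoint='localhost:50057',
    vcore_grpc_timeout=10,
    vcore_binding_key='voltha_backend_name',
    controller_endpoints=['localhost:6653'],
    instance_id='ofagent-1')
```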
diff --git a/python/ofagent/grpc_client.py b/python/ofagent/grpc_client.py
index 76b052e..4a6d274 100755
--- a/python/ofagent/grpc_client.py
+++ b/python/ofagent/grpc_client.py
@@ -38,18 +38,18 @@
class GrpcClient(object):
- def __init__(self, connection_manager, channel, grpc_timeout):
+ def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key):
self.connection_manager = connection_manager
self.channel = channel
self.grpc_timeout = grpc_timeout
self.local_stub = VolthaServiceStub(channel)
- # This is the vcore group to which an OFAgent is bound.
+ # This is the rw-core cluster to which an OFAgent is bound.
# It is the affinity router that forwards all OFAgent
- # requests to the primary vcore in the group.
+ # requests to a specific rw-core in this back-end cluster.
self.core_group_id = ''
- self.CORE_GROUP_ID = 'voltha_backend_name'
+ self.core_group_id_key = core_binding_key
self.stopped = False
@@ -58,7 +58,8 @@
self.change_event_queue = DeferredQueue() # queue change events
def start(self):
- log.debug('starting', grpc_timeout=self.grpc_timeout)
+ log.debug('starting', grpc_timeout=self.grpc_timeout,
+ core_binding_key=self.core_group_id_key)
self.start_packet_out_stream()
self.start_packet_in_stream()
self.start_change_event_in_stream()
@@ -88,7 +89,7 @@
generator = packet_generator()
try:
self.local_stub.StreamPacketsOut(generator,
- metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+ metadata=((self.core_group_id_key, self.core_group_id), ))
except _Rendezvous, e:
log.error('grpc-exception', status=e.code())
if e.code() == StatusCode.UNAVAILABLE:
@@ -101,7 +102,7 @@
def receive_packet_in_stream():
streaming_rpc_method = self.local_stub.ReceivePacketsIn
iterator = streaming_rpc_method(empty_pb2.Empty(),
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
try:
for packet_in in iterator:
reactor.callFromThread(self.packet_in_queue.put,
@@ -121,7 +122,7 @@
def receive_change_events():
streaming_rpc_method = self.local_stub.ReceiveChangeEvents
iterator = streaming_rpc_method(empty_pb2.Empty(),
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
try:
for event in iterator:
reactor.callFromThread(self.change_event_queue.put, event)
@@ -166,7 +167,7 @@
req = LogicalPortId(id=device_id, port_id=port_id)
res = yield threads.deferToThread(
self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -174,7 +175,7 @@
req = ID(id=device_id)
res = yield threads.deferToThread(
self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res.items)
@inlineCallbacks
@@ -185,7 +186,7 @@
)
res = yield threads.deferToThread(
self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -196,7 +197,7 @@
)
res = yield threads.deferToThread(
self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -204,7 +205,7 @@
req = ID(id=device_id)
res = yield threads.deferToThread(
self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -215,7 +216,7 @@
)
res = yield threads.deferToThread(
self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -226,7 +227,7 @@
)
res = yield threads.deferToThread(
self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -234,7 +235,7 @@
req = ID(id=device_id)
res = yield threads.deferToThread(
self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res.items)
@inlineCallbacks
@@ -242,7 +243,7 @@
req = ID(id=device_id)
res = yield threads.deferToThread(
self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res.items)
@inlineCallbacks
@@ -250,21 +251,21 @@
req = ID(id=device_id)
res = yield threads.deferToThread(
self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res.items)
@inlineCallbacks
def list_logical_devices(self):
res = yield threads.deferToThread(
self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+ metadata=((self.core_group_id_key, self.core_group_id), ))
returnValue(res.items)
@inlineCallbacks
def subscribe(self, subscriber):
res, call = yield threads.deferToThread(
self.local_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+ metadata=((self.core_group_id_key, self.core_group_id), ))
returned_metadata = call.initial_metadata()
# Update the core_group_id if present in the returned metadata
@@ -273,7 +274,7 @@
else:
log.debug('metadata-returned', metadata=returned_metadata)
for pair in returned_metadata:
- if pair[0] == self.CORE_GROUP_ID:
+ if pair[0] == self.core_group_id_key:
self.core_group_id = pair[1]
- log.debug('received-core-group-id', vcore_group=self.core_group_id)
+ log.debug('core-binding', core_group=self.core_group_id)
returnValue(res)
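The binding itself is established during Subscribe: the router returns the chosen back-end name in the call's initial metadata under the same configurable key, and the client echoes it on every later request. A condensed view of that handshake, continuing the sketch above (blocking call shown for brevity; the OfAgentSubscriber import from the generated protos is assumed):

```python
# Learn the back-end name from the initial metadata, then remember it.
res, call = stub.Subscribe.with_call(
    OfAgentSubscriber(ofagent_id='ofagent-1'),
    timeout=10,
    metadata=((binding_key, core_group_id),))
for key, value in call.initial_metadata():
    if key == binding_key:
        core_group_id = value   # e.g. a router-assigned back-end name
```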
diff --git a/python/ofagent/main.py b/python/ofagent/main.py
index 06c2ae3..4f0531c 100755
--- a/python/ofagent/main.py
+++ b/python/ofagent/main.py
@@ -33,6 +33,8 @@
external_host_address=os.environ.get('EXTERNAL_HOST_ADDRESS',
get_my_primary_local_ipv4()),
grpc_endpoint=os.environ.get('GRPC_ENDPOINT', 'localhost:50055'),
+ grpc_timeout=os.environ.get('GRPC_TIMEOUT', '10'),
+ core_binding_key=os.environ.get('CORE_BINDING_KEY', 'voltha_backend_name'),
instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
internal_host_address=os.environ.get('INTERNAL_HOST_ADDRESS',
get_my_primary_local_ipv4()),
@@ -76,11 +78,11 @@
default=defs['external_host_address'],
help=_help)
- _help = ('gRPC end-point to connect to. It can either be a direct'
- 'definition in the form of <hostname>:<port>, or it can be an'
- 'indirect definition in the form of @<service-name> where'
- '<service-name> is the name of the grpc service as registered'
- 'in consul (example: @voltha-grpc). (default: %s'
+ _help = ('gRPC end-point to connect to. It can either be a direct '
+ 'definition in the form of <hostname>:<port>, or it can be an '
+ 'indirect definition in the form of @<service-name> where '
+ '<service-name> is the name of the grpc service as registered '
+ 'in consul (example: @voltha-grpc). (default: %s)'
% defs['grpc_endpoint'])
parser.add_argument('-G', '--grpc-endpoint',
dest='grpc_endpoint',
@@ -88,7 +90,23 @@
default=defs['grpc_endpoint'],
help=_help)
- _help = ('<hostname> or <ip> at which ofagent is reachable from inside'
+ _help = 'gRPC timeout in seconds (default: %s)' % defs['grpc_timeout']
+ parser.add_argument('-T', '--grpc-timeout',
+ dest='grpc_timeout',
+ action='store',
+ default=defs['grpc_timeout'],
+ help=_help)
+
+ _help = ('The name of the meta-key whose value is the rw-core group '
+ 'to which the ofagent\'s gRPC client is bound. '
+ '(default: %s)' % defs['core_binding_key'])
+ parser.add_argument('-B', '--core-binding-key',
+ dest='core_binding_key',
+ action='store',
+ default=defs['core_binding_key'],
+ help=_help)
+
+ _help = ('<hostname> or <ip> at which ofagent is reachable from inside '
'the cluster (default: %s)' % defs['internal_host_address'])
parser.add_argument('-H', '--internal-host-address',
dest='internal_host_address',
@@ -203,9 +221,7 @@
self.args = args = parse_args()
self.config = load_config(args)
- # May want to specify the gRPC timeout as an arg (in future)
- # Right now, set a default value
- self.grpc_timeout = 10
+ self.grpc_timeout = int(self.args.grpc_timeout)
verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
self.log = setup_logging(self.config.get('logging', {}),
@@ -231,7 +247,7 @@
args = self.args
self.connection_manager = yield ConnectionManager(
args.consul, args.grpc_endpoint, self.grpc_timeout,
- args.controller, args.instance_id,
+ args.core_binding_key, args.controller, args.instance_id,
args.enable_tls, args.key_file, args.cert_file).start()
self.log.info('started-internal-services')
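Both new options can also come from the environment, with the CLI flags taking precedence. A sketch of the default resolution as coded in main.py above:

```python
import os

# Defaults as in main.py; -T/--grpc-timeout and -B/--core-binding-key
# override these on the command line.
grpc_timeout = int(os.environ.get('GRPC_TIMEOUT', '10'))
core_binding_key = os.environ.get('CORE_BINDING_KEY', 'voltha_backend_name')
```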
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index b924376..f977be0 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -50,6 +50,7 @@
default_InCompetingMode = true
default_LongRunningRequestTimeout = int64(2000)
default_DefaultRequestTimeout = int64(500)
+ default_CoreBindingKey = "voltha_backend_name"
)
// RWCoreFlags represents the set of configurations used by the read-write core service
@@ -79,6 +80,7 @@
InCompetingMode bool
LongRunningRequestTimeout int64
DefaultRequestTimeout int64
+ CoreBindingKey string
}
func init() {
@@ -112,6 +114,7 @@
InCompetingMode: default_InCompetingMode,
DefaultRequestTimeout:default_DefaultRequestTimeout,
LongRunningRequestTimeout:default_LongRunningRequestTimeout,
+ CoreBindingKey: default_CoreBindingKey,
}
return &rwCoreFlag
}
@@ -181,6 +184,9 @@
help = fmt.Sprintf("Show startup banner log lines")
flag.BoolVar(&cf.Banner, "banner", default_Banner, help)
+ help = fmt.Sprintf("The name of the meta-key whose value is the rw-core group to which the ofagent is bound")
+ flag.StringVar(&(cf.CoreBindingKey), "core_binding_key", default_CoreBindingKey, help)
+
flag.Parse()
containerName := getContainerInfo()
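rw_core registers the matching core_binding_key flag with the same voltha_backend_name default, so deployments that set neither option keep today's behavior unchanged.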
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index b3bbc71..13baf84 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -124,6 +124,7 @@
//core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
core.grpcNBIAPIHandler = NewAPIHandler(core)
+ log.Infow("grpc-handler", log.Fields{"core_binding_key": core.config.CoreBindingKey})
core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
// Create a function to register the core GRPC service with the GRPC server
f := func(gs *grpc.Server) {
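Note that both sides must agree on the key name: the handler change below looks the configured key up in the incoming request metadata, so if the OFAgent's --core-binding-key and rw_core's -core_binding_key differ, isOFControllerRequest returns false and the request is treated as an ordinary call that must acquire the transaction in competing mode.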
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 0bbe5de..7b486bd 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -32,9 +32,6 @@
"time"
)
-//TODO: Move this Tag into the proto file
-const OF_CONTROLLER_TAG= "voltha_backend_name"
-
const (
IMAGE_DOWNLOAD = iota
CANCEL_IMAGE_DOWNLOAD = iota
@@ -124,10 +121,10 @@
// isOFControllerRequest is a helper function to determine if a request was initiated
// from the OpenFlow controller (or its proxy, e.g. OFAgent)
-func isOFControllerRequest(ctx context.Context) bool {
+func (handler *APIHandler) isOFControllerRequest(ctx context.Context) bool {
if md, ok := metadata.FromIncomingContext(ctx); ok {
// Metadata in context
- if _, ok = md[OF_CONTROLLER_TAG]; ok {
+ if _, ok = md[handler.core.config.CoreBindingKey]; ok {
// OFAgent field in metadata
return true
}
@@ -271,7 +268,7 @@
}
if handler.competeForTransaction() {
- if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
+ if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
return new(empty.Empty), err
} else {
@@ -294,7 +291,7 @@
}
if handler.competeForTransaction() {
- if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
+ if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
return new(empty.Empty), err
} else {