VOL-1399: The value of the metadata field used by OFAgent & Arouter
should not be hard-coded
- Added grpc-timeout and core-binding-key options to OFAgent run command
- Added core_binding_key option to rw_core run command
Change-Id: Icf5fe226d17a1a5fcd9459a85e41c434fc7ac8b9
diff --git a/python/ofagent/grpc_client.py b/python/ofagent/grpc_client.py
index 76b052e..4a6d274 100755
--- a/python/ofagent/grpc_client.py
+++ b/python/ofagent/grpc_client.py
@@ -38,18 +38,18 @@
class GrpcClient(object):
- def __init__(self, connection_manager, channel, grpc_timeout):
+ def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key):
self.connection_manager = connection_manager
self.channel = channel
self.grpc_timeout = grpc_timeout
self.local_stub = VolthaServiceStub(channel)
- # This is the vcore group to which an OFAgent is bound.
+ # This is the rw-core cluster to which an OFAgent is bound.
# It is the affinity router that forwards all OFAgent
- # requests to the primary vcore in the group.
+ # requests to a specific rw-core in this back-end cluster.
self.core_group_id = ''
- self.CORE_GROUP_ID = 'voltha_backend_name'
+ self.core_group_id_key = core_binding_key
self.stopped = False
@@ -58,7 +58,8 @@
self.change_event_queue = DeferredQueue() # queue change events
def start(self):
- log.debug('starting', grpc_timeout=self.grpc_timeout)
+ log.debug('starting', grpc_timeout=self.grpc_timeout,
+ core_binding_key=self.core_group_id_key)
self.start_packet_out_stream()
self.start_packet_in_stream()
self.start_change_event_in_stream()
@@ -88,7 +89,7 @@
generator = packet_generator()
try:
self.local_stub.StreamPacketsOut(generator,
- metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+ metadata=((self.core_group_id_key, self.core_group_id), ))
except _Rendezvous, e:
log.error('grpc-exception', status=e.code())
if e.code() == StatusCode.UNAVAILABLE:
@@ -101,7 +102,7 @@
def receive_packet_in_stream():
streaming_rpc_method = self.local_stub.ReceivePacketsIn
iterator = streaming_rpc_method(empty_pb2.Empty(),
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
try:
for packet_in in iterator:
reactor.callFromThread(self.packet_in_queue.put,
@@ -121,7 +122,7 @@
def receive_change_events():
streaming_rpc_method = self.local_stub.ReceiveChangeEvents
iterator = streaming_rpc_method(empty_pb2.Empty(),
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
try:
for event in iterator:
reactor.callFromThread(self.change_event_queue.put, event)
@@ -166,7 +167,7 @@
req = LogicalPortId(id=device_id, port_id=port_id)
res = yield threads.deferToThread(
self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -174,7 +175,7 @@
req = ID(id=device_id)
res = yield threads.deferToThread(
self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res.items)
@inlineCallbacks
@@ -185,7 +186,7 @@
)
res = yield threads.deferToThread(
self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -196,7 +197,7 @@
)
res = yield threads.deferToThread(
self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -204,7 +205,7 @@
req = ID(id=device_id)
res = yield threads.deferToThread(
self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -215,7 +216,7 @@
)
res = yield threads.deferToThread(
self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -226,7 +227,7 @@
)
res = yield threads.deferToThread(
self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -234,7 +235,7 @@
req = ID(id=device_id)
res = yield threads.deferToThread(
self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res.items)
@inlineCallbacks
@@ -242,7 +243,7 @@
req = ID(id=device_id)
res = yield threads.deferToThread(
self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res.items)
@inlineCallbacks
@@ -250,21 +251,21 @@
req = ID(id=device_id)
res = yield threads.deferToThread(
self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id),))
+ metadata=((self.core_group_id_key, self.core_group_id),))
returnValue(res.items)
@inlineCallbacks
def list_logical_devices(self):
res = yield threads.deferToThread(
self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+ metadata=((self.core_group_id_key, self.core_group_id), ))
returnValue(res.items)
@inlineCallbacks
def subscribe(self, subscriber):
res, call = yield threads.deferToThread(
self.local_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
- metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+ metadata=((self.core_group_id_key, self.core_group_id), ))
returned_metadata = call.initial_metadata()
# Update the core_group_id if present in the returned metadata
@@ -273,7 +274,7 @@
else:
log.debug('metadata-returned', metadata=returned_metadata)
for pair in returned_metadata:
- if pair[0] == self.CORE_GROUP_ID:
+ if pair[0] == self.core_group_id_key:
self.core_group_id = pair[1]
- log.debug('received-core-group-id', vcore_group=self.core_group_id)
+ log.debug('core-binding', core_group=self.core_group_id)
returnValue(res)