- Updated all OFAgent RPCs destined toward a Voltha core to contain
a metadata field that specifies which Voltha core grouping should
service the request.
- Added transaction handler code to all Voltha core APIs
that modify the model.
Change-Id: I8dafc95f0a1b33d99409d73ee00d8294f09a2782
diff --git a/python/ofagent/grpc_client.py b/python/ofagent/grpc_client.py
index 5a05696..b58612f 100755
--- a/python/ofagent/grpc_client.py
+++ b/python/ofagent/grpc_client.py
@@ -45,6 +45,12 @@
self.grpc_timeout = grpc_timeout
self.local_stub = VolthaServiceStub(channel)
+ # This is the vcore group to which an OFAgent is bound.
+ # It is the affinity router that forwards all OFAgent
+ # requests to the primary vcore in the group.
+ self.core_group_id = ''
+ self.CORE_GROUP_ID = 'voltha_backend_name'
+
self.stopped = False
self.packet_out_queue = Queue() # queue to send out PacketOut msgs
@@ -81,7 +87,8 @@
def stream_packets_out():
generator = packet_generator()
try:
- self.local_stub.StreamPacketsOut(generator)
+ self.local_stub.StreamPacketsOut(generator,
+ metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
except _Rendezvous, e:
log.error('grpc-exception', status=e.code())
if e.code() == StatusCode.UNAVAILABLE:
@@ -156,14 +163,16 @@
def get_port(self, device_id, port_id):
req = LogicalPortId(id=device_id, port_id=port_id)
res = yield threads.deferToThread(
- self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout)
+ self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
+ metadata=((self.CORE_GROUP_ID, self.core_group_id),))
returnValue(res)
@inlineCallbacks
def get_port_list(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
- self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout)
+ self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
+ metadata=((self.CORE_GROUP_ID, self.core_group_id),))
returnValue(res.items)
@inlineCallbacks
@@ -173,7 +182,8 @@
port_id=port_id
)
res = yield threads.deferToThread(
- self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout)
+ self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
+ metadata=((self.CORE_GROUP_ID, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -183,14 +193,16 @@
port_id=port_id
)
res = yield threads.deferToThread(
- self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout)
+ self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
+ metadata=((self.CORE_GROUP_ID, self.core_group_id),))
returnValue(res)
@inlineCallbacks
def get_device_info(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
- self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout)
+ self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
+ metadata=((self.CORE_GROUP_ID, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -200,7 +212,8 @@
flow_mod=flow_mod
)
res = yield threads.deferToThread(
- self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout)
+ self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
+ metadata=((self.CORE_GROUP_ID, self.core_group_id),))
returnValue(res)
@inlineCallbacks
@@ -210,38 +223,55 @@
group_mod=group_mod
)
res = yield threads.deferToThread(
- self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout)
+ self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
+ metadata=((self.CORE_GROUP_ID, self.core_group_id),))
returnValue(res)
@inlineCallbacks
def list_flows(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
- self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout)
+ self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
+ metadata=((self.CORE_GROUP_ID, self.core_group_id),))
returnValue(res.items)
@inlineCallbacks
def list_groups(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
- self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout)
+ self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
+ metadata=((self.CORE_GROUP_ID, self.core_group_id),))
returnValue(res.items)
@inlineCallbacks
def list_ports(self, device_id):
req = ID(id=device_id)
res = yield threads.deferToThread(
- self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout)
+ self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
+ metadata=((self.CORE_GROUP_ID, self.core_group_id),))
returnValue(res.items)
@inlineCallbacks
def list_logical_devices(self):
res = yield threads.deferToThread(
- self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout)
+ self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
+ metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
returnValue(res.items)
@inlineCallbacks
def subscribe(self, subscriber):
- res = yield threads.deferToThread(
- self.local_stub.Subscribe, subscriber, timeout=self.grpc_timeout)
+ res, call = yield threads.deferToThread(
+ self.local_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
+ metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
+ returned_metadata = call.initial_metadata()
+
+ # Update the core_group_id if present in the returned metadata
+ if returned_metadata is None:
+ log.debug('header-metadata-missing')
+ else:
+ log.debug('metadata-returned', metadata=returned_metadata)
+ for pair in returned_metadata:
+ if pair[0] == self.CORE_GROUP_ID:
+ self.core_group_id = pair[1]
+ log.debug('received-core-group-id', vcore_group=self.core_group_id)
returnValue(res)