[ 4460 ] Minor fix when getting invalid voltha instance
Initial commit of the Global Forwarder.
Change-Id: I6c619a8589abaeecba00c004a42beaf063f31448
diff --git a/voltha/core/global_handler.py b/voltha/core/global_handler.py
index 6979de3..7c80826 100644
--- a/voltha/core/global_handler.py
+++ b/voltha/core/global_handler.py
@@ -18,20 +18,23 @@
from twisted.internet.defer import returnValue
from common.utils.grpc_utils import twisted_async
+from common.utils.id_generation import create_cluster_id
from voltha.core.config.config_root import ConfigRoot
from voltha.protos.device_pb2 import PmConfigs, Images
from voltha.protos.voltha_pb2 import \
add_VolthaGlobalServiceServicer_to_server, VolthaLocalServiceStub, \
VolthaGlobalServiceServicer, Voltha, VolthaInstances, VolthaInstance, \
- LogicalDevice, Ports, Flows, FlowGroups, Device, SelfTestResponse
+ LogicalDevice, Ports, Flows, FlowGroups, Device, SelfTestResponse, \
+ VolthaGlobalServiceStub, Devices, DeviceType, DeviceTypes, DeviceGroup, \
+ AlarmFilter, AlarmFilters
from voltha.registry import registry
from google.protobuf.empty_pb2 import Empty
+from dispatcher import DispatchError
log = structlog.get_logger()
class GlobalHandler(VolthaGlobalServiceServicer):
-
def __init__(self, dispatcher, instance_id, **init_kw):
self.dispatcher = dispatcher
self.instance_id = instance_id
@@ -61,539 +64,585 @@
return self.root.get('/', depth=1)
@twisted_async
- @inlineCallbacks
def ListVolthaInstances(self, request, context):
log.info('grpc-request', request=request)
- items = yield registry('coordinator').get_members()
- returnValue(VolthaInstances(items=items))
+ items = self.dispatcher.get_cluster_instances()
+ return VolthaInstances(items=items)
@twisted_async
+ @inlineCallbacks
def GetVolthaInstance(self, request, context):
log.info('grpc-request', request=request)
- instance_id = request.id
- try:
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'GetVolthaInstance',
- Empty(),
- context)
- except KeyError:
- context.set_details(
- 'Voltha instance \'{}\' not found'.format(instance_id))
+ core_id = self.dispatcher.get_core_id_from_instance_id(request.id)
+ if not core_id:
+ log.info('invalid-instance-id', instance=request.id)
+ context.set_details('Voltha Instance error')
context.set_code(StatusCode.NOT_FOUND)
- return VolthaInstance()
+ returnValue(VolthaInstance())
+
+ response = yield self.dispatcher.dispatch('GetVolthaInstance',
+ Empty(),
+ context,
+ core_id=core_id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Voltha Instance error')
+ context.set_code(response.error_code)
+ returnValue(VolthaInstance())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListLogicalDevices(self, request, context):
- log.warning('temp-limited-implementation')
- # TODO dispatching to local instead of collecting all
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'ListLogicalDevices',
- Empty(),
- context)
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('ListLogicalDevices',
+ Empty(),
+ context,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def GetLogicalDevice(self, request, context):
log.info('grpc-request', request=request)
- try:
- instance_id = self.dispatcher.instance_id_by_logical_device_id(
- request.id
- )
- except KeyError:
+ response = yield self.dispatcher.dispatch('GetLogicalDevice',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
context.set_details(
- 'Logical device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return LogicalDevice()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'GetLogicalDevice',
- request,
- context)
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(LogicalDevice())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListLogicalDevicePorts(self, request, context):
log.info('grpc-request', request=request)
- try:
- instance_id = self.dispatcher.instance_id_by_logical_device_id(
- request.id
- )
- except KeyError:
+ response = yield self.dispatcher.dispatch('ListLogicalDevicePorts',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
context.set_details(
- 'Logical device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Ports()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'ListLogicalDevicePorts',
- request,
- context)
+ 'Logical device ports \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Ports())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListLogicalDeviceFlows(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_logical_device_id(
- request.id
- )
- except KeyError:
+ response = yield self.dispatcher.dispatch('ListLogicalDeviceFlows',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
context.set_details(
- 'Logical device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Flows()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'ListLogicalDeviceFlows',
- request,
- context)
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Flows())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def UpdateLogicalDeviceFlowTable(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_logical_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Logical device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Empty()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
+ response = yield self.dispatcher.dispatch(
'UpdateLogicalDeviceFlowTable',
request,
- context)
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details(
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListLogicalDeviceFlowGroups(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_logical_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Logical device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return FlowGroups()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
+ response = yield self.dispatcher.dispatch(
'ListLogicalDeviceFlowGroups',
request,
- context)
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details(
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(FlowGroups())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def UpdateLogicalDeviceFlowGroupTable(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_logical_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Logical device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Empty()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
+ response = yield self.dispatcher.dispatch(
'UpdateLogicalDeviceFlowGroupTable',
request,
- context)
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details(
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListDevices(self, request, context):
- log.warning('temp-limited-implementation')
- # TODO dispatching to local instead of collecting all
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'ListDevices',
- request,
- context)
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('ListDevices',
+ Empty(),
+ context,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def GetDevice(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Device()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'GetDevice',
- request,
- context)
+ response = yield self.dispatcher.dispatch('GetDevice',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Device())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def CreateDevice(self, request, context):
log.info('grpc-request', request=request)
- # TODO dispatching to local instead of passing it to leader
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'CreateDevice',
- request,
- context)
+ response = yield self.dispatcher.dispatch('CreateDevice',
+ request,
+ context)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Create device error')
+ context.set_code(response.error_code)
+ returnValue(Device())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def EnableDevice(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Device()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'EnableDevice',
- request,
- context)
-
+ response = yield self.dispatcher.dispatch('EnableDevice',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Device())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def DisableDevice(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Device()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'DisableDevice',
- request,
- context)
+ response = yield self.dispatcher.dispatch('DisableDevice',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Device())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def RebootDevice(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Device()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'RebootDevice',
- request,
- context)
+ response = yield self.dispatcher.dispatch('RebootDevice',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Device())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def DeleteDevice(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Device()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'DeleteDevice',
- request,
- context)
+ response = yield self.dispatcher.dispatch('DeleteDevice',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(Empty())
@twisted_async
+ @inlineCallbacks
def ListDevicePorts(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Ports()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'ListDevicePorts',
- request,
- context)
+ response = yield self.dispatcher.dispatch('ListDevicePorts',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Ports())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListDevicePmConfigs(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return PmConfigs()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'ListDevicePmConfigs',
- request,
- context)
+ response = yield self.dispatcher.dispatch('ListDevicePmConfigs',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(PmConfigs())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def UpdateDevicePmConfigs(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Empty()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'UpdateDevicePmConfigs',
- request,
- context)
+ response = yield self.dispatcher.dispatch('UpdateDevicePmConfigs',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListDeviceFlows(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Flows()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'ListDeviceFlows',
- request,
- context)
+ response = yield self.dispatcher.dispatch('ListDeviceFlows',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Flows())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListDeviceFlowGroups(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return FlowGroups()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'ListDeviceFlowGroups',
- request,
- context)
+ response = yield self.dispatcher.dispatch('ListDeviceFlowGroups',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(FlowGroups())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListDeviceTypes(self, request, context):
log.info('grpc-request', request=request)
# we always deflect this to the local instance, as we assume
# they all loaded the same adapters, supporting the same device
# types
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'ListDeviceTypes',
- request,
- context)
+ response = yield self.dispatcher.dispatch('ListDeviceTypes',
+ request,
+ context)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device types error')
+ context.set_code(response.error_code)
+ returnValue(DeviceTypes())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def GetDeviceType(self, request, context):
log.info('grpc-request', request=request)
# we always deflect this to the local instance, as we assume
# they all loaded the same adapters, supporting the same device
# types
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'GetDeviceType',
- request,
- context)
+ response = yield self.dispatcher.dispatch('GetDeviceType',
+ request,
+ context)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device type \'{}\' error'.format(
+ request.id))
+ context.set_code(response.error_code)
+ returnValue(DeviceType())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def ListDeviceGroups(self, request, context):
- log.warning('temp-limited-implementation')
- # TODO dispatching to local instead of collecting all
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'ListDeviceGroups',
- Empty(),
- context)
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('ListDeviceGroups',
+ Empty(),
+ context,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def GetDeviceGroup(self, request, context):
- log.warning('temp-limited-implementation')
- # TODO dispatching to local instead of collecting all
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'GetDeviceGroup',
- request,
- context)
-
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('GetDeviceGroup',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device group\'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(DeviceGroup())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def CreateAlarmFilter(self, request, context):
log.info('grpc-request', request=request)
- # TODO dispatching to local instead of passing it to leader
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'CreateAlarmFilter',
- request,
- context)
+ # Since AlarmFilter applies to the entire cluster, it will be assigned
+ # a global id (using a global core_id). Every Voltha instance will
+ # have the same data. Since the voltha instances are managed by
+ # docker swarm mode then whenever an instance goes down it will be
+ # brought up right away, hence reducing the chance of two instances
+ # having different data. In future phases, we should adopt the
+ # strategy of having a unique persistence model for cluster data
+ # compare to instance data
+ try:
+ assert isinstance(request, AlarmFilter)
+ request.id = create_cluster_id()
+ except AssertionError, e:
+ context.set_details(e.message)
+ context.set_code(StatusCode.INVALID_ARGUMENT)
+ returnValue(AlarmFilter())
+
+ response = yield self.dispatcher.dispatch('CreateAlarmFilter',
+ request,
+ context,
+ id=request.id,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Create alarm error')
+ context.set_code(response.error_code)
+ returnValue(AlarmFilter())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def GetAlarmFilter(self, request, context):
- log.warning('temp-limited-implementation')
- # TODO dispatching to local instead of collecting all
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'GetAlarmFilter',
- request,
- context)
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('GetAlarmFilter',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Alarm filter\'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(AlarmFilter())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def UpdateAlarmFilter(self, request, context):
log.info('grpc-request', request=request)
- # TODO dispatching to local instead of passing it to leader
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'UpdateAlarmFilter',
- request,
- context)
+ response = yield self.dispatcher.dispatch('UpdateAlarmFilter',
+ request,
+ context,
+ id=request.id,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Alarm filter\'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(AlarmFilter())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def DeleteAlarmFilter(self, request, context):
log.info('grpc-request', request=request)
- # TODO dispatching to local instead of passing it to leader
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'DeleteAlarmFilter',
- request,
- context)
+ response = yield self.dispatcher.dispatch('DeleteAlarmFilter',
+ request,
+ context,
+ id=request.id,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Alarm filter\'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(Empty())
@twisted_async
+ @inlineCallbacks
def ListAlarmFilters(self, request, context):
- log.warning('temp-limited-implementation')
- # TODO dispatching to local instead of collecting all
- return self.dispatcher.dispatch(
- self.instance_id,
- VolthaLocalServiceStub,
- 'ListAlarmFilters',
- Empty(),
- context)
+ log.info('grpc-request', request=request)
+ response = yield self.dispatcher.dispatch('ListAlarmFilters',
+ Empty(),
+ context,
+ broadcast=True)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Alarm filters error')
+ context.set_code(response.error_code)
+ returnValue(AlarmFilter())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def GetImages(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(request.id)
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return Images()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'GetImages',
- request,
- context)
+ response = yield self.dispatcher.dispatch('GetImages',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Images())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
@twisted_async
+ @inlineCallbacks
def SelfTest(self, request, context):
log.info('grpc-request', request=request)
-
- try:
- instance_id = self.dispatcher.instance_id_by_device_id(
- request.id
- )
- except KeyError:
- context.set_details(
- 'Device \'{}\' not found'.format(request.id))
- context.set_code(StatusCode.NOT_FOUND)
- return SelfTestResponse()
-
- return self.dispatcher.dispatch(
- instance_id,
- VolthaLocalServiceStub,
- 'SelfTest',
- request,
- context)
+ response = yield self.dispatcher.dispatch('SelfTest',
+ request,
+ context,
+ id=request.id)
+ log.info('grpc-response', response=response)
+ if isinstance(response, DispatchError):
+ log.info('grpc-error-response', error=response.error_code)
+ context.set_details('Device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(SelfTestResponse())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)