# blob: 66c509962761c937e4808bb9e60c83dabc35a248 [file] [log] [blame]
# Copyright 2020-present Open Networking Foundation
# Original copyright 2020-present ADTRAN, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from robot.api.deco import keyword
from grpc_robot.services.service import is_connected
from grpc_robot.services.service import Service
from voltha_protos import voltha_pb2_grpc, voltha_pb2, common_pb2, openflow_13_pb2
class VolthaService(Service):
    # Robot Framework keywords for the RPCs of the VOLTHA ``VolthaService``
    # gRPC service.
    #
    # Every keyword below is decorated with ``@keyword`` and ``@is_connected``
    # and forwards to ``self._grpc_helper`` (not defined in this class;
    # presumably inherited from ``Service``).  The helper always receives the
    # stub method; for RPCs that take a request message it additionally
    # receives the protobuf request class and the caller-supplied
    # ``param_dict``.  Extra ``**kwargs`` are passed through untouched and the
    # helper's result is returned as-is.
    #
    # The "RPC ..." comment on each keyword gives the request and response
    # message of the wrapped call as "<request> -> <response>".

    # Matches the leading part of every keyword method name in this class;
    # how the base class consumes it is not visible from here.
    prefix = 'voltha_service_'

    def __init__(self, ctx):
        # Hand the context and the generated stub class to the base class.
        super().__init__(
            ctx=ctx,
            stub=voltha_pb2_grpc.VolthaServiceStub,
        )

    # RPC GetMembership: google.protobuf.Empty -> Membership
    @keyword
    @is_connected
    def voltha_service_get_membership(self, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetMembership, **kwargs)

    # RPC UpdateMembership: Membership -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_update_membership(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.UpdateMembership, voltha_pb2.Membership, param_dict, **kwargs)

    # RPC GetVoltha: google.protobuf.Empty -> Voltha
    @keyword
    @is_connected
    def voltha_service_get_voltha(self, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetVoltha, **kwargs)

    # RPC ListCoreInstances: google.protobuf.Empty -> CoreInstances
    @keyword
    @is_connected
    def voltha_service_list_core_instances(self, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListCoreInstances, **kwargs)

    # RPC GetCoreInstance: common.ID -> CoreInstance
    @keyword
    @is_connected
    def voltha_service_get_core_instance(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetCoreInstance, common_pb2.ID, param_dict, **kwargs)

    # RPC ListAdapters: google.protobuf.Empty -> Adapters
    @keyword
    @is_connected
    def voltha_service_list_adapters(self, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListAdapters, **kwargs)

    # RPC ListLogicalDevices: google.protobuf.Empty -> LogicalDevices
    @keyword
    @is_connected
    def voltha_service_list_logical_devices(self, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListLogicalDevices, **kwargs)

    # RPC GetLogicalDevice: common.ID -> LogicalDevice
    @keyword
    @is_connected
    def voltha_service_get_logical_device(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetLogicalDevice, common_pb2.ID, param_dict, **kwargs)

    # RPC ListLogicalDevicePorts: common.ID -> LogicalPorts
    @keyword
    @is_connected
    def voltha_service_list_logical_device_ports(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListLogicalDevicePorts, common_pb2.ID, param_dict, **kwargs)

    # RPC GetLogicalDevicePort: LogicalPortId -> LogicalPort
    @keyword
    @is_connected
    def voltha_service_get_logical_device_port(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetLogicalDevicePort, voltha_pb2.LogicalPortId, param_dict, **kwargs)

    # RPC EnableLogicalDevicePort: LogicalPortId -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_enable_logical_device_port(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.EnableLogicalDevicePort, voltha_pb2.LogicalPortId, param_dict, **kwargs)

    # RPC DisableLogicalDevicePort: LogicalPortId -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_disable_logical_device_port(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.DisableLogicalDevicePort, voltha_pb2.LogicalPortId, param_dict, **kwargs)

    # RPC ListLogicalDeviceFlows: common.ID -> openflow_13.Flows
    @keyword
    @is_connected
    def voltha_service_list_logical_device_flows(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListLogicalDeviceFlows, common_pb2.ID, param_dict, **kwargs)

    # RPC UpdateLogicalDeviceFlowTable: openflow_13.FlowTableUpdate -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_update_logical_device_flow_table(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.UpdateLogicalDeviceFlowTable, openflow_13_pb2.FlowTableUpdate, param_dict, **kwargs)

    # RPC UpdateLogicalDeviceMeterTable: openflow_13.MeterModUpdate -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_update_logical_device_meter_table(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.UpdateLogicalDeviceMeterTable, openflow_13_pb2.MeterModUpdate, param_dict, **kwargs)

    # RPC ListLogicalDeviceMeters: common.ID -> openflow_13.Meters
    @keyword
    @is_connected
    def voltha_service_list_logical_device_meters(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListLogicalDeviceMeters, common_pb2.ID, param_dict, **kwargs)

    # RPC ListLogicalDeviceFlowGroups: common.ID -> openflow_13.FlowGroups
    @keyword
    @is_connected
    def voltha_service_list_logical_device_flow_groups(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListLogicalDeviceFlowGroups, common_pb2.ID, param_dict, **kwargs)

    # RPC UpdateLogicalDeviceFlowGroupTable: openflow_13.FlowGroupTableUpdate -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_update_logical_device_flow_group_table(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.UpdateLogicalDeviceFlowGroupTable, openflow_13_pb2.FlowGroupTableUpdate, param_dict, **kwargs)

    # RPC ListDevices: google.protobuf.Empty -> Devices
    @keyword
    @is_connected
    def voltha_service_list_devices(self, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListDevices, **kwargs)

    # RPC ListDeviceIds: google.protobuf.Empty -> common.IDs
    @keyword
    @is_connected
    def voltha_service_list_device_ids(self, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListDeviceIds, **kwargs)

    # RPC ReconcileDevices: common.IDs -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_reconcile_devices(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ReconcileDevices, common_pb2.IDs, param_dict, **kwargs)

    # RPC GetDevice: common.ID -> Device
    @keyword
    @is_connected
    def voltha_service_get_device(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetDevice, common_pb2.ID, param_dict, **kwargs)

    # RPC CreateDevice: Device -> Device
    @keyword
    @is_connected
    def voltha_service_create_device(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.CreateDevice, voltha_pb2.Device, param_dict, **kwargs)

    # RPC EnableDevice: common.ID -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_enable_device(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.EnableDevice, common_pb2.ID, param_dict, **kwargs)

    # RPC DisableDevice: common.ID -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_disable_device(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.DisableDevice, common_pb2.ID, param_dict, **kwargs)

    # RPC RebootDevice: common.ID -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_reboot_device(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.RebootDevice, common_pb2.ID, param_dict, **kwargs)

    # RPC DeleteDevice: common.ID -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_delete_device(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.DeleteDevice, common_pb2.ID, param_dict, **kwargs)

    # RPC ForceDeleteDevice: common.ID -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_force_delete_device(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ForceDeleteDevice, common_pb2.ID, param_dict, **kwargs)

    # RPC DownloadImage: ImageDownload -> common.OperationResp
    @keyword
    @is_connected
    def voltha_service_download_image(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.DownloadImage, voltha_pb2.ImageDownload, param_dict, **kwargs)

    # RPC GetImageDownloadStatus: ImageDownload -> ImageDownload
    @keyword
    @is_connected
    def voltha_service_get_image_download_status(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetImageDownloadStatus, voltha_pb2.ImageDownload, param_dict, **kwargs)

    # RPC GetImageDownload: ImageDownload -> ImageDownload
    @keyword
    @is_connected
    def voltha_service_get_image_download(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetImageDownload, voltha_pb2.ImageDownload, param_dict, **kwargs)

    # RPC ListImageDownloads: common.ID -> ImageDownloads
    @keyword
    @is_connected
    def voltha_service_list_image_downloads(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListImageDownloads, common_pb2.ID, param_dict, **kwargs)

    # RPC CancelImageDownload: ImageDownload -> common.OperationResp
    @keyword
    @is_connected
    def voltha_service_cancel_image_download(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.CancelImageDownload, voltha_pb2.ImageDownload, param_dict, **kwargs)

    # RPC ActivateImageUpdate: ImageDownload -> common.OperationResp
    @keyword
    @is_connected
    def voltha_service_activate_image_update(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ActivateImageUpdate, voltha_pb2.ImageDownload, param_dict, **kwargs)

    # RPC RevertImageUpdate: ImageDownload -> common.OperationResp
    @keyword
    @is_connected
    def voltha_service_revert_image_update(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.RevertImageUpdate, voltha_pb2.ImageDownload, param_dict, **kwargs)

    # RPC ListDevicePorts: common.ID -> Ports
    @keyword
    @is_connected
    def voltha_service_list_device_ports(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListDevicePorts, common_pb2.ID, param_dict, **kwargs)

    # RPC ListDevicePmConfigs: common.ID -> PmConfigs
    @keyword
    @is_connected
    def voltha_service_list_device_pm_configs(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListDevicePmConfigs, common_pb2.ID, param_dict, **kwargs)

    # RPC UpdateDevicePmConfigs: voltha.PmConfigs -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_update_device_pm_configs(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.UpdateDevicePmConfigs, voltha_pb2.PmConfigs, param_dict, **kwargs)

    # RPC ListDeviceFlows: common.ID -> openflow_13.Flows
    @keyword
    @is_connected
    def voltha_service_list_device_flows(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListDeviceFlows, common_pb2.ID, param_dict, **kwargs)

    # RPC ListDeviceFlowGroups: common.ID -> openflow_13.FlowGroups
    @keyword
    @is_connected
    def voltha_service_list_device_flow_groups(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListDeviceFlowGroups, common_pb2.ID, param_dict, **kwargs)

    # RPC ListDeviceTypes: google.protobuf.Empty -> DeviceTypes
    @keyword
    @is_connected
    def voltha_service_list_device_types(self, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListDeviceTypes, **kwargs)

    # RPC GetDeviceType: common.ID -> DeviceType
    @keyword
    @is_connected
    def voltha_service_get_device_type(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetDeviceType, common_pb2.ID, param_dict, **kwargs)

    # RPC ListDeviceGroups: google.protobuf.Empty -> DeviceGroups
    @keyword
    @is_connected
    def voltha_service_list_device_groups(self, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListDeviceGroups, **kwargs)

    # RPC StreamPacketsOut: stream openflow_13.PacketOut -> google.protobuf.Empty
    # NOTE(review): this is a client-streaming RPC, yet it is invoked exactly
    # like the unary calls (one request class plus one param_dict) -- confirm
    # that _grpc_helper builds the request iterator the stub expects.
    @keyword
    @is_connected
    def voltha_service_stream_packets_out(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.StreamPacketsOut, openflow_13_pb2.PacketOut, param_dict, **kwargs)

    # RPC ReceivePacketsIn: google.protobuf.Empty -> stream openflow_13.PacketIn
    # Server-streaming; what is returned for the stream is decided by
    # _grpc_helper, which is not visible here.
    @keyword
    @is_connected
    def voltha_service_receive_packets_in(self, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ReceivePacketsIn, **kwargs)

    # RPC ReceiveChangeEvents: google.protobuf.Empty -> stream openflow_13.ChangeEvent
    # Server-streaming; what is returned for the stream is decided by
    # _grpc_helper, which is not visible here.
    @keyword
    @is_connected
    def voltha_service_receive_change_events(self, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ReceiveChangeEvents, **kwargs)

    # RPC GetDeviceGroup: common.ID -> DeviceGroup
    @keyword
    @is_connected
    def voltha_service_get_device_group(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetDeviceGroup, common_pb2.ID, param_dict, **kwargs)

    # RPC CreateEventFilter: EventFilter -> EventFilter
    @keyword
    @is_connected
    def voltha_service_create_event_filter(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.CreateEventFilter, voltha_pb2.EventFilter, param_dict, **kwargs)

    # RPC GetEventFilter: common.ID -> EventFilters
    @keyword
    @is_connected
    def voltha_service_get_event_filter(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetEventFilter, common_pb2.ID, param_dict, **kwargs)

    # RPC UpdateEventFilter: EventFilter -> EventFilter
    @keyword
    @is_connected
    def voltha_service_update_event_filter(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.UpdateEventFilter, voltha_pb2.EventFilter, param_dict, **kwargs)

    # RPC DeleteEventFilter: EventFilter -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_delete_event_filter(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.DeleteEventFilter, voltha_pb2.EventFilter, param_dict, **kwargs)

    # RPC ListEventFilters: google.protobuf.Empty -> EventFilters
    @keyword
    @is_connected
    def voltha_service_list_event_filters(self, **kwargs):
        call = self._grpc_helper
        return call(self.stub.ListEventFilters, **kwargs)

    # RPC GetImages: common.ID -> Images
    @keyword
    @is_connected
    def voltha_service_get_images(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetImages, common_pb2.ID, param_dict, **kwargs)

    # RPC SelfTest: common.ID -> SelfTestResponse
    @keyword
    @is_connected
    def voltha_service_self_test(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.SelfTest, common_pb2.ID, param_dict, **kwargs)

    # RPC GetMibDeviceData: common.ID -> omci.MibDeviceData
    @keyword
    @is_connected
    def voltha_service_get_mib_device_data(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetMibDeviceData, common_pb2.ID, param_dict, **kwargs)

    # RPC GetAlarmDeviceData: common.ID -> omci.AlarmDeviceData
    @keyword
    @is_connected
    def voltha_service_get_alarm_device_data(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetAlarmDeviceData, common_pb2.ID, param_dict, **kwargs)

    # RPC SimulateAlarm: SimulateAlarmRequest -> common.OperationResp
    @keyword
    @is_connected
    def voltha_service_simulate_alarm(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.SimulateAlarm, voltha_pb2.SimulateAlarmRequest, param_dict, **kwargs)

    # RPC Subscribe: OfAgentSubscriber -> OfAgentSubscriber
    @keyword
    @is_connected
    def voltha_service_subscribe(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.Subscribe, voltha_pb2.OfAgentSubscriber, param_dict, **kwargs)

    # RPC EnablePort: voltha.Port -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_enable_port(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.EnablePort, voltha_pb2.Port, param_dict, **kwargs)

    # RPC DisablePort: voltha.Port -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_disable_port(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.DisablePort, voltha_pb2.Port, param_dict, **kwargs)

    # RPC GetExtValue: common.ValueSpecifier -> common.ReturnValues
    @keyword
    @is_connected
    def voltha_service_get_ext_value(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.GetExtValue, common_pb2.ValueSpecifier, param_dict, **kwargs)

    # RPC SetExtValue: ValueSet -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_set_ext_value(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.SetExtValue, voltha_pb2.ValueSet, param_dict, **kwargs)

    # RPC StartOmciTestAction: OmciTestRequest -> TestResponse
    @keyword
    @is_connected
    def voltha_service_start_omci_test_action(self, param_dict, **kwargs):
        call = self._grpc_helper
        return call(self.stub.StartOmciTestAction, voltha_pb2.OmciTestRequest, param_dict, **kwargs)