blob: 2b6a26c897a953e6502ef81133f7fbb5c6a6e7e8 [file] [log] [blame]
# Copyright 2020-present Open Networking Foundation
# Original copyright 2020-present ADTRAN, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
from robot.api.deco import keyword
from grpc_robot.services.service import is_connected
from grpc_robot.services.service import Service
from voltha_protos import openolt_pb2_grpc, openolt_pb2, tech_profile_pb2, ext_config_pb2
class Openolt(Service):
prefix = 'open_olt_'
def __init__(self, ctx):
super().__init__(ctx=ctx, stub=openolt_pb2_grpc.OpenoltStub)
# rpc DisableOlt(Empty) returns (Empty) {...};
@keyword
@is_connected
def open_olt_disable_olt(self, **kwargs):
return self._grpc_helper(self.stub.DisableOlt, **kwargs)
# rpc ReenableOlt(Empty) returns (Empty) {...};
@keyword
@is_connected
def open_olt_reenable_olt(self, **kwargs):
return self._grpc_helper(self.stub.ReenableOlt, **kwargs)
# rpc ActivateOnu(Onu) returns (Empty) {...};
@keyword
@is_connected
def open_olt_activate_onu(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.ActivateOnu, openolt_pb2.Onu, param_dict, **kwargs)
# rpc DeactivateOnu(Onu) returns (Empty) {...};
@keyword
@is_connected
def open_olt_deactivate_onu(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.DeactivateOnu, openolt_pb2.Onu, param_dict, **kwargs)
# rpc DeleteOnu(Onu) returns (Empty) {...};
@keyword
@is_connected
def open_olt_delete_onu(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.DeleteOnu, openolt_pb2.Onu, param_dict, **kwargs)
# rpc OmciMsgOut(OmciMsg) returns (Empty) {...};
@keyword
@is_connected
def open_olt_omci_msg_out(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.OmciMsgOut, openolt_pb2.OmciMsg, param_dict, **kwargs)
# rpc OnuPacketOut(OnuPacket) returns (Empty) {...};
@keyword
@is_connected
def open_olt_onu_packet_out(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.OnuPacketOut, openolt_pb2.OnuPacket, param_dict, **kwargs)
# rpc UplinkPacketOut(UplinkPacket) returns (Empty) {...};
@keyword
@is_connected
def open_olt_uplink_packet_out(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.UplinkPacketOut, openolt_pb2.UplinkPacket, param_dict, **kwargs)
# rpc FlowAdd(Flow) returns (Empty) {...};
@keyword
@is_connected
def open_olt_flow_add(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.FlowAdd, openolt_pb2.Flow, param_dict, **kwargs)
# rpc FlowRemove(Flow) returns (Empty) {...};
@keyword
@is_connected
def open_olt_flow_remove(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.FlowRemove, openolt_pb2.Flow, param_dict, **kwargs)
# rpc HeartbeatCheck(Empty) returns (Heartbeat) {...};
@keyword
@is_connected
def open_olt_heartbeat_check(self, **kwargs):
return self._grpc_helper(self.stub.HeartbeatCheck, **kwargs)
# rpc EnablePonIf(Interface) returns (Empty) {...};
@keyword
@is_connected
def open_olt_enable_pon_if(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.EnablePonIf, openolt_pb2.Interface, param_dict, **kwargs)
# rpc DisablePonIf(Interface) returns (Empty) {...};
@keyword
@is_connected
def open_olt_disable_pon_if(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.DisablePonIf, openolt_pb2.Interface, param_dict, **kwargs)
# rpc GetDeviceInfo(Empty) returns (DeviceInfo) {...};
@keyword
@is_connected
def open_olt_get_device_info(self, **kwargs):
return self._grpc_helper(self.stub.GetDeviceInfo, **kwargs)
# rpc Reboot(Empty) returns (Empty) {...};
@keyword
@is_connected
def open_olt_reboot(self, **kwargs):
return self._grpc_helper(self.stub.Reboot, **kwargs)
# rpc CollectStatistics(Empty) returns (Empty) {...};
@keyword
@is_connected
def open_olt_collect_statistics(self, **kwargs):
return self._grpc_helper(self.stub.CollectStatistics, **kwargs)
# rpc GetOnuStatistics(Onu) returns (OnuStatistics) {...};
@keyword
@is_connected
def open_olt_get_onu_statistics(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.GetOnuStatistics, openolt_pb2.Onu, param_dict, **kwargs)
# rpc GetGemPortStatistics(OnuPacket) returns (GemPortStatistics) {...};
@keyword
@is_connected
def open_olt_get_gem_port_statistics(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.GetGemPortStatistics, openolt_pb2.OnuPacket, param_dict, **kwargs)
# rpc CreateTrafficSchedulers(tech_profile.TrafficSchedulers) returns (Empty) {...};
@keyword
@is_connected
def open_olt_create_traffic_schedulers(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.CreateTrafficSchedulers, tech_profile_pb2.TrafficSchedulers, param_dict, **kwargs)
# rpc RemoveTrafficSchedulers(tech_profile.TrafficSchedulers) returns (Empty) {...};
@keyword
@is_connected
def open_olt_remove_traffic_schedulers(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.RemoveTrafficSchedulers, tech_profile_pb2.TrafficSchedulers, param_dict, **kwargs)
# rpc CreateTrafficQueues(tech_profile.TrafficQueues) returns (Empty) {...};
@keyword
@is_connected
def open_olt_create_traffic_queues(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.CreateTrafficQueues, tech_profile_pb2.TrafficQueues, param_dict, **kwargs)
# rpc RemoveTrafficQueues(tech_profile.TrafficQueues) returns (Empty) {...};
@keyword
@is_connected
def open_olt_remove_traffic_queues(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.RemoveTrafficQueues, tech_profile_pb2.TrafficQueues, param_dict, **kwargs)
# rpc EnableIndication(Empty) returns (stream Indication) {...};
@keyword
@is_connected
def open_olt_enable_indication(self, **kwargs):
return self._grpc_helper(self.stub.EnableIndication, **kwargs)
# rpc PerformGroupOperation(Group) returns (Empty) {...};
@keyword
@is_connected
def open_olt_perform_group_operation(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.PerformGroupOperation, openolt_pb2.Group, param_dict, **kwargs)
# rpc DeleteGroup(Group) returns (Empty) {...};
@keyword
@is_connected
def open_olt_delete_group(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.DeleteGroup, openolt_pb2.Group, param_dict, **kwargs)
# rpc GetExtValue(ValueParam) returns (common.ReturnValues) {...};
@keyword
@is_connected
def open_olt_get_ext_value(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.GetExtValue, openolt_pb2.ValueParam, param_dict, **kwargs)
# rpc OnuItuPonAlarmSet(config.OnuItuPonAlarm) returns (Empty) {...};
@keyword
@is_connected
def open_olt_onu_itu_pon_alarm_set(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.OnuItuPonAlarmSet, ext_config_pb2.OnuItuPonAlarm, param_dict, **kwargs)
# rpc GetLogicalOnuDistanceZero(Onu) returns (OnuLogicalDistance) {...};
@keyword
@is_connected
def open_olt_get_logical_onu_distance_zero(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.GetLogicalOnuDistanceZero, openolt_pb2.Onu, param_dict, **kwargs)
# rpc GetLogicalOnuDistance(Onu) returns (OnuLogicalDistance) {...};
@keyword
@is_connected
def open_olt_get_logical_onu_distance(self, param_dict, **kwargs):
return self._grpc_helper(self.stub.GetLogicalOnuDistanceF, openolt_pb2.Onu, param_dict, **kwargs)