blob: 2b6a26c897a953e6502ef81133f7fbb5c6a6e7e8 [file] [log] [blame]
# Copyright 2020-present Open Networking Foundation
# Original copyright 2020-present ADTRAN, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14from robot.api.deco import keyword
15from grpc_robot.services.service import is_connected
16
17from grpc_robot.services.service import Service
18from voltha_protos import openolt_pb2_grpc, openolt_pb2, tech_profile_pb2, ext_config_pb2
19
20
class Openolt(Service):
    """Robot Framework keywords wrapping the RPCs of the Openolt gRPC service.

    Every keyword forwards to ``self._grpc_helper`` with the matching stub
    method.  Keywords whose RPC takes a non-``Empty`` request additionally
    pass the protobuf request class and the caller-supplied ``param_dict``.
    Any extra ``**kwargs`` are forwarded to ``_grpc_helper`` unchanged.

    How ``_grpc_helper`` builds the request from ``param_dict`` and what it
    returns is defined by ``Service`` and is not visible in this module.
    """

    # Common prefix of all keyword method names in this class.
    # NOTE(review): presumably consumed by ``Service`` when exposing the
    # keywords -- confirm against the base class.
    prefix = 'open_olt_'

    def __init__(self, ctx):
        """Initialise the service with ``ctx`` and the ``OpenoltStub`` class."""
        super().__init__(ctx=ctx, stub=openolt_pb2_grpc.OpenoltStub)

    # rpc DisableOlt(Empty) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_disable_olt(self, **kwargs):
        """Call rpc ``DisableOlt``; no request parameters are passed."""
        return self._grpc_helper(self.stub.DisableOlt, **kwargs)

    # rpc ReenableOlt(Empty) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_reenable_olt(self, **kwargs):
        """Call rpc ``ReenableOlt``; no request parameters are passed."""
        return self._grpc_helper(self.stub.ReenableOlt, **kwargs)

    # rpc ActivateOnu(Onu) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_activate_onu(self, param_dict, **kwargs):
        """Call rpc ``ActivateOnu`` with ``param_dict`` and ``openolt_pb2.Onu``."""
        return self._grpc_helper(self.stub.ActivateOnu, openolt_pb2.Onu, param_dict, **kwargs)

    # rpc DeactivateOnu(Onu) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_deactivate_onu(self, param_dict, **kwargs):
        """Call rpc ``DeactivateOnu`` with ``param_dict`` and ``openolt_pb2.Onu``."""
        return self._grpc_helper(self.stub.DeactivateOnu, openolt_pb2.Onu, param_dict, **kwargs)

    # rpc DeleteOnu(Onu) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_delete_onu(self, param_dict, **kwargs):
        """Call rpc ``DeleteOnu`` with ``param_dict`` and ``openolt_pb2.Onu``."""
        return self._grpc_helper(self.stub.DeleteOnu, openolt_pb2.Onu, param_dict, **kwargs)

    # rpc OmciMsgOut(OmciMsg) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_omci_msg_out(self, param_dict, **kwargs):
        """Call rpc ``OmciMsgOut`` with ``param_dict`` and ``openolt_pb2.OmciMsg``."""
        return self._grpc_helper(self.stub.OmciMsgOut, openolt_pb2.OmciMsg, param_dict, **kwargs)

    # rpc OnuPacketOut(OnuPacket) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_onu_packet_out(self, param_dict, **kwargs):
        """Call rpc ``OnuPacketOut`` with ``param_dict`` and ``openolt_pb2.OnuPacket``."""
        return self._grpc_helper(
            self.stub.OnuPacketOut, openolt_pb2.OnuPacket, param_dict, **kwargs
        )

    # rpc UplinkPacketOut(UplinkPacket) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_uplink_packet_out(self, param_dict, **kwargs):
        """Call rpc ``UplinkPacketOut`` with ``param_dict`` and ``openolt_pb2.UplinkPacket``."""
        return self._grpc_helper(
            self.stub.UplinkPacketOut, openolt_pb2.UplinkPacket, param_dict, **kwargs
        )

    # rpc FlowAdd(Flow) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_flow_add(self, param_dict, **kwargs):
        """Call rpc ``FlowAdd`` with ``param_dict`` and ``openolt_pb2.Flow``."""
        return self._grpc_helper(self.stub.FlowAdd, openolt_pb2.Flow, param_dict, **kwargs)

    # rpc FlowRemove(Flow) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_flow_remove(self, param_dict, **kwargs):
        """Call rpc ``FlowRemove`` with ``param_dict`` and ``openolt_pb2.Flow``."""
        return self._grpc_helper(self.stub.FlowRemove, openolt_pb2.Flow, param_dict, **kwargs)

    # rpc HeartbeatCheck(Empty) returns (Heartbeat) {...};
    @keyword
    @is_connected
    def open_olt_heartbeat_check(self, **kwargs):
        """Call rpc ``HeartbeatCheck``; no request parameters are passed."""
        return self._grpc_helper(self.stub.HeartbeatCheck, **kwargs)

    # rpc EnablePonIf(Interface) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_enable_pon_if(self, param_dict, **kwargs):
        """Call rpc ``EnablePonIf`` with ``param_dict`` and ``openolt_pb2.Interface``."""
        return self._grpc_helper(
            self.stub.EnablePonIf, openolt_pb2.Interface, param_dict, **kwargs
        )

    # rpc DisablePonIf(Interface) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_disable_pon_if(self, param_dict, **kwargs):
        """Call rpc ``DisablePonIf`` with ``param_dict`` and ``openolt_pb2.Interface``."""
        return self._grpc_helper(
            self.stub.DisablePonIf, openolt_pb2.Interface, param_dict, **kwargs
        )

    # rpc GetDeviceInfo(Empty) returns (DeviceInfo) {...};
    @keyword
    @is_connected
    def open_olt_get_device_info(self, **kwargs):
        """Call rpc ``GetDeviceInfo``; no request parameters are passed."""
        return self._grpc_helper(self.stub.GetDeviceInfo, **kwargs)

    # rpc Reboot(Empty) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_reboot(self, **kwargs):
        """Call rpc ``Reboot``; no request parameters are passed."""
        return self._grpc_helper(self.stub.Reboot, **kwargs)

    # rpc CollectStatistics(Empty) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_collect_statistics(self, **kwargs):
        """Call rpc ``CollectStatistics``; no request parameters are passed."""
        return self._grpc_helper(self.stub.CollectStatistics, **kwargs)

    # rpc GetOnuStatistics(Onu) returns (OnuStatistics) {...};
    @keyword
    @is_connected
    def open_olt_get_onu_statistics(self, param_dict, **kwargs):
        """Call rpc ``GetOnuStatistics`` with ``param_dict`` and ``openolt_pb2.Onu``."""
        return self._grpc_helper(
            self.stub.GetOnuStatistics, openolt_pb2.Onu, param_dict, **kwargs
        )

    # rpc GetGemPortStatistics(OnuPacket) returns (GemPortStatistics) {...};
    @keyword
    @is_connected
    def open_olt_get_gem_port_statistics(self, param_dict, **kwargs):
        """Call rpc ``GetGemPortStatistics`` with ``param_dict`` and ``openolt_pb2.OnuPacket``."""
        return self._grpc_helper(
            self.stub.GetGemPortStatistics, openolt_pb2.OnuPacket, param_dict, **kwargs
        )

    # rpc CreateTrafficSchedulers(tech_profile.TrafficSchedulers) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_create_traffic_schedulers(self, param_dict, **kwargs):
        """Call rpc ``CreateTrafficSchedulers`` with ``param_dict`` and
        ``tech_profile_pb2.TrafficSchedulers``."""
        return self._grpc_helper(
            self.stub.CreateTrafficSchedulers,
            tech_profile_pb2.TrafficSchedulers,
            param_dict,
            **kwargs
        )

    # rpc RemoveTrafficSchedulers(tech_profile.TrafficSchedulers) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_remove_traffic_schedulers(self, param_dict, **kwargs):
        """Call rpc ``RemoveTrafficSchedulers`` with ``param_dict`` and
        ``tech_profile_pb2.TrafficSchedulers``."""
        return self._grpc_helper(
            self.stub.RemoveTrafficSchedulers,
            tech_profile_pb2.TrafficSchedulers,
            param_dict,
            **kwargs
        )

    # rpc CreateTrafficQueues(tech_profile.TrafficQueues) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_create_traffic_queues(self, param_dict, **kwargs):
        """Call rpc ``CreateTrafficQueues`` with ``param_dict`` and
        ``tech_profile_pb2.TrafficQueues``."""
        return self._grpc_helper(
            self.stub.CreateTrafficQueues,
            tech_profile_pb2.TrafficQueues,
            param_dict,
            **kwargs
        )

    # rpc RemoveTrafficQueues(tech_profile.TrafficQueues) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_remove_traffic_queues(self, param_dict, **kwargs):
        """Call rpc ``RemoveTrafficQueues`` with ``param_dict`` and
        ``tech_profile_pb2.TrafficQueues``."""
        return self._grpc_helper(
            self.stub.RemoveTrafficQueues,
            tech_profile_pb2.TrafficQueues,
            param_dict,
            **kwargs
        )

    # rpc EnableIndication(Empty) returns (stream Indication) {...};
    @keyword
    @is_connected
    def open_olt_enable_indication(self, **kwargs):
        """Call rpc ``EnableIndication``; no request parameters are passed.

        NOTE(review): the rpc is declared as server-streaming; confirm that
        ``_grpc_helper`` handles a streamed response as intended.
        """
        return self._grpc_helper(self.stub.EnableIndication, **kwargs)

    # rpc PerformGroupOperation(Group) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_perform_group_operation(self, param_dict, **kwargs):
        """Call rpc ``PerformGroupOperation`` with ``param_dict`` and ``openolt_pb2.Group``."""
        return self._grpc_helper(
            self.stub.PerformGroupOperation, openolt_pb2.Group, param_dict, **kwargs
        )

    # rpc DeleteGroup(Group) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_delete_group(self, param_dict, **kwargs):
        """Call rpc ``DeleteGroup`` with ``param_dict`` and ``openolt_pb2.Group``."""
        return self._grpc_helper(self.stub.DeleteGroup, openolt_pb2.Group, param_dict, **kwargs)

    # rpc GetExtValue(ValueParam) returns (common.ReturnValues) {...};
    @keyword
    @is_connected
    def open_olt_get_ext_value(self, param_dict, **kwargs):
        """Call rpc ``GetExtValue`` with ``param_dict`` and ``openolt_pb2.ValueParam``."""
        return self._grpc_helper(
            self.stub.GetExtValue, openolt_pb2.ValueParam, param_dict, **kwargs
        )

    # rpc OnuItuPonAlarmSet(config.OnuItuPonAlarm) returns (Empty) {...};
    @keyword
    @is_connected
    def open_olt_onu_itu_pon_alarm_set(self, param_dict, **kwargs):
        """Call rpc ``OnuItuPonAlarmSet`` with ``param_dict`` and
        ``ext_config_pb2.OnuItuPonAlarm``."""
        return self._grpc_helper(
            self.stub.OnuItuPonAlarmSet, ext_config_pb2.OnuItuPonAlarm, param_dict, **kwargs
        )

    # rpc GetLogicalOnuDistanceZero(Onu) returns (OnuLogicalDistance) {...};
    @keyword
    @is_connected
    def open_olt_get_logical_onu_distance_zero(self, param_dict, **kwargs):
        """Call rpc ``GetLogicalOnuDistanceZero`` with ``param_dict`` and ``openolt_pb2.Onu``."""
        return self._grpc_helper(
            self.stub.GetLogicalOnuDistanceZero, openolt_pb2.Onu, param_dict, **kwargs
        )

    # rpc GetLogicalOnuDistance(Onu) returns (OnuLogicalDistance) {...};
    @keyword
    @is_connected
    def open_olt_get_logical_onu_distance(self, param_dict, **kwargs):
        """Call rpc ``GetLogicalOnuDistance`` with ``param_dict`` and ``openolt_pb2.Onu``."""
        # The stub attribute must be spelled exactly like the rpc declared in
        # the service definition, otherwise the lookup raises AttributeError.
        return self._grpc_helper(
            self.stub.GetLogicalOnuDistance, openolt_pb2.Onu, param_dict, **kwargs
        )