blob: 66c509962761c937e4808bb9e60c83dabc35a248 [file] [log] [blame]
# Copyright 2020-present Open Networking Foundation
# Original copyright 2020-present ADTRAN, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14from robot.api.deco import keyword
15from grpc_robot.services.service import is_connected
16
17from grpc_robot.services.service import Service
18from voltha_protos import voltha_pb2_grpc, voltha_pb2, common_pb2, openflow_13_pb2
19
20
class VolthaService(Service):
    # Keyword wrappers around the RPCs of the gRPC VolthaService.
    #
    # Every keyword method below follows the same pattern: it is decorated
    # with @keyword and @is_connected, picks the matching method from
    # self.stub and hands it to self._grpc_helper (not defined in this
    # class). RPCs that take a request message additionally pass the
    # protobuf message class and the caller's param_dict.
    # NOTE(review): param_dict presumably carries the request message fields
    # and is converted by _grpc_helper -- confirm against the Service base.
    # Documentation is kept in comments (not docstrings) so that the
    # methods' __doc__ stays untouched.

    # Common leading part of every keyword method name in this class.
    prefix = 'voltha_service_'

    def __init__(self, ctx):
        # The stub class is passed on un-instantiated; the base class
        # receives it together with the context.
        super().__init__(
            ctx=ctx,
            stub=voltha_pb2_grpc.VolthaServiceStub,
        )

    # RPC GetMembership: google.protobuf.Empty -> Membership
    @keyword
    @is_connected
    def voltha_service_get_membership(self, **kwargs):
        rpc = self.stub.GetMembership
        return self._grpc_helper(rpc, **kwargs)

    # RPC UpdateMembership: Membership -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_update_membership(self, param_dict, **kwargs):
        rpc = self.stub.UpdateMembership
        return self._grpc_helper(rpc, voltha_pb2.Membership, param_dict, **kwargs)

    # RPC GetVoltha: google.protobuf.Empty -> Voltha
    @keyword
    @is_connected
    def voltha_service_get_voltha(self, **kwargs):
        rpc = self.stub.GetVoltha
        return self._grpc_helper(rpc, **kwargs)

    # RPC ListCoreInstances: google.protobuf.Empty -> CoreInstances
    @keyword
    @is_connected
    def voltha_service_list_core_instances(self, **kwargs):
        rpc = self.stub.ListCoreInstances
        return self._grpc_helper(rpc, **kwargs)

    # RPC GetCoreInstance: common.ID -> CoreInstance
    @keyword
    @is_connected
    def voltha_service_get_core_instance(self, param_dict, **kwargs):
        rpc = self.stub.GetCoreInstance
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC ListAdapters: google.protobuf.Empty -> Adapters
    @keyword
    @is_connected
    def voltha_service_list_adapters(self, **kwargs):
        rpc = self.stub.ListAdapters
        return self._grpc_helper(rpc, **kwargs)

    # RPC ListLogicalDevices: google.protobuf.Empty -> LogicalDevices
    @keyword
    @is_connected
    def voltha_service_list_logical_devices(self, **kwargs):
        rpc = self.stub.ListLogicalDevices
        return self._grpc_helper(rpc, **kwargs)

    # RPC GetLogicalDevice: common.ID -> LogicalDevice
    @keyword
    @is_connected
    def voltha_service_get_logical_device(self, param_dict, **kwargs):
        rpc = self.stub.GetLogicalDevice
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC ListLogicalDevicePorts: common.ID -> LogicalPorts
    @keyword
    @is_connected
    def voltha_service_list_logical_device_ports(self, param_dict, **kwargs):
        rpc = self.stub.ListLogicalDevicePorts
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC GetLogicalDevicePort: LogicalPortId -> LogicalPort
    @keyword
    @is_connected
    def voltha_service_get_logical_device_port(self, param_dict, **kwargs):
        rpc = self.stub.GetLogicalDevicePort
        return self._grpc_helper(rpc, voltha_pb2.LogicalPortId, param_dict, **kwargs)

    # RPC EnableLogicalDevicePort: LogicalPortId -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_enable_logical_device_port(self, param_dict, **kwargs):
        rpc = self.stub.EnableLogicalDevicePort
        return self._grpc_helper(rpc, voltha_pb2.LogicalPortId, param_dict, **kwargs)

    # RPC DisableLogicalDevicePort: LogicalPortId -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_disable_logical_device_port(self, param_dict, **kwargs):
        rpc = self.stub.DisableLogicalDevicePort
        return self._grpc_helper(rpc, voltha_pb2.LogicalPortId, param_dict, **kwargs)

    # RPC ListLogicalDeviceFlows: common.ID -> openflow_13.Flows
    @keyword
    @is_connected
    def voltha_service_list_logical_device_flows(self, param_dict, **kwargs):
        rpc = self.stub.ListLogicalDeviceFlows
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC UpdateLogicalDeviceFlowTable: openflow_13.FlowTableUpdate -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_update_logical_device_flow_table(self, param_dict, **kwargs):
        rpc = self.stub.UpdateLogicalDeviceFlowTable
        return self._grpc_helper(rpc, openflow_13_pb2.FlowTableUpdate, param_dict, **kwargs)

    # RPC UpdateLogicalDeviceMeterTable: openflow_13.MeterModUpdate -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_update_logical_device_meter_table(self, param_dict, **kwargs):
        rpc = self.stub.UpdateLogicalDeviceMeterTable
        return self._grpc_helper(rpc, openflow_13_pb2.MeterModUpdate, param_dict, **kwargs)

    # RPC ListLogicalDeviceMeters: common.ID -> openflow_13.Meters
    @keyword
    @is_connected
    def voltha_service_list_logical_device_meters(self, param_dict, **kwargs):
        rpc = self.stub.ListLogicalDeviceMeters
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC ListLogicalDeviceFlowGroups: common.ID -> openflow_13.FlowGroups
    @keyword
    @is_connected
    def voltha_service_list_logical_device_flow_groups(self, param_dict, **kwargs):
        rpc = self.stub.ListLogicalDeviceFlowGroups
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC UpdateLogicalDeviceFlowGroupTable: openflow_13.FlowGroupTableUpdate -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_update_logical_device_flow_group_table(self, param_dict, **kwargs):
        rpc = self.stub.UpdateLogicalDeviceFlowGroupTable
        return self._grpc_helper(rpc, openflow_13_pb2.FlowGroupTableUpdate, param_dict, **kwargs)

    # RPC ListDevices: google.protobuf.Empty -> Devices
    @keyword
    @is_connected
    def voltha_service_list_devices(self, **kwargs):
        rpc = self.stub.ListDevices
        return self._grpc_helper(rpc, **kwargs)

    # RPC ListDeviceIds: google.protobuf.Empty -> common.IDs
    @keyword
    @is_connected
    def voltha_service_list_device_ids(self, **kwargs):
        rpc = self.stub.ListDeviceIds
        return self._grpc_helper(rpc, **kwargs)

    # RPC ReconcileDevices: common.IDs -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_reconcile_devices(self, param_dict, **kwargs):
        rpc = self.stub.ReconcileDevices
        return self._grpc_helper(rpc, common_pb2.IDs, param_dict, **kwargs)

    # RPC GetDevice: common.ID -> Device
    @keyword
    @is_connected
    def voltha_service_get_device(self, param_dict, **kwargs):
        rpc = self.stub.GetDevice
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC CreateDevice: Device -> Device
    @keyword
    @is_connected
    def voltha_service_create_device(self, param_dict, **kwargs):
        rpc = self.stub.CreateDevice
        return self._grpc_helper(rpc, voltha_pb2.Device, param_dict, **kwargs)

    # RPC EnableDevice: common.ID -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_enable_device(self, param_dict, **kwargs):
        rpc = self.stub.EnableDevice
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC DisableDevice: common.ID -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_disable_device(self, param_dict, **kwargs):
        rpc = self.stub.DisableDevice
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC RebootDevice: common.ID -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_reboot_device(self, param_dict, **kwargs):
        rpc = self.stub.RebootDevice
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC DeleteDevice: common.ID -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_delete_device(self, param_dict, **kwargs):
        rpc = self.stub.DeleteDevice
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC ForceDeleteDevice: common.ID -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_force_delete_device(self, param_dict, **kwargs):
        rpc = self.stub.ForceDeleteDevice
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC DownloadImage: ImageDownload -> common.OperationResp
    @keyword
    @is_connected
    def voltha_service_download_image(self, param_dict, **kwargs):
        rpc = self.stub.DownloadImage
        return self._grpc_helper(rpc, voltha_pb2.ImageDownload, param_dict, **kwargs)

    # RPC GetImageDownloadStatus: ImageDownload -> ImageDownload
    @keyword
    @is_connected
    def voltha_service_get_image_download_status(self, param_dict, **kwargs):
        rpc = self.stub.GetImageDownloadStatus
        return self._grpc_helper(rpc, voltha_pb2.ImageDownload, param_dict, **kwargs)

    # RPC GetImageDownload: ImageDownload -> ImageDownload
    @keyword
    @is_connected
    def voltha_service_get_image_download(self, param_dict, **kwargs):
        rpc = self.stub.GetImageDownload
        return self._grpc_helper(rpc, voltha_pb2.ImageDownload, param_dict, **kwargs)

    # RPC ListImageDownloads: common.ID -> ImageDownloads
    @keyword
    @is_connected
    def voltha_service_list_image_downloads(self, param_dict, **kwargs):
        rpc = self.stub.ListImageDownloads
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC CancelImageDownload: ImageDownload -> common.OperationResp
    @keyword
    @is_connected
    def voltha_service_cancel_image_download(self, param_dict, **kwargs):
        rpc = self.stub.CancelImageDownload
        return self._grpc_helper(rpc, voltha_pb2.ImageDownload, param_dict, **kwargs)

    # RPC ActivateImageUpdate: ImageDownload -> common.OperationResp
    @keyword
    @is_connected
    def voltha_service_activate_image_update(self, param_dict, **kwargs):
        rpc = self.stub.ActivateImageUpdate
        return self._grpc_helper(rpc, voltha_pb2.ImageDownload, param_dict, **kwargs)

    # RPC RevertImageUpdate: ImageDownload -> common.OperationResp
    @keyword
    @is_connected
    def voltha_service_revert_image_update(self, param_dict, **kwargs):
        rpc = self.stub.RevertImageUpdate
        return self._grpc_helper(rpc, voltha_pb2.ImageDownload, param_dict, **kwargs)

    # RPC ListDevicePorts: common.ID -> Ports
    @keyword
    @is_connected
    def voltha_service_list_device_ports(self, param_dict, **kwargs):
        rpc = self.stub.ListDevicePorts
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC ListDevicePmConfigs: common.ID -> PmConfigs
    @keyword
    @is_connected
    def voltha_service_list_device_pm_configs(self, param_dict, **kwargs):
        rpc = self.stub.ListDevicePmConfigs
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC UpdateDevicePmConfigs: voltha.PmConfigs -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_update_device_pm_configs(self, param_dict, **kwargs):
        rpc = self.stub.UpdateDevicePmConfigs
        return self._grpc_helper(rpc, voltha_pb2.PmConfigs, param_dict, **kwargs)

    # RPC ListDeviceFlows: common.ID -> openflow_13.Flows
    @keyword
    @is_connected
    def voltha_service_list_device_flows(self, param_dict, **kwargs):
        rpc = self.stub.ListDeviceFlows
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC ListDeviceFlowGroups: common.ID -> openflow_13.FlowGroups
    @keyword
    @is_connected
    def voltha_service_list_device_flow_groups(self, param_dict, **kwargs):
        rpc = self.stub.ListDeviceFlowGroups
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC ListDeviceTypes: google.protobuf.Empty -> DeviceTypes
    @keyword
    @is_connected
    def voltha_service_list_device_types(self, **kwargs):
        rpc = self.stub.ListDeviceTypes
        return self._grpc_helper(rpc, **kwargs)

    # RPC GetDeviceType: common.ID -> DeviceType
    @keyword
    @is_connected
    def voltha_service_get_device_type(self, param_dict, **kwargs):
        rpc = self.stub.GetDeviceType
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC ListDeviceGroups: google.protobuf.Empty -> DeviceGroups
    @keyword
    @is_connected
    def voltha_service_list_device_groups(self, **kwargs):
        rpc = self.stub.ListDeviceGroups
        return self._grpc_helper(rpc, **kwargs)

    # RPC StreamPacketsOut: stream openflow_13.PacketOut -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_stream_packets_out(self, param_dict, **kwargs):
        rpc = self.stub.StreamPacketsOut
        return self._grpc_helper(rpc, openflow_13_pb2.PacketOut, param_dict, **kwargs)

    # RPC ReceivePacketsIn: google.protobuf.Empty -> stream openflow_13.PacketIn
    @keyword
    @is_connected
    def voltha_service_receive_packets_in(self, **kwargs):
        rpc = self.stub.ReceivePacketsIn
        return self._grpc_helper(rpc, **kwargs)

    # RPC ReceiveChangeEvents: google.protobuf.Empty -> stream openflow_13.ChangeEvent
    @keyword
    @is_connected
    def voltha_service_receive_change_events(self, **kwargs):
        rpc = self.stub.ReceiveChangeEvents
        return self._grpc_helper(rpc, **kwargs)

    # RPC GetDeviceGroup: common.ID -> DeviceGroup
    @keyword
    @is_connected
    def voltha_service_get_device_group(self, param_dict, **kwargs):
        rpc = self.stub.GetDeviceGroup
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC CreateEventFilter: EventFilter -> EventFilter
    @keyword
    @is_connected
    def voltha_service_create_event_filter(self, param_dict, **kwargs):
        rpc = self.stub.CreateEventFilter
        return self._grpc_helper(rpc, voltha_pb2.EventFilter, param_dict, **kwargs)

    # RPC GetEventFilter: common.ID -> EventFilters
    @keyword
    @is_connected
    def voltha_service_get_event_filter(self, param_dict, **kwargs):
        rpc = self.stub.GetEventFilter
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC UpdateEventFilter: EventFilter -> EventFilter
    @keyword
    @is_connected
    def voltha_service_update_event_filter(self, param_dict, **kwargs):
        rpc = self.stub.UpdateEventFilter
        return self._grpc_helper(rpc, voltha_pb2.EventFilter, param_dict, **kwargs)

    # RPC DeleteEventFilter: EventFilter -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_delete_event_filter(self, param_dict, **kwargs):
        rpc = self.stub.DeleteEventFilter
        return self._grpc_helper(rpc, voltha_pb2.EventFilter, param_dict, **kwargs)

    # RPC ListEventFilters: google.protobuf.Empty -> EventFilters
    @keyword
    @is_connected
    def voltha_service_list_event_filters(self, **kwargs):
        rpc = self.stub.ListEventFilters
        return self._grpc_helper(rpc, **kwargs)

    # RPC GetImages: common.ID -> Images
    @keyword
    @is_connected
    def voltha_service_get_images(self, param_dict, **kwargs):
        rpc = self.stub.GetImages
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC SelfTest: common.ID -> SelfTestResponse
    @keyword
    @is_connected
    def voltha_service_self_test(self, param_dict, **kwargs):
        rpc = self.stub.SelfTest
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC GetMibDeviceData: common.ID -> omci.MibDeviceData
    @keyword
    @is_connected
    def voltha_service_get_mib_device_data(self, param_dict, **kwargs):
        rpc = self.stub.GetMibDeviceData
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC GetAlarmDeviceData: common.ID -> omci.AlarmDeviceData
    @keyword
    @is_connected
    def voltha_service_get_alarm_device_data(self, param_dict, **kwargs):
        rpc = self.stub.GetAlarmDeviceData
        return self._grpc_helper(rpc, common_pb2.ID, param_dict, **kwargs)

    # RPC SimulateAlarm: SimulateAlarmRequest -> common.OperationResp
    @keyword
    @is_connected
    def voltha_service_simulate_alarm(self, param_dict, **kwargs):
        rpc = self.stub.SimulateAlarm
        return self._grpc_helper(rpc, voltha_pb2.SimulateAlarmRequest, param_dict, **kwargs)

    # RPC Subscribe: OfAgentSubscriber -> OfAgentSubscriber
    @keyword
    @is_connected
    def voltha_service_subscribe(self, param_dict, **kwargs):
        rpc = self.stub.Subscribe
        return self._grpc_helper(rpc, voltha_pb2.OfAgentSubscriber, param_dict, **kwargs)

    # RPC EnablePort: voltha.Port -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_enable_port(self, param_dict, **kwargs):
        rpc = self.stub.EnablePort
        return self._grpc_helper(rpc, voltha_pb2.Port, param_dict, **kwargs)

    # RPC DisablePort: voltha.Port -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_disable_port(self, param_dict, **kwargs):
        rpc = self.stub.DisablePort
        return self._grpc_helper(rpc, voltha_pb2.Port, param_dict, **kwargs)

    # RPC GetExtValue: common.ValueSpecifier -> common.ReturnValues
    @keyword
    @is_connected
    def voltha_service_get_ext_value(self, param_dict, **kwargs):
        rpc = self.stub.GetExtValue
        return self._grpc_helper(rpc, common_pb2.ValueSpecifier, param_dict, **kwargs)

    # RPC SetExtValue: ValueSet -> google.protobuf.Empty
    @keyword
    @is_connected
    def voltha_service_set_ext_value(self, param_dict, **kwargs):
        rpc = self.stub.SetExtValue
        return self._grpc_helper(rpc, voltha_pb2.ValueSet, param_dict, **kwargs)

    # RPC StartOmciTestAction: OmciTestRequest -> TestResponse
    @keyword
    @is_connected
    def voltha_service_start_omci_test_action(self, param_dict, **kwargs):
        rpc = self.stub.StartOmciTestAction
        return self._grpc_helper(rpc, voltha_pb2.OmciTestRequest, param_dict, **kwargs)