blob: b8971883d544a5d4c1cdf9c39346c9b9fab8e75a [file] [log] [blame]
William Kurkian6f436d02019-02-06 16:25:01 -05001#
2# Copyright 2018 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Agent to play gateway between CORE and an adapter.
19"""
20import structlog
21from google.protobuf.message import Message
22from twisted.internet.defer import inlineCallbacks, returnValue
23
24from container_proxy import ContainerProxy
25from python.protos.common_pb2 import ID, ConnectStatus, OperStatus
26from python.protos.inter_container_pb2 import StrType, BoolType, IntType, Packet
27from python.protos.device_pb2 import Device, Ports
28from python.protos.voltha_pb2 import CoreInstance
29
30log = structlog.get_logger()
31
32
33def createSubTopic(*args):
34 return '_'.join(args)
35
36class CoreProxy(ContainerProxy):
37
38 def __init__(self, kafka_proxy, core_topic, my_listening_topic):
39 super(CoreProxy, self).__init__(kafka_proxy, core_topic,
40 my_listening_topic)
41
42 @ContainerProxy.wrap_request(CoreInstance)
43 @inlineCallbacks
44 def register(self, adapter, deviceTypes):
45 log.debug("register")
46 try:
47 res = yield self.invoke(rpc="Register",
48 adapter=adapter,
49 deviceTypes=deviceTypes)
50 log.info("registration-returned", res=res)
51 returnValue(res)
52 except Exception as e:
53 log.exception("registration-exception", e=e)
54 raise
55
56 @ContainerProxy.wrap_request(Device)
57 @inlineCallbacks
58 def get_device(self, device_id):
59 log.debug("get-device")
60 id = ID()
61 id.id = device_id
62 # Once we have a device being managed, all communications between the
63 # the adapter and the core occurs over a topic associated with that
64 # device
65 to_topic = createSubTopic(self.core_topic, device_id)
66 reply_topic = createSubTopic(self.listening_topic, device_id)
67 res = yield self.invoke(rpc="GetDevice",
68 to_topic=to_topic,
69 reply_topic=reply_topic,
70 device_id=id)
71 returnValue(res)
72
73 @ContainerProxy.wrap_request(Device)
74 @inlineCallbacks
75 def get_child_device(self, parent_device_id, **kwargs):
76 raise NotImplementedError()
77
78 @ContainerProxy.wrap_request(Ports)
79 @inlineCallbacks
80 def get_ports(self, device_id, port_type):
81 id = ID()
82 id.id = device_id
83 p_type = IntType()
84 p_type.val = port_type
85 to_topic = createSubTopic(self.core_topic, device_id)
86 reply_topic = createSubTopic(self.listening_topic, device_id)
87 res = yield self.invoke(rpc="GetPorts",
88 to_topic=to_topic,
89 reply_topic=reply_topic,
90 device_id=id,
91 port_type=p_type)
92 returnValue(res)
93
94 def get_child_devices(self, parent_device_id):
95 raise NotImplementedError()
96
97 def get_child_device_with_proxy_address(self, proxy_address):
98 raise NotImplementedError()
99
100 def _to_proto(self, **kwargs):
101 encoded = {}
102 for k, v in kwargs.iteritems():
103 if isinstance(v, Message):
104 encoded[k] = v
105 elif type(v) == int:
106 i_proto = IntType()
107 i_proto.val = v
108 encoded[k] = i_proto
109 elif type(v) == str:
110 s_proto = StrType()
111 s_proto.val = v
112 encoded[k] = s_proto
113 elif type(v) == bool:
114 b_proto = BoolType()
115 b_proto.val = v
116 encoded[k] = b_proto
117 return encoded
118
119 @ContainerProxy.wrap_request(None)
120 @inlineCallbacks
121 def child_device_detected(self,
122 parent_device_id,
123 parent_port_no,
124 child_device_type,
125 channel_id,
126 **kw):
127 id = ID()
128 id.id = parent_device_id
129 ppn = IntType()
130 ppn.val = parent_port_no
131 cdt = StrType()
132 cdt.val = child_device_type
133 channel = IntType()
134 channel.val = channel_id
135 to_topic = createSubTopic(self.core_topic, parent_device_id)
136 reply_topic = createSubTopic(self.listening_topic, parent_device_id)
137 args = self._to_proto(**kw)
138 res = yield self.invoke(rpc="ChildDeviceDetected",
139 to_topic=to_topic,
140 reply_topic=reply_topic,
141 parent_device_id=id,
142 parent_port_no=ppn,
143 child_device_type=cdt,
144 channel_id=channel,
145 **args)
146 returnValue(res)
147
148 @ContainerProxy.wrap_request(None)
149 @inlineCallbacks
150 def device_update(self, device):
151 log.debug("device_update")
152 to_topic = createSubTopic(self.core_topic, device.id)
153 reply_topic = createSubTopic(self.listening_topic, device.id)
154 res = yield self.invoke(rpc="DeviceUpdate",
155 to_topic=to_topic,
156 reply_topic=reply_topic,
157 device=device)
158 returnValue(res)
159
160 def child_device_removed(parent_device_id, child_device_id):
161 raise NotImplementedError()
162
163 @ContainerProxy.wrap_request(None)
164 @inlineCallbacks
165 def device_state_update(self, device_id,
166 oper_status=None,
167 connect_status=None):
168 id = ID()
169 id.id = device_id
170 o_status = IntType()
171 if oper_status or oper_status == OperStatus.UNKNOWN:
172 o_status.val = oper_status
173 else:
174 o_status.val = -1
175 c_status = IntType()
176 if connect_status or connect_status == ConnectStatus.UNKNOWN:
177 c_status.val = connect_status
178 else:
179 c_status.val = -1
180
181 to_topic = createSubTopic(self.core_topic, device_id)
182 reply_topic = createSubTopic(self.listening_topic, device_id)
183 res = yield self.invoke(rpc="DeviceStateUpdate",
184 to_topic=to_topic,
185 reply_topic=reply_topic,
186 device_id=id,
187 oper_status=o_status,
188 connect_status=c_status)
189 returnValue(res)
190
191 @ContainerProxy.wrap_request(None)
192 @inlineCallbacks
193 def children_state_update(self, device_id,
194 oper_status=None,
195 connect_status=None):
196 id = ID()
197 id.id = device_id
198 o_status = IntType()
199 if oper_status or oper_status == OperStatus.UNKNOWN:
200 o_status.val = oper_status
201 else:
202 o_status.val = -1
203 c_status = IntType()
204 if connect_status or connect_status == ConnectStatus.UNKNOWN:
205 c_status.val = connect_status
206 else:
207 c_status.val = -1
208
209 to_topic = createSubTopic(self.core_topic, device_id)
210 reply_topic = createSubTopic(self.listening_topic, device_id)
211 res = yield self.invoke(rpc="ChildrenStateUpdate",
212 to_topic=to_topic,
213 reply_topic=reply_topic,
214 device_id=id,
215 oper_status=o_status,
216 connect_status=c_status)
217 returnValue(res)
218
219 @ContainerProxy.wrap_request(None)
220 @inlineCallbacks
221 def port_state_update(self,
222 device_id,
223 port_type,
224 port_no,
225 oper_status):
226 id = ID()
227 id.id = device_id
228 pt = IntType()
229 pt.val = port_type
230 pNo = IntType()
231 pNo.val = port_no
232 o_status = IntType()
233 o_status.val = oper_status
234
235 to_topic = createSubTopic(self.core_topic, device_id)
236 reply_topic = createSubTopic(self.listening_topic, device_id)
237 res = yield self.invoke(rpc="PortStateUpdate",
238 to_topic=to_topic,
239 reply_topic=reply_topic,
240 device_id=id,
241 port_type=pt,
242 port_no=pNo,
243 oper_status=o_status)
244 returnValue(res)
245
246 @ContainerProxy.wrap_request(None)
247 @inlineCallbacks
248 def child_devices_state_update(self, parent_device_id,
249 oper_status=None,
250 connect_status=None):
251
252 id = ID()
253 id.id = parent_device_id
254 o_status = IntType()
255 if oper_status or oper_status == OperStatus.UNKNOWN:
256 o_status.val = oper_status
257 else:
258 o_status.val = -1
259 c_status = IntType()
260 if connect_status or connect_status == ConnectStatus.UNKNOWN:
261 c_status.val = connect_status
262 else:
263 c_status.val = -1
264
265 to_topic = createSubTopic(self.core_topic, parent_device_id)
266 reply_topic = createSubTopic(self.listening_topic, parent_device_id)
267 res = yield self.invoke(rpc="child_devices_state_update",
268 to_topic=to_topic,
269 reply_topic=reply_topic,
270 parent_device_id=id,
271 oper_status=o_status,
272 connect_status=c_status)
273 returnValue(res)
274
275 def child_devices_removed(parent_device_id):
276 raise NotImplementedError()
277
278 @ContainerProxy.wrap_request(None)
279 @inlineCallbacks
280 def device_pm_config_update(self, device_pm_config, init=False):
281 log.debug("device_pm_config_update")
282 b = BoolType()
283 b.val = init
284 to_topic = createSubTopic(self.core_topic, device_pm_config.id)
285 reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
286 res = yield self.invoke(rpc="DevicePMConfigUpdate",
287 to_topic=to_topic,
288 reply_topic=reply_topic,
289 device_pm_config=device_pm_config,
290 init=b)
291 returnValue(res)
292
293 @ContainerProxy.wrap_request(None)
294 @inlineCallbacks
295 def port_created(self, device_id, port):
296 log.debug("port_created")
297 proto_id = ID()
298 proto_id.id = device_id
299 to_topic = createSubTopic(self.core_topic, device_id)
300 reply_topic = createSubTopic(self.listening_topic, device_id)
301 res = yield self.invoke(rpc="PortCreated",
302 to_topic=to_topic,
303 reply_topic=reply_topic,
304 device_id=proto_id,
305 port=port)
306 returnValue(res)
307
308 def port_removed(device_id, port):
309 raise NotImplementedError()
310
311 def ports_enabled(device_id):
312 raise NotImplementedError()
313
314 def ports_disabled(device_id):
315 raise NotImplementedError()
316
317 def ports_oper_status_update(device_id, oper_status):
318 raise NotImplementedError()
319
320 def image_download_update(img_dnld):
321 raise NotImplementedError()
322
323 def image_download_deleted(img_dnld):
324 raise NotImplementedError()
325
326 @ContainerProxy.wrap_request(None)
327 @inlineCallbacks
328 def send_packet_in(self, device_id, port, packet):
329 log.debug("send_packet_in", device_id=device_id)
330 proto_id = ID()
331 proto_id.id = device_id
332 p = IntType()
333 p.val = port
334 pac = Packet()
335 pac.payload = packet
336 to_topic = createSubTopic(self.core_topic, device_id)
337 reply_topic = createSubTopic(self.listening_topic, device_id)
338 res = yield self.invoke(rpc="PacketIn",
339 to_topic=to_topic,
340 reply_topic=reply_topic,
341 device_id=proto_id,
342 port=p,
343 packet=pac)
344 returnValue(res)