blob: 512262f1271baa736c62a8fd44f2bb9dc4eb23e5 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Agent to play gateway between CORE and an individual adapter.
19"""
20from uuid import uuid4
21
22import arrow
23import structlog
24from google.protobuf.json_format import MessageToJson
25from scapy.packet import Packet
26from twisted.internet.defer import inlineCallbacks, returnValue
27from twisted.python import failure
28from zope.interface import implementer
29
30from adapters.common.event_bus import EventBusClient
31from adapters.common.frameio.frameio import hexify
32from adapters.common.utils.id_generation import create_cluster_logical_device_ids
33from adapters.interface import IAdapterInterface
34from adapters.protos import third_party
khenaidoo92e62c52018-10-03 14:02:54 -040035from adapters.protos.device_pb2 import Device, Port, Ports, PmConfigs
khenaidoob9203542018-09-17 22:56:37 -040036from adapters.protos.events_pb2 import AlarmEvent, AlarmEventType, \
37 AlarmEventSeverity, AlarmEventState, AlarmEventCategory
38from adapters.protos.events_pb2 import KpiEvent
39from adapters.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
khenaidoo92e62c52018-10-03 14:02:54 -040040 LogicalPort, AlarmFilterRuleKey, CoreInstance
khenaidoob9203542018-09-17 22:56:37 -040041from adapters.common.utils.registry import registry, IComponent
42from adapters.common.utils.id_generation import create_cluster_device_id
43import re
44from adapters.interface import ICoreSouthBoundInterface
45from adapters.protos.core_adapter_pb2 import StrType, BoolType, IntType
khenaidoo92e62c52018-10-03 14:02:54 -040046from adapters.protos.common_pb2 import ID, ConnectStatus, OperStatus
khenaidoob9203542018-09-17 22:56:37 -040047from google.protobuf.message import Message
48from adapters.common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
49
50log = structlog.get_logger()
51
class KafkaMessagingError(Exception):
    """Error raised when a Kafka messaging operation fails.

    Derives from ``Exception`` rather than ``BaseException`` so that ordinary
    ``except Exception`` handlers can catch it; ``BaseException`` is meant
    for interpreter-level exits such as ``SystemExit`` and
    ``KeyboardInterrupt``.

    :param error: description (or cause) of the failure; kept on
        ``self.error`` and also passed to ``Exception`` so ``str()`` shows it.
    """

    def __init__(self, error):
        super(KafkaMessagingError, self).__init__(error)
        self.error = error
55
def wrap_request(return_cls):
    """Build a decorator that post-processes the reply of a request method.

    The decorated callable must produce (directly or through a Deferred) a
    two-item ``(success, payload)`` result.  The wrapper then fires with:

    * ``success`` true and ``return_cls`` given: a fresh ``return_cls()``
      instance, filled via ``payload.Unpack(instance)`` when ``payload`` is
      not None;
    * ``success`` true and ``return_cls`` None: ``None``;
    * ``success`` false: ``payload`` unchanged (a warning is logged).

    Any ``Exception`` raised along the way is logged and re-raised.

    :param return_cls: class to unpack a successful payload into, or None
        when the request carries no meaningful response body.
    :return: decorator to apply to the request method.
    """
    # Local import keeps this block self-contained; wraps() preserves the
    # decorated method's __name__/__doc__ for logs and introspection.
    from functools import wraps

    def real_wrapper(func):
        @wraps(func)
        @inlineCallbacks
        def wrapper(*args, **kw):
            try:
                (success, d) = yield func(*args, **kw)
                if success:
                    log.debug("successful-response", func=func, val=d)
                    if return_cls is not None:
                        rc = return_cls()
                        if d is not None:
                            d.Unpack(rc)
                        returnValue(rc)
                    else:
                        log.debug("successful-response-none", func=func,
                                  val=None)
                        returnValue(None)
                else:
                    log.warn("unsuccessful-request", func=func, args=args, kw=kw)
                    returnValue(d)
            except Exception as e:
                log.exception("request-wrapper-exception", func=func, e=e)
                raise
        return wrapper
    return real_wrapper
81
82
83@implementer(IComponent, ICoreSouthBoundInterface)
84class CoreProxy(object):
85
86 def __init__(self, kafka_proxy, core_topic, my_listening_topic):
87 self.kafka_proxy = kafka_proxy
88 self.listening_topic = my_listening_topic
89 self.core_topic = core_topic
90 self.default_timeout = 3
91
def start(self):
    """Log that the proxy has started and hand back ``self``."""
    log.info('started')
    return self
96
def stop(self):
    """Log that the proxy has stopped; nothing else is torn down here."""
    log.info('stopped')
99
@inlineCallbacks
def invoke(self, rpc, to_topic=None, **kwargs):
    """Send ``rpc`` through the Kafka proxy and wait for its reply.

    The request is fired by an inner helper while this method waits on a
    ``DeferredWithTimeout`` built with ``self.default_timeout``.  The helper
    resolves that deferred with the reply (or a Failure), unless it has
    already been called by the time the reply arrives.

    :param rpc: name of the remote procedure to invoke.
    :param to_topic: destination topic; ``self.core_topic`` when None.
    :param kwargs: forwarded unchanged to ``kafka_proxy.send_request``.
    :return: (through returnValue) the result of ``send_request``.
    :raises TimeOutError: re-raised, after logging, when waiting on the
        reply deferred raises it.
    """
    @inlineCallbacks
    def _send_request(rpc, m_callback, to_topic, **kwargs):
        try:
            log.debug("sending-request", rpc=rpc)
            if to_topic is None:
                to_topic = self.core_topic
            result = yield self.kafka_proxy.send_request(
                rpc=rpc,
                to_topic=to_topic,
                reply_topic=self.listening_topic,
                callback=None,
                **kwargs)
            if not m_callback.called:
                m_callback.callback(result)
            else:
                # The waiting deferred was already fired, so the late
                # reply is only logged.
                log.debug('timeout-already-occurred', rpc=rpc)
        except Exception:
            log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
            if not m_callback.called:
                # Failure() with no arguments captures the exception
                # currently being handled.
                m_callback.errback(failure.Failure())

    log.debug('invoke-request', rpc=rpc)
    cb = DeferredWithTimeout(timeout=self.default_timeout)
    # Intentionally not yielded: the helper reports back through ``cb``.
    _send_request(rpc, cb, to_topic, **kwargs)
    try:
        res = yield cb
        returnValue(res)
    except TimeOutError as e:
        log.warn('invoke-timeout', e=e)
        # Bare raise keeps the original traceback (``raise e`` would reset
        # it on Python 2).
        raise
131
132
@wrap_request(CoreInstance)
@inlineCallbacks
def register(self, adapter):
    """Issue the "Register" RPC for ``adapter``.

    The raw reply of invoke() is returned; wrap_request(CoreInstance)
    post-processes it.  Any exception is logged and re-raised.
    """
    log.debug("register")
    try:
        reply = yield self.invoke(rpc="Register", adapter=adapter)
        log.info("registration-returned", res=reply)
        returnValue(reply)
    except Exception as err:
        log.exception("registration-exception", e=err)
        raise
144
@wrap_request(Device)
@inlineCallbacks
def get_device(self, device_id):
    """Issue the "GetDevice" RPC for ``device_id``.

    ``device_id`` is carried in an ``ID`` message; the raw reply of
    invoke() is returned and post-processed by wrap_request(Device).
    """
    log.debug("get-device")
    proto_id = ID()
    proto_id.id = device_id
    reply = yield self.invoke(rpc="GetDevice", device_id=proto_id)
    returnValue(reply)
153
@wrap_request(Device)
@inlineCallbacks
def get_child_device(self, parent_device_id, **kwargs):
    """Placeholder: looking up a child device is not implemented yet.

    :raises NotImplementedError: always.
    """
    # NOTE(review): the body contains no ``yield`` although it is wrapped
    # with inlineCallbacks; revisit the decorators when implementing.
    raise NotImplementedError()
158
159 # def add_device(self, device):
160 # raise NotImplementedError()
161
@wrap_request(Ports)
@inlineCallbacks
def get_ports(self, device_id, port_type):
    """Issue the "GetPorts" RPC for ``device_id`` and ``port_type``.

    ``device_id`` is carried in an ``ID`` message and ``port_type`` in an
    ``IntType`` message; the raw reply of invoke() is returned and
    post-processed by wrap_request(Ports).
    """
    proto_id = ID()
    proto_id.id = device_id
    proto_port_type = IntType()
    proto_port_type.val = port_type
    reply = yield self.invoke(rpc="GetPorts",
                              device_id=proto_id,
                              port_type=proto_port_type)
    returnValue(reply)
khenaidoob9203542018-09-17 22:56:37 -0400173
def get_child_devices(self, parent_device_id):
    """Placeholder: listing child devices is not implemented yet.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
176
def get_child_device_with_proxy_address(self, proxy_address):
    """Placeholder: lookup by proxy address is not implemented yet.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
179
def _to_proto(self, **kwargs):
    """Wrap plain keyword values in protobuf messages.

    Values that already are protobuf ``Message`` instances pass through
    unchanged; ``bool``, ``int`` and ``str`` values are wrapped in
    ``BoolType``, ``IntType`` and ``StrType`` respectively.  Values of any
    other type are left out of the result and a warning is logged, so a
    dropped argument no longer goes unnoticed.

    :param kwargs: keyword arguments to encode.
    :return: dict mapping each supported key to its protobuf message.
    """
    encoded = {}
    # items() works on both Python 2 and 3 (iteritems() is Python 2 only).
    for key, value in kwargs.items():
        if isinstance(value, Message):
            encoded[key] = value
            continue

        # bool is tested before int because bool is a subclass of int.
        if isinstance(value, bool):
            wrapped = BoolType()
        elif isinstance(value, int):
            wrapped = IntType()
        elif isinstance(value, str):
            wrapped = StrType()
        else:
            log.warn('unsupported-kwarg-type', key=key,
                     value_type=type(value))
            continue

        wrapped.val = value
        encoded[key] = wrapped
    return encoded
198
199
@wrap_request(None)
@inlineCallbacks
def child_device_detected(self,
                          parent_device_id,
                          parent_port_no,
                          child_device_type,
                          channel_id,
                          **kw):
    """Issue the "ChildDeviceDetected" RPC.

    The four named arguments are wrapped in ``ID`` / ``IntType`` /
    ``StrType`` messages; any extra keyword arguments are encoded with
    ``_to_proto`` and sent along.  The raw reply of invoke() is returned.
    """
    proto_parent_id = ID()
    proto_parent_id.id = parent_device_id

    proto_port_no = IntType()
    proto_port_no.val = parent_port_no

    proto_device_type = StrType()
    proto_device_type.val = child_device_type

    proto_channel = IntType()
    proto_channel.val = channel_id

    extra = self._to_proto(**kw)
    reply = yield self.invoke(rpc="ChildDeviceDetected",
                              parent_device_id=proto_parent_id,
                              parent_port_no=proto_port_no,
                              child_device_type=proto_device_type,
                              channel_id=proto_channel,
                              **extra)
    returnValue(reply)
225
226
@wrap_request(None)
@inlineCallbacks
def device_update(self, device):
    """Issue the "DeviceUpdate" RPC carrying ``device`` as-is."""
    log.debug("device_update")
    reply = yield self.invoke(rpc="DeviceUpdate", device=device)
    returnValue(reply)
233
def child_device_removed(self, parent_device_id, child_device_id):
    """Placeholder: reporting a removed child device is not implemented.

    ``self`` was missing from the signature, so calling this on an
    instance failed with TypeError before reaching the intended error.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
236
237
@wrap_request(None)
@inlineCallbacks
def device_state_update(self, device_id,
                        oper_status=None,
                        connect_status=None):
    """Issue the "DeviceStateUpdate" RPC for ``device_id``.

    Each status is sent as an ``IntType``.  A status that is falsy and not
    equal to the corresponding ``UNKNOWN`` enum value (e.g. the default
    None) is sent as -1.
    """
    proto_id = ID()
    proto_id.id = device_id

    proto_oper = IntType()
    oper_given = bool(oper_status) or oper_status == OperStatus.UNKNOWN
    proto_oper.val = oper_status if oper_given else -1

    proto_connect = IntType()
    connect_given = (bool(connect_status)
                     or connect_status == ConnectStatus.UNKNOWN)
    proto_connect.val = connect_status if connect_given else -1

    reply = yield self.invoke(rpc="DeviceStateUpdate",
                              device_id=proto_id,
                              oper_status=proto_oper,
                              connect_status=proto_connect)
    returnValue(reply)
261
khenaidoo4d4802d2018-10-04 21:59:49 -0400262
@wrap_request(None)
@inlineCallbacks
def children_state_update(self, device_id,
                          oper_status=None,
                          connect_status=None):
    """Issue the "ChildrenStateUpdate" RPC for ``device_id``.

    Each status is sent as an ``IntType``.  A status that is falsy and not
    equal to the corresponding ``UNKNOWN`` enum value (e.g. the default
    None) is sent as -1.
    """
    proto_id = ID()
    proto_id.id = device_id

    proto_oper = IntType()
    oper_given = bool(oper_status) or oper_status == OperStatus.UNKNOWN
    proto_oper.val = oper_status if oper_given else -1

    proto_connect = IntType()
    connect_given = (bool(connect_status)
                     or connect_status == ConnectStatus.UNKNOWN)
    proto_connect.val = connect_status if connect_given else -1

    reply = yield self.invoke(rpc="ChildrenStateUpdate",
                              device_id=proto_id,
                              oper_status=proto_oper,
                              connect_status=proto_connect)
    returnValue(reply)
286
@wrap_request(None)
@inlineCallbacks
def port_state_update(self,
                      device_id,
                      port_type,
                      port_no,
                      oper_status):
    """Issue the "PortStateUpdate" RPC for one port of ``device_id``.

    ``device_id`` travels in an ``ID`` message; ``port_type``, ``port_no``
    and ``oper_status`` each travel in an ``IntType`` message.
    """
    proto_id = ID()
    proto_id.id = device_id

    proto_port_type = IntType()
    proto_port_type.val = port_type

    proto_port_no = IntType()
    proto_port_no.val = port_no

    proto_oper = IntType()
    proto_oper.val = oper_status

    reply = yield self.invoke(rpc="PortStateUpdate",
                              device_id=proto_id,
                              port_type=proto_port_type,
                              port_no=proto_port_no,
                              oper_status=proto_oper)
    returnValue(reply)
309
310
311
@wrap_request(None)
@inlineCallbacks
def child_devices_state_update(self, parent_device_id,
                               oper_status=None,
                               connect_status=None):
    """Issue the "child_devices_state_update" RPC for a parent device.

    Each status is sent as an ``IntType``.  A status that is falsy and not
    equal to the corresponding ``UNKNOWN`` enum value (e.g. the default
    None) is sent as -1.
    """
    proto_parent_id = ID()
    proto_parent_id.id = parent_device_id

    proto_oper = IntType()
    oper_given = bool(oper_status) or oper_status == OperStatus.UNKNOWN
    proto_oper.val = oper_status if oper_given else -1

    proto_connect = IntType()
    connect_given = (bool(connect_status)
                     or connect_status == ConnectStatus.UNKNOWN)
    proto_connect.val = connect_status if connect_given else -1

    # NOTE(review): this RPC name is snake_case while the other RPC names
    # in this class are CamelCase -- confirm the name the Core expects.
    reply = yield self.invoke(rpc="child_devices_state_update",
                              parent_device_id=proto_parent_id,
                              oper_status=proto_oper,
                              connect_status=proto_connect)
    returnValue(reply)
336
337
def child_devices_removed(self, parent_device_id):
    """Placeholder: reporting removed child devices is not implemented.

    ``self`` was missing from the signature, so calling this on an
    instance failed with TypeError before reaching the intended error.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
340
341
@wrap_request(None)
@inlineCallbacks
def device_pm_config_update(self, device_pm_config, init=False):
    """Issue the "DevicePMConfigUpdate" RPC.

    ``device_pm_config`` is sent as-is; ``init`` is wrapped in a
    ``BoolType`` message.
    """
    log.debug("device_pm_config_update")
    proto_init = BoolType()
    proto_init.val = init
    reply = yield self.invoke(rpc="DevicePMConfigUpdate",
                              device_pm_config=device_pm_config,
                              init=proto_init)
    returnValue(reply)
351
@wrap_request(None)
@inlineCallbacks
def port_created(self, device_id, port):
    """Issue the "PortCreated" RPC for ``port`` on ``device_id``.

    ``device_id`` travels in an ``ID`` message; ``port`` is sent as-is.
    """
    log.debug("port_created")
    device_ref = ID()
    device_ref.id = device_id
    reply = yield self.invoke(rpc="PortCreated",
                              device_id=device_ref,
                              port=port)
    returnValue(reply)
360
361
def port_removed(self, device_id, port):
    """Placeholder: reporting a removed port is not implemented.

    ``self`` was missing from the signature, so calling this on an
    instance failed with TypeError before reaching the intended error.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
364
def ports_enabled(self, device_id):
    """Placeholder: reporting enabled ports is not implemented.

    ``self`` was missing from the signature, so calling this on an
    instance failed with TypeError before reaching the intended error.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
367
def ports_disabled(self, device_id):
    """Placeholder: reporting disabled ports is not implemented.

    ``self`` was missing from the signature, so calling this on an
    instance failed with TypeError before reaching the intended error.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
370
def ports_oper_status_update(self, device_id, oper_status):
    """Placeholder: bulk port oper-status update is not implemented.

    ``self`` was missing from the signature, so calling this on an
    instance failed with TypeError before reaching the intended error.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
373
def image_download_update(self, img_dnld):
    """Placeholder: reporting an image download update is not implemented.

    ``self`` was missing from the signature, so calling this on an
    instance failed with TypeError before reaching the intended error.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
376
def image_download_deleted(self, img_dnld):
    """Placeholder: reporting a deleted image download is not implemented.

    ``self`` was missing from the signature, so calling this on an
    instance failed with TypeError before reaching the intended error.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
379
def packet_in(self, device_id, egress_port_no, packet):
    """Placeholder: forwarding a packet-in is not implemented.

    ``self`` was missing from the signature, so calling this on an
    instance failed with TypeError before reaching the intended error.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()