blob: a756855fd9824199d6e813104fe8e93ca4ae1c6d [file] [log] [blame]
Chip Boling67b674a2019-02-08 11:42:18 -06001#
2# Copyright 2018 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Agent to play gateway between CORE and an adapter.
19"""
Zack Williams84a71e92019-11-15 09:00:19 -070020from __future__ import absolute_import
Chip Boling67b674a2019-02-08 11:42:18 -060021import structlog
serkant.uluderya982a4b62019-03-17 23:29:39 -070022import arrow
Chip Boling67b674a2019-02-08 11:42:18 -060023from google.protobuf.message import Message
24from twisted.internet.defer import inlineCallbacks, returnValue
25
Zack Williams84a71e92019-11-15 09:00:19 -070026from .container_proxy import ContainerProxy
William Kurkianede82e92019-03-05 13:02:57 -050027
28from voltha_protos.common_pb2 import ID, ConnectStatus, OperStatus
29from voltha_protos.inter_container_pb2 import StrType, BoolType, IntType, Packet
Matt Jeanneret29989eb2019-03-07 06:40:12 -050030from voltha_protos.device_pb2 import Device, Ports, Devices
Zack Williams84a71e92019-11-15 09:00:19 -070031from voltha_protos.voltha_pb2 import CoreInstance, EventFilterRuleKey
Devmalya Paul0d3abf02019-07-31 18:34:27 -040032from voltha_protos.events_pb2 import Event
William Kurkian6e643802019-04-02 12:49:59 -040033from voltha_protos.events_pb2 import KpiEvent2, KpiEventType, MetricInformation, MetricMetaData
Zack Williams84a71e92019-11-15 09:00:19 -070034import six
William Kurkian6e643802019-04-02 12:49:59 -040035
# Module-level structlog logger used by every function and method in this file.
log = structlog.get_logger()
37
38
def createSubTopic(*args):
    """Build a topic name by joining the given string parts with underscores.

    Every positional argument must be a string; with no arguments the result
    is the empty string.
    """
    separator = '_'
    return separator.join(args)
41
class CoreProxy(ContainerProxy):
    """Adapter-side proxy that issues requests to the core over Kafka.

    Each request method packs its arguments into protobuf messages and hands
    them to ``self.invoke`` together with the topic to send to and the topic
    on which the reply is expected.  ``invoke``, ``self.listening_topic`` and
    ``self.kafka_proxy`` are not defined in this class; they are presumably
    provided by ``ContainerProxy`` (verify against that class).

    The ``ContainerProxy.wrap_request(<type>)`` decorator applied to the
    request methods is defined elsewhere; it presumably post-processes the
    reply into the given protobuf type (``None`` meaning no payload).
    """

    def __init__(self, kafka_proxy, default_core_topic, default_event_topic, my_listening_topic):
        """Remember the default topics and start with no per-device mapping.

        :param kafka_proxy: Kafka proxy object, forwarded to the base class.
        :param default_core_topic: topic used for a device that has no
            explicit core topic registered.
        :param default_event_topic: topic on which events are published.
        :param my_listening_topic: this adapter's own topic, forwarded to the
            base class.
        """
        super(CoreProxy, self).__init__(kafka_proxy, default_core_topic,
                                        my_listening_topic)
        self.core_default_topic = default_core_topic
        self.event_default_topic = default_event_topic
        # device id -> core topic; consulted by get_core_topic()
        self.deviceId_to_core_map = dict()

    # ~~~~~~~~~~~~~~~~~~~~~~~ Topic bookkeeping ~~~~~~~~~~~~~~~~~~~~~~~~

    def update_device_core_reference(self, device_id, core_topic):
        """Record (or overwrite) the core topic to use for ``device_id``."""
        log.debug("update_device_core_reference")
        self.deviceId_to_core_map[device_id] = core_topic

    def delete_device_core_reference(self, device_id, core_topic):
        """Forget the core topic recorded for ``device_id``.

        ``core_topic`` is accepted for interface compatibility but not used.
        Removing a device that was never mapped is a no-op: such devices are
        legitimate (``get_core_topic`` serves them from the default topic),
        so their removal must not fail.
        """
        log.debug("delete_device_core_reference")
        self.deviceId_to_core_map.pop(device_id, None)

    def get_adapter_topic(self, **kwargs):
        """Return the topic on which this adapter listens for replies.

        Keyword arguments are accepted and ignored.
        """
        return self.listening_topic

    def get_core_topic(self, device_id):
        """Return the topic registered for ``device_id``, else the default."""
        return self.deviceId_to_core_map.get(device_id,
                                             self.core_default_topic)

    # ~~~~~~~~~~~~~~~~~~~~~ Protobuf packing helpers ~~~~~~~~~~~~~~~~~~~

    # NOTE: the helpers assign the field after construction on purpose.
    # Attribute assignment makes protobuf reject a wrongly typed value
    # (e.g. None) immediately, which is how the inline code they replace
    # behaved.

    @staticmethod
    def _id_proto(value):
        """Wrap ``value`` in an ``ID`` message."""
        proto = ID()
        proto.id = value
        return proto

    @staticmethod
    def _int_proto(value):
        """Wrap ``value`` in an ``IntType`` message."""
        proto = IntType()
        proto.val = value
        return proto

    @staticmethod
    def _str_proto(value):
        """Wrap ``value`` in a ``StrType`` message."""
        proto = StrType()
        proto.val = value
        return proto

    @staticmethod
    def _bool_proto(value):
        """Wrap ``value`` in a ``BoolType`` message."""
        proto = BoolType()
        proto.val = value
        return proto

    @staticmethod
    def _status_proto(status, unknown):
        """Encode an optional status as ``IntType``, using -1 for "not given".

        ``unknown`` is the enum's UNKNOWN member.  It is compared explicitly
        so that a falsy UNKNOWN value is still sent as-is instead of being
        mistaken for "no status supplied".
        """
        proto = IntType()
        if status or status == unknown:
            proto.val = status
        else:
            proto.val = -1
        return proto

    def _to_proto(self, **kwargs):
        """Convert keyword arguments into protobuf messages.

        Protobuf messages pass through unchanged; booleans, integers and text
        strings are wrapped in ``BoolType``, ``IntType`` and ``StrType``.
        Values of any other type (``None`` included) are left out of the
        result.

        :return: dict mapping each retained keyword to its protobuf message.
        """
        encoded = {}
        for key, value in six.iteritems(kwargs):
            if isinstance(value, Message):
                encoded[key] = value
            elif isinstance(value, bool):
                # bool is a subclass of int, so it has to be tested first
                encoded[key] = self._bool_proto(value)
            elif isinstance(value, six.integer_types):
                encoded[key] = self._int_proto(value)
            elif isinstance(value, six.string_types):
                encoded[key] = self._str_proto(value)
        return encoded

    # ~~~~~~~~~~~~~~~~~~~~~~~~ Requests to the core ~~~~~~~~~~~~~~~~~~~~

    @ContainerProxy.wrap_request(CoreInstance)
    @inlineCallbacks
    def register(self, adapter, deviceTypes):
        """Register this adapter and its device types with the core.

        ``adapter.currentReplica`` and ``adapter.totalReplicas`` are
        validated first; when both are 0 they are set to 1 on the passed-in
        ``adapter`` object (single, non-scaled instance).

        :raises Exception: when the replica fields are inconsistent, or
            re-raised (after logging) when the ``Register`` call fails.
        """
        log.debug("register")

        if adapter.totalReplicas == 0 and adapter.currentReplica != 0:
            raise Exception("totalReplicas can't be 0, since you're here you have at least one")

        if adapter.currentReplica == 0 and adapter.totalReplicas != 0:
            raise Exception("currentReplica can't be 0, it has to start from 1")

        if adapter.currentReplica == 0 and adapter.totalReplicas == 0:
            # if the adapter is not setting these fields they default to 0,
            # in that case it means the adapter is not ready to be scaled
            # and thus it defaults to a single instance
            adapter.currentReplica = 1
            adapter.totalReplicas = 1

        if adapter.currentReplica > adapter.totalReplicas:
            raise Exception("currentReplica (%d) can't be greater than totalReplicas (%d)"
                            % (adapter.currentReplica, adapter.totalReplicas))

        try:
            res = yield self.invoke(rpc="Register",
                                    adapter=adapter,
                                    deviceTypes=deviceTypes)
            log.info("registration-returned", res=res)
            returnValue(res)
        except Exception as e:
            log.exception("registration-exception", e=e)
            raise

    @ContainerProxy.wrap_request(Device)
    @inlineCallbacks
    def get_device(self, device_id):
        """Issue ``GetDevice`` for ``device_id`` and return the reply."""
        log.debug("get-device")
        dev_id = self._id_proto(device_id)
        # Once we have a device being managed, all communications between
        # the adapter and the core occur over a topic associated with that
        # device
        to_topic = self.get_core_topic(device_id)
        reply_topic = self.get_adapter_topic()

        res = yield self.invoke(rpc="GetDevice",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                device_id=dev_id)
        returnValue(res)

    @ContainerProxy.wrap_request(Device)
    @inlineCallbacks
    def get_child_device(self, parent_device_id, **kwargs):
        """Issue ``GetChildDevice`` for a child of ``parent_device_id``.

        Extra keyword arguments are converted with ``_to_proto`` and sent
        along as additional request arguments.
        """
        log.debug("get-child-device")
        dev_id = self._id_proto(parent_device_id)
        to_topic = self.get_core_topic(parent_device_id)
        reply_topic = self.get_adapter_topic()
        args = self._to_proto(**kwargs)
        res = yield self.invoke(rpc="GetChildDevice",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                device_id=dev_id,
                                **args)
        returnValue(res)

    @ContainerProxy.wrap_request(Ports)
    @inlineCallbacks
    def get_ports(self, device_id, port_type):
        """Issue ``GetPorts`` for ``device_id`` with the given ``port_type``."""
        dev_id = self._id_proto(device_id)
        p_type = self._int_proto(port_type)
        to_topic = self.get_core_topic(device_id)
        reply_topic = self.get_adapter_topic()

        res = yield self.invoke(rpc="GetPorts",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                device_id=dev_id,
                                port_type=p_type)
        returnValue(res)

    @ContainerProxy.wrap_request(Devices)
    @inlineCallbacks
    def get_child_devices(self, parent_device_id):
        """Issue ``GetChildDevices`` for ``parent_device_id``."""
        log.debug("get-child-devices")
        dev_id = self._id_proto(parent_device_id)
        to_topic = self.get_core_topic(parent_device_id)
        reply_topic = self.get_adapter_topic()
        res = yield self.invoke(rpc="GetChildDevices",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                device_id=dev_id)
        returnValue(res)

    @ContainerProxy.wrap_request(Device)
    @inlineCallbacks
    def get_child_device_with_proxy_address(self, proxy_address):
        """Issue ``GetChildDeviceWithProxyAddress`` for ``proxy_address``.

        The request is routed via the core topic of
        ``proxy_address.device_id``; the proxy address itself is the only
        request argument.
        """
        log.debug("get-child-device-with-proxy-address")
        to_topic = self.get_core_topic(proxy_address.device_id)
        reply_topic = self.get_adapter_topic()
        res = yield self.invoke(rpc="GetChildDeviceWithProxyAddress",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                proxy_address=proxy_address)
        returnValue(res)

    @ContainerProxy.wrap_request(Device)
    @inlineCallbacks
    def child_device_detected(self,
                              parent_device_id,
                              parent_port_no,
                              child_device_type,
                              channel_id,
                              **kw):
        """Issue ``ChildDeviceDetected`` for a child found on a parent port.

        Extra keyword arguments are converted with ``_to_proto`` and sent
        along as additional request arguments.
        """
        parent_id = self._id_proto(parent_device_id)
        ppn = self._int_proto(parent_port_no)
        cdt = self._str_proto(child_device_type)
        channel = self._int_proto(channel_id)
        to_topic = self.get_core_topic(parent_device_id)
        reply_topic = self.get_adapter_topic()

        args = self._to_proto(**kw)
        res = yield self.invoke(rpc="ChildDeviceDetected",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                parent_device_id=parent_id,
                                parent_port_no=ppn,
                                child_device_type=cdt,
                                channel_id=channel,
                                **args)
        returnValue(res)

    @ContainerProxy.wrap_request(None)
    @inlineCallbacks
    def device_update(self, device):
        """Issue ``DeviceUpdate`` carrying the given ``device`` message."""
        log.debug("device_update")
        to_topic = self.get_core_topic(device.id)
        reply_topic = self.get_adapter_topic()

        res = yield self.invoke(rpc="DeviceUpdate",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                device=device)
        returnValue(res)

    def child_device_removed(self, parent_device_id, child_device_id):
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    @ContainerProxy.wrap_request(None)
    @inlineCallbacks
    def device_state_update(self, device_id,
                            oper_status=None,
                            connect_status=None):
        """Issue ``DeviceStateUpdate`` for ``device_id``.

        A status that is not supplied is sent as -1.
        """
        dev_id = self._id_proto(device_id)
        o_status = self._status_proto(oper_status, OperStatus.UNKNOWN)
        c_status = self._status_proto(connect_status, ConnectStatus.UNKNOWN)

        to_topic = self.get_core_topic(device_id)
        reply_topic = self.get_adapter_topic()

        res = yield self.invoke(rpc="DeviceStateUpdate",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                device_id=dev_id,
                                oper_status=o_status,
                                connect_status=c_status)
        returnValue(res)

    @ContainerProxy.wrap_request(None)
    @inlineCallbacks
    def children_state_update(self, device_id,
                              oper_status=None,
                              connect_status=None):
        """Issue ``ChildrenStateUpdate`` for ``device_id``.

        A status that is not supplied is sent as -1.
        """
        dev_id = self._id_proto(device_id)
        o_status = self._status_proto(oper_status, OperStatus.UNKNOWN)
        c_status = self._status_proto(connect_status, ConnectStatus.UNKNOWN)

        to_topic = self.get_core_topic(device_id)
        reply_topic = self.get_adapter_topic()

        res = yield self.invoke(rpc="ChildrenStateUpdate",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                device_id=dev_id,
                                oper_status=o_status,
                                connect_status=c_status)
        returnValue(res)

    @ContainerProxy.wrap_request(None)
    @inlineCallbacks
    def port_state_update(self,
                          device_id,
                          port_type,
                          port_no,
                          oper_status):
        """Issue ``PortStateUpdate`` for one port of ``device_id``."""
        dev_id = self._id_proto(device_id)
        pt = self._int_proto(port_type)
        p_no = self._int_proto(port_no)
        o_status = self._int_proto(oper_status)

        to_topic = self.get_core_topic(device_id)
        reply_topic = self.get_adapter_topic()

        res = yield self.invoke(rpc="PortStateUpdate",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                device_id=dev_id,
                                port_type=pt,
                                port_no=p_no,
                                oper_status=o_status)
        returnValue(res)

    @ContainerProxy.wrap_request(None)
    @inlineCallbacks
    def child_devices_state_update(self, parent_device_id,
                                   oper_status=None,
                                   connect_status=None):
        """Issue ``child_devices_state_update`` for ``parent_device_id``.

        A status that is not supplied is sent as -1.
        """
        parent_id = self._id_proto(parent_device_id)
        o_status = self._status_proto(oper_status, OperStatus.UNKNOWN)
        c_status = self._status_proto(connect_status, ConnectStatus.UNKNOWN)

        to_topic = self.get_core_topic(parent_device_id)
        reply_topic = self.get_adapter_topic()

        # NOTE(review): every other RPC name in this class is CamelCase; this
        # one is snake_case.  Kept as-is -- confirm the core serves this name.
        res = yield self.invoke(rpc="child_devices_state_update",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                parent_device_id=parent_id,
                                oper_status=o_status,
                                connect_status=c_status)
        returnValue(res)

    def child_devices_removed(self, parent_device_id):
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    @ContainerProxy.wrap_request(None)
    @inlineCallbacks
    def device_pm_config_update(self, device_pm_config, init=False):
        """Issue ``DevicePMConfigUpdate`` with the given PM configuration.

        :param device_pm_config: message whose ``id`` selects the core topic.
        :param init: sent to the core as a ``BoolType`` argument.
        """
        log.debug("device_pm_config_update")
        init_flag = self._bool_proto(init)
        to_topic = self.get_core_topic(device_pm_config.id)
        reply_topic = self.get_adapter_topic()

        res = yield self.invoke(rpc="DevicePMConfigUpdate",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                device_pm_config=device_pm_config,
                                init=init_flag)
        returnValue(res)

    @ContainerProxy.wrap_request(None)
    @inlineCallbacks
    def port_created(self, device_id, port):
        """Issue ``PortCreated`` for ``port`` on ``device_id``."""
        log.debug("port_created")
        proto_id = self._id_proto(device_id)
        to_topic = self.get_core_topic(device_id)
        reply_topic = self.get_adapter_topic()

        res = yield self.invoke(rpc="PortCreated",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                device_id=proto_id,
                                port=port)
        returnValue(res)

    @ContainerProxy.wrap_request(None)
    @inlineCallbacks
    def ports_state_update(self,
                           device_id,
                           port_type_filter,
                           oper_status):
        """Issue ``PortsStateUpdate`` for ``device_id``.

        ``port_type_filter`` and ``oper_status`` are both sent as ``IntType``
        arguments; the reply is logged at debug level before being returned.
        """
        log.debug("ports_state_update", device_id=device_id, oper_status=oper_status)
        dev_id = self._id_proto(device_id)
        t_filter = self._int_proto(port_type_filter)
        o_status = self._int_proto(oper_status)

        to_topic = self.get_core_topic(device_id)
        reply_topic = self.get_adapter_topic()

        res = yield self.invoke(rpc="PortsStateUpdate",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                device_id=dev_id,
                                port_type_filter=t_filter,
                                oper_status=o_status)
        log.debug("ports_state_update_response",
                  device_id=device_id,
                  port_type_filter=port_type_filter,
                  oper_status=oper_status,
                  response=res)
        returnValue(res)

    def port_removed(self, device_id, port):
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def ports_enabled(self, device_id):
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def ports_disabled(self, device_id):
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def ports_oper_status_update(self, device_id, oper_status):
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def image_download_update(self, img_dnld):
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def image_download_deleted(self, img_dnld):
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    @ContainerProxy.wrap_request(None)
    @inlineCallbacks
    def send_packet_in(self, device_id, port, packet):
        """Issue ``PacketIn`` carrying ``packet`` received on ``port``.

        ``packet`` is stored as the payload of a ``Packet`` message.
        """
        log.debug("send_packet_in", device_id=device_id)
        proto_id = self._id_proto(device_id)
        port_proto = self._int_proto(port)
        pac = Packet()
        pac.payload = packet
        to_topic = self.get_core_topic(device_id)
        reply_topic = self.get_adapter_topic()
        res = yield self.invoke(rpc="PacketIn",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                device_id=proto_id,
                                port=port_proto,
                                packet=pac)
        returnValue(res)

    @ContainerProxy.wrap_request(None)
    @inlineCallbacks
    def device_reason_update(self, device_id, reason):
        """Issue ``DeviceReasonUpdate`` with ``reason`` for ``device_id``."""
        dev_id = self._id_proto(device_id)
        rsn = self._str_proto(reason)
        to_topic = self.get_core_topic(device_id)
        reply_topic = self.get_adapter_topic()

        res = yield self.invoke(rpc="DeviceReasonUpdate",
                                to_topic=to_topic,
                                reply_topic=reply_topic,
                                device_id=dev_id,
                                device_reason=rsn)

        returnValue(res)

    # ~~~~~~~~~~~~~~~~~~~ Handle event submissions ~~~~~~~~~~~~~~~~~~~~~

    def filter_alarm(self, device_id, alarm_event):
        """Placeholder for alarm filtering; logs a warning and returns None.

        TODO: alarm filtering functionality is not implemented.  The
        unreachable draft that used to follow the early return compared each
        configured filter's rules against the alarm's id, type, category,
        severity, resource_id and device_id (case-insensitively), returning
        True when every rule of some filter matched and False otherwise.  It
        relied on names that are not available in this module, so it was
        removed rather than kept as dead code.
        """
        log.warn('filter_alarm is not implemented')
        return

    @inlineCallbacks
    def submit_event(self, event_msg):
        """Publish ``event_msg`` on the default event topic.

        Failures (including a message that is not an ``Event``) are logged
        and not propagated; in that case the result is ``None``.

        :param event_msg: the ``Event`` message to publish.
        :return: whatever the Kafka proxy's send call yields.
        """
        try:
            # Explicit check instead of ``assert`` so the validation also
            # runs when Python is started with -O.
            if not isinstance(event_msg, Event):
                raise TypeError("event_msg must be an Event, got %s"
                                % type(event_msg).__name__)
            res = yield self.kafka_proxy._send_kafka_message(self.event_default_topic, event_msg)
            returnValue(res)
        except Exception as e:
            log.exception('failed-event-submission',
                          type=type(event_msg), e=e)