[VOL-1034, VOL-1035, VOL-1037] This commit consists of:
1) Implementation of inter-adapter communication using flows
as proxy message between an ONU and its parent OLT.
2) Update the protos to reflect the inter-adapter message structure
3) Clean up the ponsim adapters to remove unused references, plus
general cleanup.
Change-Id: Ibe913a80a96d601fed946d9b9db55bb8d4f2c15a
diff --git a/adapters/kafka/adapter_proxy.py b/adapters/kafka/adapter_proxy.py
new file mode 100644
index 0000000..2d4831a
--- /dev/null
+++ b/adapters/kafka/adapter_proxy.py
@@ -0,0 +1,110 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Agent to play gateway between adapters.
+"""
+
+import structlog
+from uuid import uuid4
+from twisted.internet.defer import inlineCallbacks, returnValue
+from adapters.kafka.container_proxy import ContainerProxy
+from adapters.protos import third_party
+from adapters.protos.core_adapter_pb2 import InterAdapterHeader, \
+ InterAdapterMessage
+import time
+
+_ = third_party
+log = structlog.get_logger()
+
+
+class AdapterProxy(ContainerProxy):
+    """Proxy an adapter uses to send messages to another adapter."""
+
+    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+        super(AdapterProxy, self).__init__(kafka_proxy,
+                                           core_topic,
+                                           my_listening_topic)
+
+    def _to_string(self, unicode_str):
+        """Return unicode_str as an ASCII byte string ("" when None)."""
+        if unicode_str is None:
+            return ""
+        if isinstance(unicode_str, unicode):
+            # Characters that cannot be encoded as ASCII are dropped.
+            return unicode_str.encode('ascii', 'ignore')
+        return unicode_str
+
+    @ContainerProxy.wrap_request(None)
+    @inlineCallbacks
+    def send_inter_adapter_message(self,
+                                   msg,
+                                   type,
+                                   from_adapter,
+                                   to_adapter,
+                                   to_device_id=None,
+                                   proxy_device_id=None,
+                                   message_no=None):
+        """
+        Sends a message directly to an adapter. This is typically used to send
+        proxied messages from one adapter to another. An initial ACK response
+        is sent back to the invoking adapter. If there is a subsequent
+        response to be sent back (async) then the adapter receiving this
+        request will use this same API to send back the async response.
+        :param msg: GRPC message to send
+        :param type: InterAdapterMessageType of the message to send
+        :param from_adapter: Name of the adapter making the request.
+        :param to_adapter: Name of the remote adapter.
+        :param to_device_id: The ID of the device for which the message is
+        intended. If it's None then the message is not intended for a
+        specific device. Its interpretation is adapter specific.
+        :param proxy_device_id: The ID of the device which will proxy that
+        message. If it's None then there is no specific device to proxy the
+        message. Its interpretation is adapter specific.
+        :param message_no: A unique number for this transaction that the
+        adapter may use to correlate a request and an async response. A
+        uuid4 hex string is generated when it is not supplied.
+        :raises ValueError: if msg, from_adapter or to_adapter is missing.
+        """
+        try:
+            # Explicit check: assert statements are stripped under -O.
+            if not (msg and from_adapter and to_adapter):
+                raise ValueError("msg, from_adapter, to_adapter required")
+
+            # Build the inter adapter message
+            h = InterAdapterHeader()
+            h.type = type
+            h.from_topic = self._to_string(from_adapter)
+            h.to_topic = self._to_string(to_adapter)
+            h.to_device_id = self._to_string(to_device_id)
+            h.proxy_device_id = self._to_string(proxy_device_id)
+            # Fall back to a generated id when the caller supplies none.
+            h.id = self._to_string(message_no) if message_no else uuid4().hex
+            h.timestamp = int(round(time.time() * 1000))
+            iaMsg = InterAdapterMessage()
+            iaMsg.header.CopyFrom(h)
+            iaMsg.body.Pack(msg)
+
+            log.debug("sending-inter-adapter-message", header=iaMsg.header)
+            res = yield self.invoke(rpc="process_inter_adapter_message",
+                                    to_topic=iaMsg.header.to_topic,
+                                    msg=iaMsg)
+            returnValue(res)
+        except Exception as e:
+            log.exception("error-sending-request", e=e)
+            # Propagate: swallowing it returns None, which the wrap_request
+            # wrapper then fails to unpack, hiding the real error.
+            raise
diff --git a/adapters/kafka/adapter_request_facade.py b/adapters/kafka/adapter_request_facade.py
index 3009206..67f7869 100644
--- a/adapters/kafka/adapter_request_facade.py
+++ b/adapters/kafka/adapter_request_facade.py
@@ -15,35 +15,18 @@
#
"""
-Agent to play gateway between CORE and an individual adapter.
+This facade handles kafka-formatted messages from the Core, extracts the kafka
+formatting and forwards the request to the concrete handler.
"""
-from uuid import uuid4
-import arrow
-import structlog
-from google.protobuf.json_format import MessageToJson
-from scapy.packet import Packet
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks
from zope.interface import implementer
-from adapters.common.event_bus import EventBusClient
-from adapters.common.frameio.frameio import hexify
-from adapters.common.utils.id_generation import create_cluster_logical_device_ids
from adapters.interface import IAdapterInterface
+from adapters.protos.core_adapter_pb2 import IntType, InterAdapterMessage
from adapters.protos.device_pb2 import Device
-
-from adapters.protos import third_party
-from adapters.protos.device_pb2 import Device, Port, PmConfigs
-from adapters.protos.events_pb2 import AlarmEvent, AlarmEventType, \
- AlarmEventSeverity, AlarmEventState, AlarmEventCategory
-from adapters.protos.events_pb2 import KpiEvent
-from adapters.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
- LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey
-from adapters.common.utils.registry import registry
-from adapters.common.utils.id_generation import create_cluster_device_id
-from adapters.protos.core_adapter_pb2 import IntType
-from adapters.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, FlowGroupChanges
-import re
+from adapters.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
+ FlowGroupChanges
class MacAddressError(BaseException):
@@ -107,7 +90,6 @@
return True, self.adapter.get_ofp_port_info(d, p.val)
-
def reconcile_device(self, device):
return self.adapter.reconcile_device(device)
@@ -207,3 +189,11 @@
def unsuppress_alarm(self, filter):
return self.adapter.unsuppress_alarm(filter)
+    def process_inter_adapter_message(self, msg):
+        """Unpack msg and hand it to the adapter; (False, empty) if falsy."""
+        ia_msg = InterAdapterMessage()
+        if not msg:
+            return (False, ia_msg)
+        # msg is unpacked into the concrete type before being forwarded.
+        msg.Unpack(ia_msg)
+        return (True, self.adapter.process_inter_adapter_message(ia_msg))
diff --git a/adapters/kafka/container_proxy.py b/adapters/kafka/container_proxy.py
new file mode 100644
index 0000000..79918cd
--- /dev/null
+++ b/adapters/kafka/container_proxy.py
@@ -0,0 +1,114 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+The superclass for all kafka proxy subclasses.
+"""
+
+import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python import failure
+from zope.interface import implementer
+
+from adapters.common.utils.deferred_utils import DeferredWithTimeout, \
+ TimeOutError
+from adapters.common.utils.registry import IComponent
+
+log = structlog.get_logger()
+
+
+class KafkaMessagingError(BaseException):  # not caught by "except Exception"
+    def __init__(self, error):
+        self.error = error  # failure detail, stored exactly as passed in
+
+
+@implementer(IComponent)
+class ContainerProxy(object):
+    """Superclass shared by the kafka-based proxy classes."""
+
+    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+        self.kafka_proxy = kafka_proxy
+        self.listening_topic = my_listening_topic
+        self.core_topic = core_topic
+        self.default_timeout = 3  # handed to DeferredWithTimeout in invoke
+
+    def start(self):
+        log.info('started')
+        return self
+
+    def stop(self):
+        log.info('stopped')
+
+    @classmethod
+    def wrap_request(cls, return_cls):
+        """Decorator for methods yielding (success, body). On success returns
+        body unpacked into return_cls (None if that is None), else body."""
+        from functools import wraps
+
+        def real_wrapper(func):
+            @wraps(func)  # keep the wrapped method's name and docstring
+            @inlineCallbacks
+            def wrapper(*args, **kw):
+                try:
+                    (success, d) = yield func(*args, **kw)
+                    if not success:
+                        log.warn("unsuccessful-request", func=func, args=args,
+                                 kw=kw)
+                        returnValue(d)
+                    log.debug("successful-response", func=func, val=d)
+                    if return_cls is None:
+                        log.debug("successful-response-none", func=func,
+                                  val=None)
+                        returnValue(None)
+                    rc = return_cls()
+                    if d is not None:
+                        d.Unpack(rc)
+                    returnValue(rc)
+                except Exception as e:
+                    log.exception("request-wrapper-exception", func=func, e=e)
+                    raise
+            return wrapper
+        return real_wrapper
+
+    @inlineCallbacks
+    def invoke(self, rpc, to_topic=None, **kwargs):
+        """Send rpc to to_topic (core_topic if None) and return the result."""
+        @inlineCallbacks
+        def _send_request(rpc, m_callback, to_topic, **kwargs):
+            try:
+                log.debug("sending-request", rpc=rpc)
+                if to_topic is None:
+                    to_topic = self.core_topic
+                result = yield self.kafka_proxy.send_request(
+                    rpc=rpc, to_topic=to_topic,
+                    reply_topic=self.listening_topic, callback=None, **kwargs)
+                if not m_callback.called:
+                    m_callback.callback(result)
+                else:
+                    log.debug('timeout-already-occurred', rpc=rpc)
+            except Exception:
+                log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
+                if not m_callback.called:
+                    m_callback.errback(failure.Failure())
+
+        cb = DeferredWithTimeout(timeout=self.default_timeout)
+        _send_request(rpc, cb, to_topic, **kwargs)
+        try:
+            res = yield cb
+            returnValue(res)
+        except TimeOutError as e:
+            log.warn('invoke-timeout', e=e)
+            raise  # bare raise keeps the original traceback
diff --git a/adapters/kafka/core_proxy.py b/adapters/kafka/core_proxy.py
index 512262f..36459ed 100644
--- a/adapters/kafka/core_proxy.py
+++ b/adapters/kafka/core_proxy.py
@@ -1,5 +1,5 @@
#
-# Copyright 2017 the original author or authors.
+# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,122 +15,28 @@
#
"""
-Agent to play gateway between CORE and an individual adapter.
+Agent to play gateway between CORE and an adapter.
"""
-from uuid import uuid4
-
-import arrow
import structlog
-from google.protobuf.json_format import MessageToJson
-from scapy.packet import Packet
-from twisted.internet.defer import inlineCallbacks, returnValue
-from twisted.python import failure
-from zope.interface import implementer
-
-from adapters.common.event_bus import EventBusClient
-from adapters.common.frameio.frameio import hexify
-from adapters.common.utils.id_generation import create_cluster_logical_device_ids
-from adapters.interface import IAdapterInterface
-from adapters.protos import third_party
-from adapters.protos.device_pb2 import Device, Port, Ports, PmConfigs
-from adapters.protos.events_pb2 import AlarmEvent, AlarmEventType, \
- AlarmEventSeverity, AlarmEventState, AlarmEventCategory
-from adapters.protos.events_pb2 import KpiEvent
-from adapters.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
- LogicalPort, AlarmFilterRuleKey, CoreInstance
-from adapters.common.utils.registry import registry, IComponent
-from adapters.common.utils.id_generation import create_cluster_device_id
-import re
-from adapters.interface import ICoreSouthBoundInterface
-from adapters.protos.core_adapter_pb2 import StrType, BoolType, IntType
-from adapters.protos.common_pb2 import ID, ConnectStatus, OperStatus
from google.protobuf.message import Message
-from adapters.common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from adapters.kafka.container_proxy import ContainerProxy
+from adapters.protos.common_pb2 import ID, ConnectStatus, OperStatus
+from adapters.protos.core_adapter_pb2 import StrType, BoolType, IntType
+from adapters.protos.device_pb2 import Device, Ports
+from adapters.protos.voltha_pb2 import CoreInstance
log = structlog.get_logger()
-class KafkaMessagingError(BaseException):
- def __init__(self, error):
- self.error = error
-def wrap_request(return_cls):
- def real_wrapper(func):
- @inlineCallbacks
- def wrapper(*args, **kw):
- try:
- (success, d) = yield func(*args, **kw)
- if success:
- log.debug("successful-response", func=func, val=d)
- if return_cls is not None:
- rc = return_cls()
- if d is not None:
- d.Unpack(rc)
- returnValue(rc)
- else:
- log.debug("successful-response-none", func=func,
- val=None)
- returnValue(None)
- else:
- log.warn("unsuccessful-request", func=func, args=args, kw=kw)
- returnValue(d)
- except Exception as e:
- log.exception("request-wrapper-exception", func=func, e=e)
- raise
- return wrapper
- return real_wrapper
-
-
-@implementer(IComponent, ICoreSouthBoundInterface)
-class CoreProxy(object):
+class CoreProxy(ContainerProxy):
def __init__(self, kafka_proxy, core_topic, my_listening_topic):
- self.kafka_proxy = kafka_proxy
- self.listening_topic = my_listening_topic
- self.core_topic = core_topic
- self.default_timeout = 3
+ super(CoreProxy, self).__init__(kafka_proxy, core_topic,
+ my_listening_topic)
- def start(self):
- log.info('started')
-
- return self
-
- def stop(self):
- log.info('stopped')
-
- @inlineCallbacks
- def invoke(self, rpc, to_topic=None, **kwargs):
- @inlineCallbacks
- def _send_request(rpc, m_callback,to_topic, **kwargs):
- try:
- log.debug("sending-request", rpc=rpc)
- if to_topic is None:
- to_topic = self.core_topic
- result = yield self.kafka_proxy.send_request(rpc=rpc,
- to_topic=to_topic,
- reply_topic=self.listening_topic,
- callback=None,
- **kwargs)
- if not m_callback.called:
- m_callback.callback(result)
- else:
- log.debug('timeout-already-occurred', rpc=rpc)
- except Exception as e:
- log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
- if not m_callback.called:
- m_callback.errback(failure.Failure())
-
- log.debug('invoke-request', rpc=rpc)
- cb = DeferredWithTimeout(timeout=self.default_timeout)
- _send_request(rpc, cb, to_topic, **kwargs)
- try:
- res = yield cb
- returnValue(res)
- except TimeOutError as e:
- log.warn('invoke-timeout', e=e)
- raise e
-
-
- @wrap_request(CoreInstance)
+ @ContainerProxy.wrap_request(CoreInstance)
@inlineCallbacks
def register(self, adapter):
log.debug("register")
@@ -142,7 +48,7 @@
log.exception("registration-exception", e=e)
raise
- @wrap_request(Device)
+ @ContainerProxy.wrap_request(Device)
@inlineCallbacks
def get_device(self, device_id):
log.debug("get-device")
@@ -151,15 +57,12 @@
res = yield self.invoke(rpc="GetDevice", device_id=id)
returnValue(res)
- @wrap_request(Device)
+ @ContainerProxy.wrap_request(Device)
@inlineCallbacks
def get_child_device(self, parent_device_id, **kwargs):
raise NotImplementedError()
- # def add_device(self, device):
- # raise NotImplementedError()
-
- @wrap_request(Ports)
+ @ContainerProxy.wrap_request(Ports)
@inlineCallbacks
def get_ports(self, device_id, port_type):
id = ID()
@@ -179,7 +82,7 @@
def _to_proto(self, **kwargs):
encoded = {}
- for k,v in kwargs.iteritems():
+ for k, v in kwargs.iteritems():
if isinstance(v, Message):
encoded[k] = v
elif type(v) == int:
@@ -196,8 +99,7 @@
encoded[k] = b_proto
return encoded
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def child_device_detected(self,
parent_device_id,
@@ -217,14 +119,13 @@
args = self._to_proto(**kw)
res = yield self.invoke(rpc="ChildDeviceDetected",
parent_device_id=id,
- parent_port_no = ppn,
- child_device_type= cdt,
+ parent_port_no=ppn,
+ child_device_type=cdt,
channel_id=channel,
**args)
returnValue(res)
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def device_update(self, device):
log.debug("device_update")
@@ -234,21 +135,20 @@
def child_device_removed(parent_device_id, child_device_id):
raise NotImplementedError()
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def device_state_update(self, device_id,
- oper_status=None,
- connect_status=None):
+ oper_status=None,
+ connect_status=None):
id = ID()
id.id = device_id
o_status = IntType()
- if oper_status or oper_status==OperStatus.UNKNOWN:
+ if oper_status or oper_status == OperStatus.UNKNOWN:
o_status.val = oper_status
else:
o_status.val = -1
c_status = IntType()
- if connect_status or connect_status==ConnectStatus.UNKNOWN:
+ if connect_status or connect_status == ConnectStatus.UNKNOWN:
c_status.val = connect_status
else:
c_status.val = -1
@@ -259,21 +159,20 @@
connect_status=c_status)
returnValue(res)
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def children_state_update(self, device_id,
- oper_status=None,
- connect_status=None):
+ oper_status=None,
+ connect_status=None):
id = ID()
id.id = device_id
o_status = IntType()
- if oper_status or oper_status==OperStatus.UNKNOWN:
+ if oper_status or oper_status == OperStatus.UNKNOWN:
o_status.val = oper_status
else:
o_status.val = -1
c_status = IntType()
- if connect_status or connect_status==ConnectStatus.UNKNOWN:
+ if connect_status or connect_status == ConnectStatus.UNKNOWN:
c_status.val = connect_status
else:
c_status.val = -1
@@ -284,7 +183,7 @@
connect_status=c_status)
returnValue(res)
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def port_state_update(self,
device_id,
@@ -307,9 +206,7 @@
oper_status=o_status)
returnValue(res)
-
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def child_devices_state_update(self, parent_device_id,
oper_status=None,
@@ -318,12 +215,12 @@
id = ID()
id.id = parent_device_id
o_status = IntType()
- if oper_status or oper_status==OperStatus.UNKNOWN:
+ if oper_status or oper_status == OperStatus.UNKNOWN:
o_status.val = oper_status
else:
o_status.val = -1
c_status = IntType()
- if connect_status or connect_status==ConnectStatus.UNKNOWN:
+ if connect_status or connect_status == ConnectStatus.UNKNOWN:
c_status.val = connect_status
else:
c_status.val = -1
@@ -334,12 +231,10 @@
connect_status=c_status)
returnValue(res)
-
def child_devices_removed(parent_device_id):
raise NotImplementedError()
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def device_pm_config_update(self, device_pm_config, init=False):
log.debug("device_pm_config_update")
@@ -349,16 +244,16 @@
device_pm_config=device_pm_config, init=b)
returnValue(res)
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def port_created(self, device_id, port):
log.debug("port_created")
proto_id = ID()
proto_id.id = device_id
- res = yield self.invoke(rpc="PortCreated", device_id=proto_id, port=port)
+ res = yield self.invoke(rpc="PortCreated", device_id=proto_id,
+ port=port)
returnValue(res)
-
def port_removed(device_id, port):
raise NotImplementedError()
diff --git a/adapters/kafka/kafka_inter_container_library.py b/adapters/kafka/kafka_inter_container_library.py
index f5bb720..3f6f5eb 100644
--- a/adapters/kafka/kafka_inter_container_library.py
+++ b/adapters/kafka/kafka_inter_container_library.py
@@ -14,28 +14,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from zope.interface import Interface, implementer
-from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+import time
+from uuid import uuid4
+
+import structlog
+from afkak.client import KafkaClient
+from afkak.consumer import OFFSET_LATEST, Consumer
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
DeferredQueue, gatherResults
-from afkak.client import KafkaClient
-from afkak.consumer import OFFSET_LATEST, Consumer
-import structlog
-from adapters.common.utils import asleep
-from adapters.protos.core_adapter_pb2 import MessageType, Argument, \
- InterContainerRequestBody, InterContainerMessage, Header, InterContainerResponseBody
-import time
-from uuid import uuid4
-from adapters.common.utils.registry import IComponent
+from zope.interface import implementer
+from adapters.common.utils import asleep
+from adapters.common.utils.registry import IComponent
+from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+from adapters.protos.core_adapter_pb2 import MessageType, Argument, \
+ InterContainerRequestBody, InterContainerMessage, Header, \
+ InterContainerResponseBody
log = structlog.get_logger()
+
class KafkaMessagingError(BaseException):
def __init__(self, error):
self.error = error
+
@implementer(IComponent)
class IKafkaMessagingProxy(object):
_kafka_messaging_instance = None
@@ -115,7 +119,6 @@
except Exception as e:
log.exception("Failed-to-start-proxy", e=e)
-
def stop(self):
"""
Invoked to stop the kafka proxy
@@ -135,7 +138,6 @@
except Exception as e:
log.exception("Exception-when-stopping-messaging-proxy:", e=e)
-
@inlineCallbacks
def _wait_until_topic_is_ready(self, client, topic):
e = True
@@ -165,7 +167,8 @@
for partition in partitions]
self.topic_consumer_map[topic] = consumers
- log.debug("_subscribe", topic=topic, consumermap=self.topic_consumer_map)
+ log.debug("_subscribe", topic=topic,
+ consumermap=self.topic_consumer_map)
if target_cls is not None and callback is None:
# Scenario #1
@@ -409,6 +412,7 @@
Default internal method invoked for every batch of messages received
from Kafka.
"""
+
def _toDict(args):
"""
Convert a repeatable Argument type into a python dictionary
@@ -443,24 +447,6 @@
message.ParseFromString(val)
if message.header.type == MessageType.Value("REQUEST"):
- # if self.num_messages == 0:
- # self.init_time = int(round(time.time() * 1000))
- # self.init_received_time = message.header.timestamp
- # log.debug("INIT_TIME", time=self.init_time,
- # time_sent=message.header.timestamp)
- # self.num_messages = self.num_messages + 1
- #
- # self.total_time = self.total_time + current_time - message.header.timestamp
- #
- # if self.num_messages % 10 == 0:
- # log.debug("TOTAL_TIME ...",
- # num=self.num_messages,
- # total=self.total_time,
- # duration=current_time - self.init_time,
- # time_since_first_msg=current_time - self.init_received_time,
- # average=self.total_time / 10)
- # self.total_time = 0
-
# Get the target class for that specific topic
targetted_topic = self._to_string(message.header.to_topic)
msg_body = InterContainerRequestBody()
@@ -497,16 +483,6 @@
elif message.header.type == MessageType.Value("RESPONSE"):
trns_id = self._to_string(message.header.id)
if trns_id in self.transaction_id_deferred_map:
- # self.num_responses = self.num_responses + 1
- # self.total_time_responses = self.total_time_responses + current_time - \
- # message.header.timestamp
- # if self.num_responses % 10 == 0:
- # log.debug("TOTAL RESPONSES ...",
- # num=self.num_responses,
- # total=self.total_time_responses,
- # average=self.total_time_responses / 10)
- # self.total_time_responses = 0
-
resp = self._parse_response(val)
self.transaction_id_deferred_map[trns_id].callback(resp)
@@ -568,9 +544,9 @@
self.transaction_id_deferred_map[
self._to_string(request.header.id)] = wait_for_result
- log.debug("BEFORE-SENDING", to_topic=to_topic, from_topic=reply_topic)
yield self._send_kafka_message(to_topic, request)
- log.debug("AFTER-SENDING", to_topic=to_topic, from_topic=reply_topic)
+ log.debug("message-sent", to_topic=to_topic,
+ from_topic=reply_topic)
if response_required:
res = yield wait_for_result
diff --git a/adapters/kafka/kafka_proxy.py b/adapters/kafka/kafka_proxy.py
index 10fdbf8..c11caa7 100644
--- a/adapters/kafka/kafka_proxy.py
+++ b/adapters/kafka/kafka_proxy.py
@@ -16,7 +16,7 @@
from afkak.client import KafkaClient as _KafkaClient
from afkak.common import (
- PRODUCER_ACK_LOCAL_WRITE, PRODUCER_ACK_NOT_REQUIRED
+ PRODUCER_ACK_NOT_REQUIRED
)
from afkak.producer import Producer as _kafkaProducer
from structlog import get_logger
@@ -24,9 +24,8 @@
from zope.interface import implementer
from adapters.common.utils.consulhelpers import get_endpoint_from_consul
-from adapters.kafka.event_bus_publisher import EventBusPublisher
from adapters.common.utils.registry import IComponent
-import time
+from adapters.kafka.event_bus_publisher import EventBusPublisher
log = get_logger()
@@ -96,21 +95,12 @@
log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
pass
- #try:
- # if self.event_bus_publisher:
- # yield self.event_bus_publisher.stop()
- # self.event_bus_publisher = None
- # log.debug('stopped-event-bus-publisher-kafka-proxy')
- #except Exception, e:
- # log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
- # pass
-
log.debug('stopped-kafka-proxy')
except Exception, e:
self.kclient = None
self.kproducer = None
- #self.event_bus_publisher = None
+ # self.event_bus_publisher = None
log.exception('failed-stopped-kafka-proxy', e=e)
pass
@@ -122,7 +112,8 @@
if self.kafka_endpoint.startswith('@'):
try:
_k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
- self.kafka_endpoint[1:])
+ self.kafka_endpoint[
+ 1:])
log.debug('found-kafka-service', endpoint=_k_endpoint)
except Exception as e:
@@ -160,7 +151,8 @@
self._get_kafka_producer()
# Lets the next message request do the retry if still a failure
if self.kproducer is None:
- log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
+ log.error('no-kafka-producer',
+ endpoint=self.kafka_endpoint)
return
# log.debug('sending-kafka-msg', topic=topic, msg=msg)
@@ -206,4 +198,3 @@
# Common method to get the singleton instance of the kafka proxy class
def get_kafka_proxy():
return KafkaProxy._kafka_instance
-