[VOL-1034, VOL-1035, VOL-1037] This commit consists of:
1) Implementation of inter-adapter communication using flows
as proxy messages between an ONU and its parent OLT.
2) Update the protos to reflect the inter-adapter message structure
3) Clean up the ponsim adapters to remove unused references and
perform general cleanup.
Change-Id: Ibe913a80a96d601fed946d9b9db55bb8d4f2c15a
diff --git a/adapters/kafka/kafka_proxy.py b/adapters/kafka/kafka_proxy.py
index 10fdbf8..c11caa7 100644
--- a/adapters/kafka/kafka_proxy.py
+++ b/adapters/kafka/kafka_proxy.py
@@ -16,7 +16,7 @@
from afkak.client import KafkaClient as _KafkaClient
from afkak.common import (
- PRODUCER_ACK_LOCAL_WRITE, PRODUCER_ACK_NOT_REQUIRED
+ PRODUCER_ACK_NOT_REQUIRED
)
from afkak.producer import Producer as _kafkaProducer
from structlog import get_logger
@@ -24,9 +24,8 @@
from zope.interface import implementer
from adapters.common.utils.consulhelpers import get_endpoint_from_consul
-from adapters.kafka.event_bus_publisher import EventBusPublisher
from adapters.common.utils.registry import IComponent
-import time
+from adapters.kafka.event_bus_publisher import EventBusPublisher
log = get_logger()
@@ -96,21 +95,12 @@
log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
pass
- #try:
- # if self.event_bus_publisher:
- # yield self.event_bus_publisher.stop()
- # self.event_bus_publisher = None
- # log.debug('stopped-event-bus-publisher-kafka-proxy')
- #except Exception, e:
- # log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
- # pass
-
log.debug('stopped-kafka-proxy')
except Exception, e:
self.kclient = None
self.kproducer = None
- #self.event_bus_publisher = None
+ # self.event_bus_publisher = None
log.exception('failed-stopped-kafka-proxy', e=e)
pass
@@ -122,7 +112,8 @@
if self.kafka_endpoint.startswith('@'):
try:
_k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
- self.kafka_endpoint[1:])
+ self.kafka_endpoint[
+ 1:])
log.debug('found-kafka-service', endpoint=_k_endpoint)
except Exception as e:
@@ -160,7 +151,8 @@
self._get_kafka_producer()
# Lets the next message request do the retry if still a failure
if self.kproducer is None:
- log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
+ log.error('no-kafka-producer',
+ endpoint=self.kafka_endpoint)
return
# log.debug('sending-kafka-msg', topic=topic, msg=msg)
@@ -206,4 +198,3 @@
# Common method to get the singleton instance of the kafka proxy class
def get_kafka_proxy():
return KafkaProxy._kafka_instance
-