This commit made some modifications in the way messages over
kafka are consumed, mostly around the initial offset to use.
Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/kafka/client.go b/kafka/client.go
index ad8f01a..b93ad86 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -33,7 +33,7 @@
DefaultProducerFlushFrequency = 5
DefaultProducerFlushMessages = 1
DefaultProducerFlushMaxmessages = 5
- DefaultProducerReturnSuccess = false
+ DefaultProducerReturnSuccess = true
DefaultProducerReturnErrors = true
DefaultProducerRetryMax = 3
DefaultProducerRetryBackoff = time.Millisecond * 100
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index e2210c4..ff3584f 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -589,7 +589,7 @@
// partitions.
replyTopic := &Topic{Name: msg.Header.FromTopic}
key := GetDeviceIdFromTopic(*replyTopic)
- log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header, "key": key})
+ log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
// TODO: handle error response.
kp.kafkaClient.Send(icm, replyTopic, key)
}
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index f2de01a..8468e42 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -109,13 +109,25 @@
}
}
-func ReturnOnErrors(opt bool) SaramaClientOption {
+func ProducerMaxRetries(num int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerRetryMax = num
+ }
+}
+
+func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerRetryBackOff = duration
+ }
+}
+
+func ProducerReturnOnErrors(opt bool) SaramaClientOption {
return func(args *SaramaClient) {
args.producerReturnErrors = opt
}
}
-func ReturnOnSuccess(opt bool) SaramaClientOption {
+func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
return func(args *SaramaClient) {
args.producerReturnSuccess = opt
}
@@ -376,6 +388,16 @@
// Send message to kafka
sc.producer.Input() <- kafkaMsg
+
+ // Wait for result
+ // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
+ select {
+ case ok := <-sc.producer.Successes():
+ log.Debugw("message-sent", log.Fields{"status":ok})
+ case notOk := <-sc.producer.Errors():
+ log.Debugw("error-sending", log.Fields{"status":notOk})
+ return notOk
+ }
return nil
}
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index 978f57d..cf7afd8 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -18,11 +18,11 @@
This facade handles kafka-formatted messages from the Core, extracts the kafka
formatting and forwards the request to the concrete handler.
"""
-
+import structlog
from twisted.internet.defer import inlineCallbacks
from zope.interface import implementer
from twisted.internet import reactor
-
+from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
from python.adapters.interface import IAdapterInterface
from python.protos.core_adapter_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
from python.protos.device_pb2 import Device
@@ -31,6 +31,7 @@
from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
get_messaging_proxy
+log = structlog.get_logger()
class MacAddressError(BaseException):
def __init__(self, error):
@@ -59,31 +60,34 @@
@inlineCallbacks
def start(self):
- self.log.debug('starting')
+ log.debug('starting')
@inlineCallbacks
def stop(self):
- self.log.debug('stopping')
+ log.debug('stopping')
-
+ @inlineCallbacks
def createKafkaDeviceTopic(self, deviceId):
+ log.debug("subscribing-to-topic", device_id=deviceId)
kafka_proxy = get_messaging_proxy()
device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
- kafka_proxy.subscribe(topic=device_topic, target_cls=self)
+ # yield kafka_proxy.create_topic(topic=device_topic)
+ yield kafka_proxy.subscribe(topic=device_topic, target_cls=self, offset=OFFSET_EARLIEST)
+ log.debug("subscribed-to-topic", topic=device_topic)
def adopt_device(self, device):
d = Device()
if device:
device.Unpack(d)
+ # Start the creation of a device specific topic to handle all
+ # subsequent requests from the Core. This adapter instance will
+ # handle all requests for that device.
+ reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+
result = self.adapter.adopt_device(d)
# return True, self.adapter.adopt_device(d)
- # Before we return, create a device specific topic to handle all
- # subsequent requests from the Core. This adapter instance will
- # handle all requests for that device
- reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
-
return True, result
else:
return False, Error(code=ErrorCode.INVALID_PARAMETERS,
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index aaa0c3c..a3de4f1 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -138,6 +138,11 @@
except Exception as e:
log.exception("Exception-when-stopping-messaging-proxy:", e=e)
+
+ @inlineCallbacks
+ def create_topic(self, topic):
+ yield self._wait_until_topic_is_ready(self.kafka_client, topic)
+
@inlineCallbacks
def _wait_until_topic_is_ready(self, client, topic):
e = True
@@ -159,8 +164,9 @@
return self.default_topic
@inlineCallbacks
- def _subscribe(self, topic, callback=None, target_cls=None):
+ def _subscribe(self, topic, offset, callback=None, target_cls=None):
try:
+ log.debug("subscribing-to-topic-start", topic=topic)
yield self._wait_until_topic_is_ready(self.kafka_client, topic)
partitions = self.kafka_client.topic_partitions[topic]
consumers = []
@@ -168,6 +174,7 @@
# First setup the generic callback - all received messages will
# go through that queue
if topic not in self.topic_consumer_map:
+ log.debug("topic-not-in-consumer-map", topic=topic)
consumers = [Consumer(self.kafka_client, topic, partition,
self._enqueue_received_message)
for partition in partitions]
@@ -206,15 +213,18 @@
log.warn("Consumers-failed", failure=failure)
for c in consumers:
- c.start(OFFSET_LATEST).addCallbacks(cb_closed, eb_failed)
+ c.start(offset).addCallbacks(cb_closed, eb_failed)
+
+ log.debug("subscribed-to-topic", topic=topic)
returnValue(True)
except Exception as e:
log.exception("Exception-during-subscription", e=e)
returnValue(False)
+ @inlineCallbacks
def subscribe(self, topic, callback=None, target_cls=None,
- max_retry=3):
+ max_retry=3, offset=OFFSET_LATEST):
"""
Scenario 1: invoked to subscribe to a specific topic with a
target_cls to invoke when a message is received on that topic. This
@@ -246,14 +256,26 @@
return asleep(wait_time)
retry = 0
- while not self._subscribe(topic, callback=callback,
- target_cls=target_cls):
- if retry > max_retry:
- return False
+ subscribed = False
+ while not subscribed:
+ subscribed = yield self._subscribe(topic, callback=callback,
+ target_cls=target_cls, offset=offset)
+ if subscribed:
+ returnValue(True)
+ elif retry > max_retry:
+ returnValue(False)
else:
_backoff("subscription-not-complete", retry)
retry += 1
- return True
+
+ # while not self._subscribe(topic, callback=callback,
+ # target_cls=target_cls):
+ # if retry > max_retry:
+ # return False
+ # else:
+ # _backoff("subscription-not-complete", retry)
+ # retry += 1
+ # return True
def unsubscribe(self, topic):
"""
diff --git a/python/adapters/kafka/kafka_proxy.py b/python/adapters/kafka/kafka_proxy.py
index 6dcb10f..d596334 100644
--- a/python/adapters/kafka/kafka_proxy.py
+++ b/python/adapters/kafka/kafka_proxy.py
@@ -136,6 +136,25 @@
log.exception('failed-get-kafka-producer', e=e)
return
+
+ # @inlineCallbacks
+ # def wait_until_topic_is_ready(self, topic):
+ # # Assumes "auto.create.topics.enable" is set in the broker
+ # # configuration
+ # e = True
+ # while e:
+ # yield self.kclient.load_metadata_for_topics(topic)
+ # e = self.kclient.metadata_error_for_topic(topic)
+ # if e:
+ # log.debug("Topic-not-ready-retrying...", topic=topic)
+
+
+ @inlineCallbacks
+ def create_topic(self, topic):
+ # Assume auto create is enabled on the broker configuration
+ yield self.wait_until_topic_is_ready(topic)
+
+
@inlineCallbacks
def send_message(self, topic, msg):
assert topic is not None
diff --git a/python/adapters/ponsim_olt/ponsim_olt.py b/python/adapters/ponsim_olt/ponsim_olt.py
index df834e5..95d6590 100644
--- a/python/adapters/ponsim_olt/ponsim_olt.py
+++ b/python/adapters/ponsim_olt/ponsim_olt.py
@@ -304,7 +304,6 @@
reactor.callInThread(self.rcv_grpc)
self.log.info('started-frame-grpc-stream')
- # TODO
# Start collecting stats from the device after a brief pause
self.start_kpi_collection(device.id)
except Exception as e:
@@ -359,6 +358,7 @@
def reconcile(self, device):
self.log.info('reconciling-OLT-device')
+ @inlineCallbacks
def _rcv_frame(self, frame):
pkt = Ether(frame)
@@ -373,7 +373,7 @@
inner_shim.payload
)
self.log.info('sending-packet-in',device_id=self.device_id, port=cvid)
- self.core_proxy.send_packet_in(device_id=self.device_id,
+ yield self.core_proxy.send_packet_in(device_id=self.device_id,
port=cvid,
packet=str(popped_frame))
elif pkt.haslayer(Raw):
@@ -398,7 +398,7 @@
for frame in self.frames:
self.log.info('received-grpc-frame',
frame_len=len(frame.payload))
- self._rcv_frame(frame.payload)
+ yield self._rcv_frame(frame.payload)
except _Rendezvous, e:
log.warn('grpc-connection-lost', message=e.message)
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 4af4fb0..3ce59a7 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -77,16 +77,20 @@
}
// Use a device topic for the response as we are the only core handling requests for this device
replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
+ log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+ return err
+ }
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
- if success {
- // From now on, any unsolicited requests from the adapters for this device will come over the device topic.
- // We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
- if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
- log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
- return err
- }
- }
+ //if success {
+ // // From now on, any unsolicited requests from the adapters for this device will come over the device topic.
+ // // We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
+ // if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
+ // log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+ // return err
+ // }
+ //}
return unPackResponse(rpc, device.Id, success, result)
}
diff --git a/rw_core/main.go b/rw_core/main.go
index 77ce304..472072a 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -69,7 +69,11 @@
case "sarama":
return kafka.NewSaramaClient(
kafka.Host(host),
- kafka.Port(port)), nil
+ kafka.Port(port),
+ kafka.ProducerReturnOnErrors(true),
+ kafka.ProducerReturnOnSuccess(true),
+ kafka.ProducerMaxRetries(6),
+ kafka.ProducerRetryBackoff(time.Millisecond * 30)), nil
}
return nil, errors.New("unsupported-client-type")
}
@@ -109,7 +113,7 @@
func (rw *rwCore) start(ctx context.Context) {
log.Info("Starting RW Core components")
- // Setup KV MsgClient
+ // Setup KV Client
log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
err := rw.setKVClient()
if err == nil {
@@ -121,6 +125,7 @@
rw.config.KVTxnKeyDelTime)
}
+ // Setup Kafka Client
if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort); err != nil {
log.Fatal("Unsupported-kafka-client")
}
@@ -214,7 +219,7 @@
}
log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
- log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.WarnLevel)
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
defer log.CleanUp()