This update consists of the following changes:
1) Add GroupConsumer to the Go sarama_client and modify the Core
code to use a groupConsumer instead of a partition consumer. This
change will ensure that multiple consumers (with different group Ids)
can consume kafka messages from the same topic.
2) Remove afkak kafka client and replace it with confluent kafka,
a change done in voltha 1.x. Modify the code accordingly.
3) Add a Group Consumer to the Python kafka client such that
several instances of an Adapter can consume the same messages from
the same topic.
4) Set the datapath_id for the logical device in the Core.
Change-Id: I5d7ced27c9aeca4f6211baa3dc8cb3db861545e4
diff --git a/kafka/client.go b/kafka/client.go
index 1df700e..8cc1999 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -26,9 +26,13 @@
)
const (
+ GroupIdKey = "groupId"
+)
+
+const (
DefaultKafkaHost = "127.0.0.1"
DefaultKafkaPort = 9092
- DefaultGroupName = "rw_core"
+ DefaultGroupName = "voltha"
DefaultSleepOnError = 1
DefaultProducerFlushFrequency = 5
DefaultProducerFlushMessages = 1
@@ -51,7 +55,7 @@
Stop()
CreateTopic(topic *Topic, numPartition int, repFactor int) error
DeleteTopic(topic *Topic) error
- Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error)
+ Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
Send(msg interface{}, topic *Topic, keys ...string) error
}
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 3381fdc..c527619 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -33,12 +33,12 @@
// Initialize the logger - gets the default until the main function setup the logger
func init() {
- log.AddPackage(log.JSON, log.WarnLevel, nil)
+ log.AddPackage(log.JSON, log.DebugLevel, nil)
}
const (
DefaultMaxRetries = 3
- DefaultRequestTimeout = 500 // 500 milliseconds - to handle a wider latency range
+ DefaultRequestTimeout = 3000 // 3000 milliseconds - to handle a wider latency range
)
// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
@@ -172,7 +172,7 @@
log.Info("stopping-intercontainer-proxy")
kp.doneCh <- 1
// TODO : Perform cleanup
- //kp.kafkaClient.Stop()
+ kp.kafkaClient.Stop()
//kp.deleteAllTopicRequestHandlerChannelMap()
//kp.deleteAllTopicResponseChannelMap()
//kp.deleteAllTransactionIdToChannelMap()
@@ -331,6 +331,7 @@
var err error
if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+ return err
}
kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
@@ -676,7 +677,7 @@
for {
select {
case msg := <-subscribedCh:
- //log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
+ log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
if msg.Header.Type == ic.MessageType_RESPONSE {
go kp.dispatchResponse(msg)
}
@@ -700,6 +701,7 @@
// First check whether we already have a channel listening for response on that topic. If there is
// already one then it will be reused. If not, it will be created.
if !kp.isTopicSubscribedForResponse(topic.Name) {
+ log.Debugw("not-subscribed-for-response", log.Fields{"topic": topic.Name, "trnsid": trnsId})
var subscribedCh <-chan *ic.InterContainerMessage
var err error
if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 2df19e5..10c692a 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -30,7 +30,7 @@
)
func init() {
- log.AddPackage(log.JSON, log.WarnLevel, nil)
+ log.AddPackage(log.JSON, log.DebugLevel, nil)
}
type returnErrorFunction func() error
@@ -51,9 +51,10 @@
KafkaPort int
producer sarama.AsyncProducer
consumer sarama.Consumer
- groupConsumer *scc.Consumer
+ groupConsumers map[string]*scc.Consumer
+ consumerGroupPrefix string
consumerType int
- groupName string
+ consumerGroupName string
producerFlushFrequency int
producerFlushMessages int
producerFlushMaxmessages int
@@ -87,6 +88,18 @@
}
}
+func ConsumerGroupPrefix(prefix string) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.consumerGroupPrefix = prefix
+ }
+}
+
+func ConsumerGroupName(name string) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.consumerGroupName = name
+ }
+}
+
func ConsumerType(consumer int) SaramaClientOption {
return func(args *SaramaClient) {
args.consumerType = consumer
@@ -188,6 +201,8 @@
option(client)
}
+ client.groupConsumers = make(map[string]*scc.Consumer)
+
client.lockTopicToConsumerChannelMap = sync.RWMutex{}
client.topicLockMap = make(map[string]*sync.RWMutex)
client.lockOfTopicLockMap = sync.RWMutex{}
@@ -214,15 +229,19 @@
return err
}
- // Create the master consumers
- if err := sc.createConsumer(); err != nil {
- log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
- return err
+ if sc.consumerType == DefaultConsumerType {
+ // Create the master consumers
+ if err := sc.createConsumer(); err != nil {
+ log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
+ return err
+ }
}
// Create the topic to consumers/channel map
sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
+ log.Info("kafka-sarama-client-started")
+
return nil
}
@@ -234,39 +253,38 @@
if sc.producer != nil {
if err := sc.producer.Close(); err != nil {
- panic(err)
+ log.Errorw("closing-producer-failed", log.Fields{"error": err})
}
}
if sc.consumer != nil {
if err := sc.consumer.Close(); err != nil {
- panic(err)
+ log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
}
}
- if sc.groupConsumer != nil {
- if err := sc.groupConsumer.Close(); err != nil {
- panic(err)
+ for key, val := range sc.groupConsumers {
+ log.Debugw("closing-group-consumer", log.Fields{"topic": key})
+ if err := val.Close(); err != nil {
+ log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
}
}
if sc.cAdmin != nil {
if err := sc.cAdmin.Close(); err != nil {
- panic(err)
+ log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
}
}
//TODO: Clear the consumers map
- sc.clearConsumerChannelMap()
+ //sc.clearConsumerChannelMap()
log.Info("sarama-client-stopped")
}
-//CreateTopic creates a topic on the Kafka Broker.
-func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
- sc.lockTopic(topic)
- defer sc.unLockTopic(topic)
-
+//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
+// the invoking function must hold the lock
+func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
// Set the topic details
topicDetail := &sarama.TopicDetail{}
topicDetail.NumPartitions = int32(numPartition)
@@ -290,6 +308,15 @@
return nil
}
+//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
+// ensure no two go routines are performing operations on the same topic
+func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
+ return sc.createTopic(topic, numPartition, repFactor)
+}
+
//DeleteTopic removes a topic from the kafka Broker
func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
sc.lockTopic(topic)
@@ -316,7 +343,7 @@
// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
@@ -338,7 +365,7 @@
// Use the consumerType option to figure out the type of consumer to launch
if sc.consumerType == PartitionConsumer {
if sc.autoCreateTopic {
- if err = sc.CreateTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+ if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
@@ -350,10 +377,26 @@
} else if sc.consumerType == GroupCustomer {
// TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
// does not consume from a precreated topic in some scenarios
- if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, "mytest"); err != nil {
- log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+ //if sc.autoCreateTopic {
+ // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+ // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+ // return nil, err
+ // }
+ //}
+ //groupId := sc.consumerGroupName
+ groupId := getGroupId(kvArgs...)
+ // Include the group prefix
+ if groupId != "" {
+ groupId = sc.consumerGroupPrefix + groupId
+ } else {
+ // Need to use a unique group Id per topic
+ groupId = sc.consumerGroupPrefix + topic.Name
+ }
+ if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId); err != nil {
+ log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
+
} else {
log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
return nil, errors.New("unknown-consumer-type")
@@ -416,6 +459,16 @@
return nil
}
+// getGroupId returns the group id from the key-value args.
+func getGroupId(kvArgs ...*KVArg) string {
+ for _, arg := range kvArgs {
+ if arg.Key == GroupIdKey {
+ return arg.Value.(string)
+ }
+ }
+ return ""
+}
+
func (sc *SaramaClient) createClusterAdmin() error {
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
config := sarama.NewConfig()
@@ -623,7 +676,7 @@
}
// createGroupConsumer creates a consumers group
-func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
+func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, retries int) (*scc.Consumer, error) {
config := scc.NewConfig()
config.ClientID = uuid.New().String()
config.Group.Mode = scc.ConsumerModeMultiplex
@@ -632,24 +685,22 @@
//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = sarama.OffsetNewest
+ //config.Consumer.Offsets.Initial = sarama.OffsetOldest
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
brokers := []string{kafkaFullAddr}
- if groupId == nil {
- g := DefaultGroupName
- groupId = &g
- }
topics := []string{topic.Name}
var consumer *scc.Consumer
var err error
- if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
- log.Errorw("create-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
+ log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
- log.Debugw("create-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+ log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
//time.Sleep(10*time.Second)
- sc.groupConsumer = consumer
+ //sc.groupConsumer = consumer
+ sc.groupConsumers[topic.Name] = consumer
return consumer, nil
}
@@ -704,24 +755,36 @@
func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
+ //go func() {
+ // for msg := range consumer.Errors() {
+ // log.Warnw("group-consumers-error", log.Fields{"error": msg.Error()})
+ // }
+ //}()
+ //
+ //go func() {
+ // for ntf := range consumer.Notifications() {
+ // log.Debugw("group-received-notification", log.Fields{"notification": ntf})
+ // }
+ //}()
+
startloop:
for {
select {
case err := <-consumer.Errors():
if err != nil {
- log.Warnw("group-consumers-error", log.Fields{"error": err})
+ log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
} else {
// There is a race condition when this loop is stopped and the consumer is closed where
// the actual error comes as nil
- log.Warn("group-consumers-error")
+ log.Warnw("group-consumers-error-nil", log.Fields{"topic": topic.Name})
}
case msg := <-consumer.Messages():
- //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
if msg == nil {
// There is a race condition when this loop is stopped and the consumer is closed where
// the actual msg comes as nil
break startloop
}
+ log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
msgBody := msg.Value
icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
@@ -800,7 +863,7 @@
// TODO: Replace this development partition consumers with a group consumers
var pConsumer *scc.Consumer
var err error
- if pConsumer, err = sc.createGroupConsumer(topic, &groupId, DefaultMaxRetries); err != nil {
+ if pConsumer, err = sc.createGroupConsumer(topic, groupId, DefaultMaxRetries); err != nil {
log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index 0efb811..de8ae0b 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -22,6 +22,7 @@
from twisted.internet.defer import inlineCallbacks
from zope.interface import implementer
from twisted.internet import reactor
+
from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
from python.adapters.interface import IAdapterInterface
from python.protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
@@ -29,7 +30,7 @@
from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
FlowGroupChanges, ofp_packet_out
from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
- get_messaging_proxy
+ get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST
log = structlog.get_logger()
@@ -72,7 +73,7 @@
kafka_proxy = get_messaging_proxy()
device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
# yield kafka_proxy.create_topic(topic=device_topic)
- yield kafka_proxy.subscribe(topic=device_topic, target_cls=self, offset=OFFSET_EARLIEST)
+ yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
log.debug("subscribed-to-topic", topic=device_topic)
def adopt_device(self, device):
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index fbb0834..5cad2e8 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -18,8 +18,6 @@
from uuid import uuid4
import structlog
-from afkak.client import KafkaClient
-from afkak.consumer import OFFSET_LATEST, Consumer
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
DeferredQueue, gatherResults
@@ -34,6 +32,9 @@
log = structlog.get_logger()
+KAFKA_OFFSET_LATEST = 'latest'
+KAFKA_OFFSET_EARLIEST = 'earliest'
+
class KafkaMessagingError(BaseException):
def __init__(self, error):
@@ -48,6 +49,7 @@
kafka_host_port,
kv_store,
default_topic,
+ group_id_prefix,
target_cls):
"""
Initialize the kafka proxy. This is a singleton (may change to
@@ -67,15 +69,15 @@
self.kafka_host_port = kafka_host_port
self.kv_store = kv_store
self.default_topic = default_topic
+ self.default_group_id = "_".join((group_id_prefix, default_topic))
self.target_cls = target_cls
self.topic_target_cls_map = {}
- self.topic_consumer_map = {}
self.topic_callback_map = {}
self.subscribers = {}
- self.kafka_client = None
self.kafka_proxy = None
self.transaction_id_deferred_map = {}
self.received_msg_queue = DeferredQueue()
+ self.stopped = False
self.init_time = 0
self.init_received_time = 0
@@ -91,11 +93,7 @@
def start(self):
try:
- # Create the kafka client
- # assert self.kafka_host is not None
- # assert self.kafka_port is not None
- # kafka_host_port = ":".join((self.kafka_host, self.kafka_port))
- self.kafka_client = KafkaClient(self.kafka_host_port)
+ log.debug("KafkaProxy-starting")
# Get the kafka proxy instance. If it does not exist then
# create it
@@ -110,12 +108,17 @@
# Start the queue to handle incoming messages
reactor.callLater(0, self._received_message_processing_loop)
- # Start listening for incoming messages
- reactor.callLater(0, self.subscribe, self.default_topic,
- target_cls=self.target_cls)
+ # Subscribe using the default topic and default group id. Whenever
+ # a message is received on that topic then the target_cls will be
+ # invoked.
+ reactor.callLater(0, self.subscribe,
+ topic=self.default_topic,
+ target_cls=self.target_cls,
+ group_id=self.default_group_id)
# Setup the singleton instance
IKafkaMessagingProxy._kafka_messaging_instance = self
+ log.debug("KafkaProxy-started")
except Exception as e:
log.exception("Failed-to-start-proxy", e=e)
@@ -126,37 +129,14 @@
"""
log.debug("Stopping-messaging-proxy ...")
try:
- # Stop all the consumers
- deferred_list = []
- for key, values in self.topic_consumer_map.iteritems():
- deferred_list.extend([c.stop() for c in values])
-
- if not deferred_list:
- d = gatherResults(deferred_list)
- d.addCallback(lambda result: self.kafka_client.close())
+ # Stop the kafka proxy. This will stop all the consumers
+ # and producers
+ self.stopped = True
+ self.kafka_proxy.stop()
log.debug("Messaging-proxy-stopped.")
except Exception as e:
log.exception("Exception-when-stopping-messaging-proxy:", e=e)
-
- @inlineCallbacks
- def create_topic(self, topic):
- yield self._wait_until_topic_is_ready(self.kafka_client, topic)
-
- @inlineCallbacks
- def _wait_until_topic_is_ready(self, client, topic):
- e = True
- while e:
- yield client.load_metadata_for_topics(topic)
- e = client.metadata_error_for_topic(topic)
- if e:
- log.debug("Topic-not-ready-retrying...", topic=topic)
-
- def _clear_backoff(self):
- if self.retries:
- log.info('reconnected-to-consul', after_retries=self.retries)
- self.retries = 0
-
def get_target_cls(self):
return self.target_cls
@@ -164,24 +144,13 @@
return self.default_topic
@inlineCallbacks
- def _subscribe(self, topic, offset, callback=None, target_cls=None):
+ def _subscribe_group_consumer(self, group_id, topic, offset, callback=None,
+ target_cls=None):
try:
log.debug("subscribing-to-topic-start", topic=topic)
- yield self._wait_until_topic_is_ready(self.kafka_client, topic)
- partitions = self.kafka_client.topic_partitions[topic]
- consumers = []
-
- # First setup the generic callback - all received messages will
- # go through that queue
- if topic not in self.topic_consumer_map:
- log.debug("topic-not-in-consumer-map", topic=topic)
- consumers = [Consumer(self.kafka_client, topic, partition,
- self._enqueue_received_message)
- for partition in partitions]
- self.topic_consumer_map[topic] = consumers
-
- log.debug("_subscribe", topic=topic,
- consumermap=self.topic_consumer_map)
+ yield self.kafka_proxy.subscribe(topic,
+ self._enqueue_received_group_message,
+ group_id, offset)
if target_cls is not None and callback is None:
# Scenario #1
@@ -198,25 +167,6 @@
else:
log.warn("invalid-parameters")
- def cb_closed(result):
- """
- Called when a consumer cleanly stops.
- """
- log.debug("Consumers-cleanly-stopped")
-
- def eb_failed(failure):
- """
- Called when a consumer fails due to an uncaught exception in the
- processing callback or a network error on shutdown. In this case we
- simply log the error.
- """
- log.warn("Consumers-failed", failure=failure)
-
- for c in consumers:
- c.start(offset).addCallbacks(cb_closed, eb_failed)
-
- log.debug("subscribed-to-topic", topic=topic)
-
returnValue(True)
except Exception as e:
log.exception("Exception-during-subscription", e=e)
@@ -224,7 +174,7 @@
@inlineCallbacks
def subscribe(self, topic, callback=None, target_cls=None,
- max_retry=3, offset=OFFSET_LATEST):
+ max_retry=3, group_id=None, offset=KAFKA_OFFSET_LATEST):
"""
Scenario 1: invoked to subscribe to a specific topic with a
target_cls to invoke when a message is received on that topic. This
@@ -245,6 +195,8 @@
:param max_retry: the number of retries before reporting failure
to subscribe. This caters for scenario where the kafka topic is not
ready.
+ :param group_id: The ID of the group the consumer is subscribing to
+ :param offset: The topic offset on the kafka bus from where message consumption will start
:return: True on success, False on failure
"""
RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
@@ -253,13 +205,20 @@
wait_time = RETRY_BACKOFF[min(retries,
len(RETRY_BACKOFF) - 1)]
log.info(msg, retry_in=wait_time)
- return asleep(wait_time)
+ return asleep.asleep(wait_time)
+
+ log.debug("subscribing", topic=topic, group_id=group_id,
+ callback=callback, target=target_cls)
retry = 0
subscribed = False
+ if group_id is None:
+ group_id = self.default_group_id
while not subscribed:
- subscribed = yield self._subscribe(topic, callback=callback,
- target_cls=target_cls, offset=offset)
+ subscribed = yield self._subscribe_group_consumer(group_id, topic,
+ callback=callback,
+ target_cls=target_cls,
+ offset=offset)
if subscribed:
returnValue(True)
elif retry > max_retry:
@@ -268,51 +227,56 @@
_backoff("subscription-not-complete", retry)
retry += 1
- # while not self._subscribe(topic, callback=callback,
- # target_cls=target_cls):
- # if retry > max_retry:
- # return False
- # else:
- # _backoff("subscription-not-complete", retry)
- # retry += 1
- # return True
-
- def unsubscribe(self, topic):
+ def unsubscribe(self, topic, callback=None, target_cls=None):
"""
Invoked when unsubscribing to a topic
- :param topic: topic to unsubscibe from
+ :param topic: topic to unsubscribe from
+ :param callback: the callback used when subscribing to the topic, if any
+ :param target_cls: the target class used when subscribing to the topic, if any
:return: None on success or Exception on failure
"""
log.debug("Unsubscribing-to-topic", topic=topic)
- def remove_topic(topic):
- if topic in self.topic_consumer_map:
- del self.topic_consumer_map[topic]
-
try:
- if topic in self.topic_consumer_map:
- consumers = self.topic_consumer_map[topic]
- d = gatherResults([c.stop() for c in consumers])
- d.addCallback(remove_topic, topic)
- log.debug("Unsubscribed-to-topic.", topic=topic)
- else:
- log.debug("Topic-does-not-exist.", topic=topic)
+ self.kafka_proxy.unsubscribe(topic,
+ self._enqueue_received_group_message)
+
+ if callback is None and target_cls is None:
+ log.error("both-call-and-target-cls-cannot-be-none",
+ topic=topic)
+ raise KafkaMessagingError(
+ error="both-call-and-target-cls-cannot-be-none")
+
+ if target_cls is not None and topic in self.topic_target_cls_map:
+ del self.topic_target_cls_map[topic]
+
+ if callback is not None and topic in self.topic_callback_map:
+ index = 0
+ for cb in self.topic_callback_map[topic]:
+ if cb == callback:
+ break
+ index += 1
+ if index < len(self.topic_callback_map[topic]):
+ self.topic_callback_map[topic].pop(index)
+
+ if len(self.topic_callback_map[topic]) == 0:
+ del self.topic_callback_map[topic]
except Exception as e:
- log.exception("Exception-when-stopping-messaging-proxy:", e=e)
+ log.exception("Exception-when-unsubscribing-to-topic", topic=topic,
+ e=e)
+ return e
@inlineCallbacks
- def _enqueue_received_message(self, reactor, message_list):
+ def _enqueue_received_group_message(self, msg):
"""
Internal method to continuously queue all received messaged
irrespective of topic
- :param reactor: A requirement by the Twisted Python kafka library
- :param message_list: Received list of messages
+ :param msg: Received message
:return: None on success, Exception on failure
"""
try:
- for m in message_list:
- log.debug("received-msg", msg=m)
- yield self.received_msg_queue.put(m)
+ log.debug("received-msg", msg=msg)
+ yield self.received_msg_queue.put(msg)
except Exception as e:
log.exception("Failed-enqueueing-received-message", e=e)
@@ -327,6 +291,8 @@
try:
message = yield self.received_msg_queue.get()
yield self._process_message(message)
+ if self.stopped:
+ break
except Exception as e:
log.exception("Failed-dequeueing-received-message", e=e)
@@ -457,16 +423,18 @@
current_time = int(round(time.time() * 1000))
# log.debug("Got Message", message=m)
try:
- val = m.message.value
+ val = m.value()
+ # val = m.message.value
# print m.topic
# Go over customized callbacks first
- if m.topic in self.topic_callback_map:
- for c in self.topic_callback_map[m.topic]:
+ m_topic = m.topic()
+ if m_topic in self.topic_callback_map:
+ for c in self.topic_callback_map[m_topic]:
yield c(val)
# Check whether we need to process request/response scenario
- if m.topic not in self.topic_target_cls_map:
+ if m_topic not in self.topic_target_cls_map:
return
# Process request/response scenario
@@ -506,7 +474,8 @@
response.header.to_topic)
self._send_kafka_message(res_topic, response)
- log.debug("Response-sent", response=response.body, to_topic=res_topic)
+ log.debug("Response-sent", response=response.body,
+ to_topic=res_topic)
elif message.header.type == MessageType.Value("RESPONSE"):
trns_id = self._to_string(message.header.id)
if trns_id in self.transaction_id_deferred_map:
diff --git a/python/adapters/kafka/kafka_proxy.py b/python/adapters/kafka/kafka_proxy.py
index d596334..cefc590 100644
--- a/python/adapters/kafka/kafka_proxy.py
+++ b/python/adapters/kafka/kafka_proxy.py
@@ -14,18 +14,19 @@
# limitations under the License.
#
-from afkak.client import KafkaClient as _KafkaClient
-from afkak.common import (
- PRODUCER_ACK_NOT_REQUIRED
-)
-from afkak.producer import Producer as _kafkaProducer
+
+from confluent_kafka import Producer as _kafkaProducer
from structlog import get_logger
+from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.threads import deferToThread
from zope.interface import implementer
from python.common.utils.consulhelpers import get_endpoint_from_consul
-from python.common.utils.registry import IComponent
from event_bus_publisher import EventBusPublisher
+from python.common.utils.registry import IComponent
+from confluent_kafka import Consumer, KafkaError
+import threading
log = get_logger()
@@ -33,7 +34,11 @@
@implementer(IComponent)
class KafkaProxy(object):
"""
- This is a singleton proxy kafka class to hide the kafka client details.
+ This is a singleton proxy kafka class to hide the kafka client details. This
+ proxy uses confluent-kafka-python as the kafka client. Since that client is
+ not a Twisted client then requests to that client are wrapped with
+ twisted.internet.threads.deferToThread to avoid any potential blocking of
+ the Twisted loop.
"""
_kafka_instance = None
@@ -42,6 +47,7 @@
kafka_endpoint='localhost:9092',
ack_timeout=1000,
max_req_attempts=10,
+ consumer_poll_timeout=10,
config={}):
# return an exception if the object already exist
@@ -59,6 +65,10 @@
self.event_bus_publisher = None
self.stopping = False
self.faulty = False
+ self.consumer_poll_timeout = consumer_poll_timeout
+ self.topic_consumer_map = {}
+ self.topic_callbacks_map = {}
+ self.topic_any_map_lock = threading.Lock()
log.debug('initialized', endpoint=kafka_endpoint)
@inlineCallbacks
@@ -77,6 +87,7 @@
def stop(self):
try:
log.debug('stopping-kafka-proxy')
+ self.stopping = True
try:
if self.kclient:
yield self.kclient.close()
@@ -84,16 +95,38 @@
log.debug('stopped-kclient-kafka-proxy')
except Exception, e:
log.exception('failed-stopped-kclient-kafka-proxy', e=e)
- pass
try:
if self.kproducer:
- yield self.kproducer.stop()
+ yield self.kproducer.flush()
self.kproducer = None
log.debug('stopped-kproducer-kafka-proxy')
except Exception, e:
log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
- pass
+
+ # Stop all consumers
+ try:
+ self.topic_any_map_lock.acquire()
+ log.debug('stopping-consumers-kafka-proxy')
+ for _, c in self.topic_consumer_map.iteritems():
+ yield deferToThread(c.close)
+ self.topic_consumer_map.clear()
+ self.topic_callbacks_map.clear()
+ log.debug('stopped-consumers-kafka-proxy')
+ except Exception, e:
+ log.exception('failed-stopped-consumers-kafka-proxy', e=e)
+ finally:
+ self.topic_any_map_lock.release()
+ log.debug('stopping-consumers-kafka-proxy-released-lock')
+
+ # try:
+ # if self.event_bus_publisher:
+ # yield self.event_bus_publisher.stop()
+ # self.event_bus_publisher = None
+ # log.debug('stopped-event-bus-publisher-kafka-proxy')
+ # except Exception, e:
+ # log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
+ # pass
log.debug('stopped-kafka-proxy')
@@ -105,8 +138,7 @@
pass
def _get_kafka_producer(self):
- # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
- # to a local log before sending response
+
try:
if self.kafka_endpoint.startswith('@'):
@@ -124,39 +156,125 @@
return
else:
_k_endpoint = self.kafka_endpoint
-
- self.kclient = _KafkaClient(_k_endpoint)
- self.kproducer = _kafkaProducer(self.kclient,
- req_acks=PRODUCER_ACK_NOT_REQUIRED,
- # req_acks=PRODUCER_ACK_LOCAL_WRITE,
- # ack_timeout=self.ack_timeout,
- # max_req_attempts=self.max_req_attempts)
- )
+ self.kproducer = _kafkaProducer(
+ {'bootstrap.servers': _k_endpoint,
+ }
+ )
+ pass
except Exception, e:
log.exception('failed-get-kafka-producer', e=e)
return
+ @inlineCallbacks
+ def _wait_for_messages(self, consumer, topic):
+ while True:
+ try:
+ msg = yield deferToThread(consumer.poll,
+ self.consumer_poll_timeout)
- # @inlineCallbacks
- # def wait_until_topic_is_ready(self, topic):
- # # Assumes "auto.create.topics.enable" is set in the broker
- # # configuration
- # e = True
- # while e:
- # yield self.kclient.load_metadata_for_topics(topic)
- # e = self.kclient.metadata_error_for_topic(topic)
- # if e:
- # log.debug("Topic-not-ready-retrying...", topic=topic)
+ if self.stopping:
+ log.debug("stop-request-recieved", topic=topic)
+ break
+ if msg is None:
+ continue
+ if msg.error():
+ # This typically is received when there are no more messages
+ # to read from kafka. Ignore.
+ continue
+
+ # Invoke callbacks
+ for cb in self.topic_callbacks_map[topic]:
+ yield cb(msg)
+ except Exception as e:
+ log.debug("exception-receiving-msg", topic=topic, e=e)
@inlineCallbacks
- def create_topic(self, topic):
- # Assume auto create is enabled on the broker configuration
- yield self.wait_until_topic_is_ready(topic)
+ def subscribe(self, topic, callback, groupId, offset='latest'):
+ """
+ subscribe allows a caller to subscribe to a given kafka topic. This API
+ always create a group consumer.
+ :param topic - the topic to subscribe to
+ :param callback - the callback to invoke whenever a message is received
+ on that topic
+ :param groupId - the groupId for this consumer. In the current
+ implementation there is a one-to-one mapping between a topic and a
+ groupId. In other words, once a groupId is used for a given topic then
+ we won't be able to create another groupId for the same topic.
+ :param offset: the kafka offset from where the consumer will start
+ consuming messages
+ """
+ try:
+ self.topic_any_map_lock.acquire()
+ if topic in self.topic_consumer_map:
+ # Just add the callback
+ if topic in self.topic_callbacks_map:
+ self.topic_callbacks_map[topic].append(callback)
+ else:
+ self.topic_callbacks_map[topic] = [callback]
+ return
+ # Create consumer for that topic
+ c = Consumer({
+ 'bootstrap.servers': self.kafka_endpoint,
+ 'group.id': groupId,
+ 'auto.offset.reset': offset
+ })
+ yield deferToThread(c.subscribe, [topic])
+ # c.subscribe([topic])
+ self.topic_consumer_map[topic] = c
+ self.topic_callbacks_map[topic] = [callback]
+ # Start the consumer
+ reactor.callLater(0, self._wait_for_messages, c, topic)
+ except Exception, e:
+ log.exception("topic-subscription-error", e=e)
+ finally:
+ self.topic_any_map_lock.release()
@inlineCallbacks
- def send_message(self, topic, msg):
+ def unsubscribe(self, topic, callback):
+ """
+ Unsubscribe from a given topic. Since there may be multiple callers
+ consuming from the same topic, the callback is used as a differentiator
+ to ensure that only the relevant caller gets unsubscribed. The kafka
+ consumer will be closed when no callbacks remain for the topic.
+ :param topic: topic to unsubscribe
+ :param callback: callback the caller used when subscribing to the topic.
+ If multiple callers have subscribed to a topic using the same callback
+ then the first callback on the list will be removed.
+ :return:None
+ """
+ try:
+ self.topic_any_map_lock.acquire()
+ log.debug("unsubscribing-to-topic", topic=topic)
+ if topic in self.topic_callbacks_map:
+ index = 0
+ for cb in self.topic_callbacks_map[topic]:
+ if cb == callback:
+ break
+ index += 1
+ if index < len(self.topic_callbacks_map[topic]):
+ self.topic_callbacks_map[topic].pop(index)
+
+ if len(self.topic_callbacks_map[topic]) == 0:
+ # Stop the consumer
+ if topic in self.topic_consumer_map:
+ yield deferToThread(
+ self.topic_consumer_map[topic].close)
+ del self.topic_consumer_map[topic]
+ del self.topic_callbacks_map[topic]
+ log.debug("unsubscribed-to-topic", topic=topic)
+ else:
+ log.debug("consumers-for-topic-still-exist", topic=topic,
+ num=len(self.topic_callbacks_map[topic]))
+ except Exception, e:
+ log.exception("topic-unsubscription-error", e=e)
+ finally:
+ self.topic_any_map_lock.release()
+ log.debug("unsubscribing-to-topic-release-lock", topic=topic)
+
+ @inlineCallbacks
+ def send_message(self, topic, msg, key=None):
assert topic is not None
assert msg is not None
@@ -174,22 +292,23 @@
endpoint=self.kafka_endpoint)
return
- # log.debug('sending-kafka-msg', topic=topic, msg=msg)
+ log.debug('sending-kafka-msg', topic=topic, kafka_msg=msg)
msgs = [msg]
- if self.kproducer and self.kclient and \
- self.event_bus_publisher and self.faulty is False:
- # log.debug('sending-kafka-msg-I-am-here0', time=int(round(time.time() * 1000)))
-
- yield self.kproducer.send_messages(topic, msgs=msgs)
- # self.kproducer.send_messages(topic, msgs=msgs)
- # log.debug('sent-kafka-msg', topic=topic, msg=msg)
+ if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
+ d = deferToThread(self.kproducer.produce, topic, msg, key)
+ yield d
+ log.debug('sent-kafka-msg', topic=topic, kafka_msg=msg)
+ # send a lightweight poll to avoid an exception after 100k messages.
+ d1 = deferToThread(self.kproducer.poll, 0)
+ yield d1
else:
return
except Exception, e:
self.faulty = True
- log.error('failed-to-send-kafka-msg', topic=topic, msg=msg, e=e)
+ log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg,
+ e=e)
# set the kafka producer to None. This is needed if the
# kafka docker went down and comes back up with a different
diff --git a/python/adapters/ponsim_olt/main.py b/python/adapters/ponsim_olt/main.py
index 09b78fc..86b6be0 100755
--- a/python/adapters/ponsim_olt/main.py
+++ b/python/adapters/ponsim_olt/main.py
@@ -63,8 +63,8 @@
core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
interface=os.environ.get('INTERFACE', get_my_primary_interface()),
instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
- kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
- kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
+ kafka_adapter=os.environ.get('KAFKA_ADAPTER', '172.20.10.3:9092'),
+ kafka_cluster=os.environ.get('KAFKA_CLUSTER', '172.20.10.3:9092'),
backend=os.environ.get('BACKEND', 'none'),
retry_interval=os.environ.get('RETRY_INTERVAL', 2),
heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
@@ -380,7 +380,7 @@
# TODO: Add KV Store object reference
kv_store=self.args.backend,
default_topic=self.args.name,
- # Needs to assign a real class
+ group_id_prefix=self.args.instance_id,
target_cls=ponsim_request_handler
)
).start()
diff --git a/python/adapters/ponsim_onu/main.py b/python/adapters/ponsim_onu/main.py
index d6418e9..c4c12db 100755
--- a/python/adapters/ponsim_onu/main.py
+++ b/python/adapters/ponsim_onu/main.py
@@ -379,6 +379,7 @@
# TODO: Add KV Store object reference
kv_store=self.args.backend,
default_topic=self.args.name,
+ group_id_prefix=self.args.instance_id,
target_cls=ponsim_request_handler
)
).start()
diff --git a/rw_core/core/id.go b/rw_core/core/id.go
index b28151f..369b4f1 100644
--- a/rw_core/core/id.go
+++ b/rw_core/core/id.go
@@ -18,7 +18,10 @@
import (
"crypto/rand"
"encoding/hex"
+ "errors"
+ "fmt"
m "math/rand"
+ "strconv"
)
func randomHex(n int) (string, error) {
@@ -49,3 +52,16 @@
// A logical port is a uint32
return m.Uint32()
}
+
+func CreateDataPathId(idInHexString string) (uint64, error) {
+ if idInHexString == "" {
+ return 0, errors.New("id-empty")
+ }
+ // Prepend 0x so that ParseUint (base 0) interprets the string as hexadecimal
+ newId := fmt.Sprintf("0x%s", idInHexString)
+ if d, err := strconv.ParseUint(newId, 0, 64); err != nil {
+ return 0, err
+ } else {
+ return d, nil
+ }
+}
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 86b79f9..ee3d82a 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -75,7 +75,16 @@
log.Errorw("error-creating-logical-device", log.Fields{"error": err})
return err
}
+
ld := &voltha.LogicalDevice{Id: agent.logicalDeviceId, RootDeviceId: agent.rootDeviceId}
+
+ // Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
+ var datapathID uint64
+ if datapathID, err = CreateDataPathId(agent.logicalDeviceId); err != nil {
+ log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
+ return err
+ }
+ ld.DatapathId = datapathID
ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
ld.Flows = &ofp.Flows{Items: nil}
@@ -596,7 +605,7 @@
groupsChanged := false
groupId := groupMod.GroupId
if idx := fu.FindGroup(groups, groupId); idx == -1 {
- return errors.New(fmt.Sprintf("group-absent:%s", groupId))
+ return errors.New(fmt.Sprintf("group-absent:%d", groupId))
} else {
//replace existing group entry with new group definition
groupEntry := fd.GroupEntryFromGroupMod(groupMod)
@@ -878,7 +887,7 @@
}
deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
- for deviceId, _ := range deviceNodeIds {
+ for deviceId := range deviceNodeIds {
if deviceId == ld.RootDeviceId {
rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
} else {
@@ -960,9 +969,14 @@
deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+ var err error
for deviceId, value := range deviceRules.GetRules() {
- agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
- agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+ if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
+ log.Error("update-flows-failed", log.Fields{"deviceID":deviceId})
+ }
+ if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
+ log.Error("update-groups-failed", log.Fields{"deviceID":deviceId})
+ }
}
return nil
@@ -996,11 +1010,16 @@
log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+ var err error
for deviceId, value := range deviceRules.GetRules() {
- agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
- agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
- }
+ if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
+ log.Error("update-flows-failed", log.Fields{"deviceID":deviceId})
+ }
+ if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
+ log.Error("update-groups-failed", log.Fields{"deviceID":deviceId})
+ }
+ }
return nil
}
@@ -1009,12 +1028,14 @@
outPort := fd.GetPacketOutPort(packet)
//frame := packet.GetData()
//TODO: Use a channel between the logical agent and the device agent
- agent.deviceMgr.packetOut(agent.rootDeviceId, outPort, packet)
+ if err := agent.deviceMgr.packetOut(agent.rootDeviceId, outPort, packet); err != nil {
+ log.Error("packetout-failed", log.Fields{"logicalDeviceID":agent.rootDeviceId})
+ }
}
func (agent *LogicalDeviceAgent) packetIn(port uint32, packet []byte) {
log.Debugw("packet-in", log.Fields{"port": port, "packet": packet})
- packet_in := fd.MkPacketIn(port, packet)
- agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, packet_in)
- log.Debugw("sending-packet-in", log.Fields{"packet-in": packet_in})
+ packetIn := fd.MkPacketIn(port, packet)
+ agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, packetIn)
+ log.Debugw("sending-packet-in", log.Fields{"packet-in": packetIn})
}
diff --git a/rw_core/main.go b/rw_core/main.go
index dd830c1..336e731 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -62,7 +62,7 @@
return nil, errors.New("unsupported-kv-store")
}
-func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
+func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
log.Infow("kafka-client-type", log.Fields{"client": clientType})
switch clientType {
@@ -70,9 +70,15 @@
return kafka.NewSaramaClient(
kafka.Host(host),
kafka.Port(port),
+ kafka.ConsumerType(kafka.GroupCustomer),
kafka.ProducerReturnOnErrors(true),
kafka.ProducerReturnOnSuccess(true),
kafka.ProducerMaxRetries(6),
+ kafka.NumPartitions(3),
+ kafka.ConsumerGroupName(instanceID),
+ kafka.ConsumerGroupPrefix(instanceID),
+ kafka.AutoCreateTopic(false),
+ kafka.ProducerFlushFrequency(5),
kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
}
return nil, errors.New("unsupported-client-type")
@@ -126,7 +132,7 @@
}
// Setup Kafka Client
- if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort); err != nil {
+ if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort, rw.config.InstanceID); err != nil {
log.Fatal("Unsupported-kafka-client")
}
@@ -219,7 +225,7 @@
}
log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
- //log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
defer log.CleanUp()