packet-in escapes the Twisted reactor thread

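Handle pkt_ind directly on the Kafka consumer thread instead of
hopping onto the Twisted reactor with callFromThread. The indication
handler now resolves the logical port itself, via the new data-model
helper logical_port_num(), and publishes the packet-in straight onto
the adapter agent's event bus.

Also delete any stale 'openolt.ind' topic at device init, using a new
KAdmin wrapper around the confluent-kafka AdminClient (adapted from
the Confluent example admin clients).
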
Change-Id: I8dc0a5d51745643b4f8a33ef35bf5af95c5abbd3
diff --git a/voltha/adapters/openolt/openolt_data_model.py b/voltha/adapters/openolt/openolt_data_model.py
index afd5ce7..52bb511 100644
--- a/voltha/adapters/openolt/openolt_data_model.py
+++ b/voltha/adapters/openolt/openolt_data_model.py
@@ -16,7 +16,7 @@
 import collections
 import structlog
 import socket
-from scapy.layers.l2 import Ether
+from structlog import get_logger
 
 from voltha.adapters.openolt.openolt_utils import OpenoltUtils
 from voltha.protos.device_pb2 import Port, Device
@@ -30,6 +30,7 @@
 from voltha.protos.logical_device_pb2 import LogicalDevice
 from voltha.registry import registry
 
+log = get_logger()
 
 # Onu info cache is hashed on onu_id, serial number, gemport_id
 OnuId = collections.namedtuple('OnuId', ['intf_id', 'onu_id'])
@@ -327,7 +328,7 @@
         self.adapter_agent.receive_proxied_message(onu_device.proxy_address,
                                                    pkt)
 
-    def onu_send_packet_in(self, intf_type, intf_id, port_no, gemport_id, pkt):
+    def logical_port_num(self, intf_type, intf_id, port_no, gemport_id):
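+        """Map a packet indication's interface info to a logical port number.
+
+        Raises ValueError for an unrecognized intf_type.
+        """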
         if intf_type == "pon":
             if port_no:
                 logical_port_num = port_no
@@ -342,13 +343,10 @@
             logical_port_num = self.platform.intf_id_to_port_no(
                 intf_id,
                 Port.ETHERNET_NNI)
+        else:
+            raise ValueError('invalid intf_type=%s' % intf_type)
 
-        ether_pkt = Ether(pkt)
-
-        self.adapter_agent.send_packet_in(
-            logical_device_id=self.logical_device_id,
-            logical_port_no=logical_port_num,
-            packet=str(ether_pkt))
+        return logical_port_num
 
     # #######################################################################
     #
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index d750416..d3c55f9 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -25,6 +25,7 @@
 from voltha.adapters.openolt.openolt_utils import OpenoltUtils
 from voltha.adapters.openolt.openolt_grpc import OpenoltGrpc
 from voltha.adapters.openolt.openolt_indications import OpenoltIndications
+from voltha.adapters.openolt.openolt_kafka_admin import KAdmin
 
 
 class OpenoltDevice(object):
@@ -103,6 +104,8 @@
 
         self.device_info = None
 
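+        # Delete any stale 'openolt.ind' topic left over from a previous
+        # run before the indications consumer starts.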
+        self._kadmin = KAdmin()
+        self._kadmin.delete_topics(['openolt.ind'])
         self._grpc = None
         self.go_state_init()
 
diff --git a/voltha/adapters/openolt/openolt_indications.py b/voltha/adapters/openolt/openolt_indications.py
index c5401d3..54415ba 100644
--- a/voltha/adapters/openolt/openolt_indications.py
+++ b/voltha/adapters/openolt/openolt_indications.py
@@ -19,6 +19,8 @@
 from simplejson import loads
 from twisted.internet import reactor
 import structlog
+from scapy.layers.l2 import Ether
+from common.frameio.frameio import hexify
 
 from voltha.adapters.openolt.protos import openolt_pb2
 from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
@@ -65,7 +67,7 @@
         elif ind.HasField('omci_ind'):
             reactor.callFromThread(self.device.omci_indication, ind.omci_ind)
         elif ind.HasField('pkt_ind'):
-            reactor.callFromThread(self.device.packet_indication, ind.pkt_ind)
+            self.send_packet_in(ind.pkt_ind)
         elif ind.HasField('port_stats'):
             reactor.callFromThread(
                 self.device.stats_mgr.port_statistics_indication,
@@ -79,3 +81,40 @@
                 self.device.alarm_mgr.process_alarms, ind.alarm_ind)
         else:
             self.log.warn('unknown indication type')
+
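+    # Note: send_packet_in() runs directly on the Kafka consumer thread.
+    # Unlike the other indications above, the packet-in is not handed to
+    # the Twisted reactor via callFromThread; it is published straight
+    # onto the event bus.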
+    def send_packet_in(self, pkt_indication):
+        self.log.debug("packet indication",
+                       intf_type=pkt_indication.intf_type,
+                       intf_id=pkt_indication.intf_id,
+                       port_no=pkt_indication.port_no,
+                       cookie=pkt_indication.cookie,
+                       gemport_id=pkt_indication.gemport_id,
+                       flow_id=pkt_indication.flow_id)
+        try:
+            logical_port_num = self.device.data_model.logical_port_num(
+                pkt_indication.intf_type,
+                pkt_indication.intf_id,
+                pkt_indication.port_no,
+                pkt_indication.gemport_id)
+        except ValueError:
+            self.log.error('No logical port found',
+                           intf_type=pkt_indication.intf_type,
+                           intf_id=pkt_indication.intf_id,
+                           port_no=pkt_indication.port_no,
+                           gemport_id=pkt_indication.gemport_id)
+            return
+
+        # Parse and re-serialize the payload so a raw Ethernet frame goes
+        # out on the bus (scapy's Ether() always yields a Packet here).
+        ether_pkt = str(Ether(pkt_indication.pkt))
+
+        logical_device_id = self.device.data_model.logical_device_id
+        topic = 'packet-in:' + logical_device_id
+
+        self.log.debug('send-packet-in', logical_device_id=logical_device_id,
+                       logical_port_num=logical_port_num,
+                       packet=hexify(ether_pkt))
+
+        self.device.data_model.adapter_agent.event_bus.publish(
+            topic, (logical_port_num, ether_pkt))
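+
+    # A subscriber on the event bus receives (logical_port_num, packet)
+    # tuples on topic 'packet-in:<logical_device_id>'. A minimal sketch,
+    # assuming voltha's common.event_bus.EventBusClient API:
+    #
+    #     from common.event_bus import EventBusClient
+    #
+    #     def _on_packet_in(topic, msg):
+    #         port_no, packet = msg
+    #         ...  # inject into the logical device
+    #
+    #     EventBusClient().subscribe('packet-in:' + logical_device_id,
+    #                                _on_packet_in)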
diff --git a/voltha/adapters/openolt/openolt_kafka_admin.py b/voltha/adapters/openolt/openolt_kafka_admin.py
new file mode 100644
index 0000000..bca17f0
--- /dev/null
+++ b/voltha/adapters/openolt/openolt_kafka_admin.py
@@ -0,0 +1,332 @@
+#!/usr/bin/env python
+#
+# Copyright 2018 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Kafka admin helpers for the OpenOLT adapter, adapted from the
+# confluent-kafka example Admin clients.
+#
+
+from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions,
+                                   ConfigResource, ConfigSource)
+from confluent_kafka import KafkaException
+import sys
+import threading
+import logging
+from structlog import get_logger
+
+from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+
+logging.basicConfig()
+
+log = get_logger()
+
+
+class KAdmin(object):
+    def __init__(self):
+        kafka_proxy = get_kafka_proxy()
+        if kafka_proxy and not kafka_proxy.is_faulty():
+            kafka_endpoint = kafka_proxy.kafka_endpoint
+            log.debug('kafka-proxy-available', endpoint=kafka_endpoint)
+        else:
+            log.error('kafka-proxy-unavailable')
+            raise RuntimeError('kafka-proxy-unavailable')
+
+        # Create Admin client
+        self.admin_client = AdminClient({'bootstrap.servers': kafka_endpoint})
+
+    def delete_topics(self, topics):
+        kafka_delete_topics(self.admin_client, topics, timeout=0)
+
+
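+# Minimal usage sketch, mirroring OpenoltDevice.__init__ in this change:
+#
+#     kadmin = KAdmin()
+#     kadmin.delete_topics(['openolt.ind'])
+
+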
+def kafka_create_topics(a, topics):
+    """ Create topics """
+
+    new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in topics]
+    # Call create_topics to asynchronously create topics, a dict
+    # of <topic,future> is returned.
+    fs = a.create_topics(new_topics)
+
+    # Wait for operation to finish.
+    # Timeouts are preferably controlled by passing request_timeout=15.0
+    # to the create_topics() call.
+    # All futures will finish at the same time.
+    for topic, f in fs.items():
+        try:
+            f.result()  # The result itself is None
+            print("Topic {} created".format(topic))
+        except Exception as e:
+            print("Failed to create topic {}: {}".format(topic, e))
+
+
+def kafka_delete_topics(a, topics, timeout=30):
+    """ delete topics """
+
+    # Call delete_topics to asynchronously delete topics, a future is returned.
+    # By default this operation on the broker returns immediately while
+    # topics are deleted in the background; the operation_timeout gives the
+    # deletion time to propagate in the cluster before returning.
+    #
+    # Returns a dict of <topic,future>.
+    fs = a.delete_topics(topics, operation_timeout=timeout)
+
+    # Wait for operation to finish.
+    for topic, f in fs.items():
+        try:
+            f.result()  # The result itself is None
+            print("Topic {} deleted".format(topic))
+        except Exception as e:
+            print("Failed to delete topic {}: {}".format(topic, e))
+
+
+def kafka_create_partitions(a, topics):
+    """ create partitions """
+
+    new_parts = [NewPartitions(topic, int(new_total_count)) for
+                 topic, new_total_count in zip(topics[0::2], topics[1::2])]
+
+    # Try switching validate_only to True to only validate the operation
+    # on the broker but not actually perform it.
+    fs = a.create_partitions(new_parts, validate_only=False)
+
+    # Wait for operation to finish.
+    for topic, f in fs.items():
+        try:
+            f.result()  # The result itself is None
+            print("Additional partitions created for topic {}".format(topic))
+        except Exception as e:
+            print("Failed to add partitions to topic {}: {}".format(topic, e))
+
+
+def print_config(config, depth):
+    print('%40s = %-50s  [%s,is:read-only=%r,default=%r,sensitive=%r,synonym=%r,synonyms=%s]' %
+          ((' ' * depth) + config.name, config.value, ConfigSource(config.source),
+           config.is_read_only, config.is_default,
+           config.is_sensitive, config.is_synonym,
+           ["%s:%s" % (x.name, ConfigSource(x.source))
+            for x in iter(config.synonyms.values())]))
+
+
+def kafka_describe_configs(a, args):
+    """ describe configs """
+
+    resources = [ConfigResource(restype, resname) for
+                 restype, resname in zip(args[0::2], args[1::2])]
+
+    fs = a.describe_configs(resources)
+
+    # Wait for operation to finish.
+    for res, f in fs.items():
+        try:
+            configs = f.result()
+            for config in iter(configs.values()):
+                print_config(config, 1)
+
+        except KafkaException as e:
+            print("Failed to describe {}: {}".format(res, e))
+        except Exception:
+            raise
+
+
+def kafka_alter_configs(a, args):
+    """ Alter configs atomically, replacing non-specified
+    configuration properties with their default values.
+    """
+
+    resources = []
+    for restype, resname, configs in zip(args[0::3], args[1::3], args[2::3]):
+        resource = ConfigResource(restype, resname)
+        resources.append(resource)
+        for k, v in [conf.split('=') for conf in configs.split(',')]:
+            resource.set_config(k, v)
+
+    fs = a.alter_configs(resources)
+
+    # Wait for operation to finish.
+    for res, f in fs.items():
+        try:
+            f.result()  # empty, but raises exception on failure
+            print("{} configuration successfully altered".format(res))
+        except Exception:
+            raise
+
+
+def kafka_delta_alter_configs(a, args):
+    """
+    The AlterConfigs Kafka API requires all configuration to be passed;
+    any left-out configuration properties revert to their default settings.
+
+    This example shows how to just modify the supplied configuration entries
+    by first reading the configuration from the broker, updating the supplied
+    configuration with the broker configuration (without overwriting), and
+    then writing it all back.
+
+    The async nature of futures is also showcased, which makes this example
+    a bit more complex than it would need to be in the synchronous case.
+    """
+
+    # Convert supplied config to resources.
+    # We can reuse the same resources both for describe_configs and
+    # alter_configs.
+    resources = []
+    for restype, resname, configs in zip(args[0::3], args[1::3], args[2::3]):
+        resource = ConfigResource(restype, resname)
+        resources.append(resource)
+        for k, v in [conf.split('=') for conf in configs.split(',')]:
+            resource.set_config(k, v)
+
+    # Set up a locked counter and an Event (for signaling) to track when the
+    # second level of futures is done. This is a bit of a contrived example,
+    # since no other asynchronous mechanism is in use; we need something to
+    # wait on to signal completion.
+
+    class WaitZero(object):
+        def __init__(self, waitcnt):
+            self.cnt = waitcnt
+            self.lock = threading.Lock()
+            self.event = threading.Event()
+
+        def decr(self):
+            """ Decrement cnt by 1"""
+            with self.lock:
+                assert self.cnt > 0
+                self.cnt -= 1
+            self.event.set()
+
+        def wait(self):
+            """ Wait until cnt reaches 0 """
+            self.lock.acquire()
+            while self.cnt > 0:
+                self.lock.release()
+                self.event.wait()
+                self.event.clear()
+                self.lock.acquire()
+            self.lock.release()
+
+        def __len__(self):
+            with self.lock:
+                return self.cnt
+
+    wait_zero = WaitZero(len(resources))
+
+    # Read existing configuration from cluster
+    fs = a.describe_configs(resources)
+
+    def delta_alter_configs_done(fut, resource):
+        e = fut.exception()
+        if e is not None:
+            print("Config update for {} failed: {}".format(resource, e))
+        else:
+            print("Config for {} updated".format(resource))
+        wait_zero.decr()
+
+    def delta_alter_configs(resource, remote_config):
+        print("Updating {} supplied config entries {} with {} config entries read from cluster".format(
+            len(resource), resource, len(remote_config)))
+        # Only set configuration that is not default
+        for k, entry in [(k, v) for k, v in remote_config.items() if not v.is_default]:
+            resource.set_config(k, entry.value, overwrite=False)
+
+        fs = a.alter_configs([resource])
+        fs[resource].add_done_callback(lambda fut: delta_alter_configs_done(fut, resource))
+
+    # For each resource's future set up a completion callback
+    # that in turn calls alter_configs() on that single resource.
+    # This is inefficient, since the resources could usually go in
+    # one single alter_configs() call, but we're also showcasing
+    # futures here.
+    for res, f in fs.items():
+        f.add_done_callback(lambda fut, resource=res: delta_alter_configs(resource, fut.result()))
+
+    # Wait for done callbacks to be triggered and operations to complete.
+    print("Waiting for {} resource updates to finish".format(len(wait_zero)))
+    wait_zero.wait()
+
+
+def kafka_list(a, args):
+    """ list topics and cluster metadata """
+
+    if len(args) == 0:
+        what = "all"
+    else:
+        what = args[0]
+
+    md = a.list_topics(timeout=10)
+
+    print("Cluster {} metadata (response from broker {}):".format(md.cluster_id, md.orig_broker_name))
+
+    if what in ("all", "brokers"):
+        print(" {} brokers:".format(len(md.brokers)))
+        for b in iter(md.brokers.values()):
+            if b.id == md.controller_id:
+                print("  {}  (controller)".format(b))
+            else:
+                print("  {}".format(b))
+
+    if what not in ("all", "topics"):
+        return
+
+    print(" {} topics:".format(len(md.topics)))
+    for t in iter(md.topics.values()):
+        if t.error is not None:
+            errstr = ": {}".format(t.error)
+        else:
+            errstr = ""
+
+        print("  \"{}\" with {} partition(s){}".format(t, len(t.partitions), errstr))
+
+        for p in iter(t.partitions.values()):
+            if p.error is not None:
+                errstr = ": {}".format(p.error)
+            else:
+                errstr = ""
+
+            print("    partition {} leader: {}, replicas: {}, isrs: {}".format(
+                p.id, p.leader, p.replicas, p.isrs, errstr))
+
+
+if __name__ == '__main__':
+    if len(sys.argv) < 3:
+        sys.stderr.write('Usage: %s <bootstrap-brokers> <operation> <args..>\n\n' % sys.argv[0])
+        sys.stderr.write('operations:\n')
+        sys.stderr.write(' create_topics <topic1> <topic2> ..\n')
+        sys.stderr.write(' delete_topics <topic1> <topic2> ..\n')
+        sys.stderr.write(' create_partitions <topic1> <new_total_count1> <topic2> <new_total_count2> ..\n')
+        sys.stderr.write(' describe_configs <resource_type1> <resource_name1> <resource_type2> <resource_name2> ..\n')
+        sys.stderr.write(' alter_configs <resource_type1> <resource_name1> ' +
+                         '<config=val,config2=val2> <resource_type2> <resource_name2> <config..> ..\n')
+        sys.stderr.write(' delta_alter_configs <resource_type1> <resource_name1> ' +
+                         '<config=val,config2=val2> <resource_type2> <resource_name2> <config..> ..\n')
+        sys.stderr.write(' list [<all|topics|brokers>]\n')
+        sys.exit(1)
+
+    broker = sys.argv[1]
+    operation = sys.argv[2]
+    args = sys.argv[3:]
+
+    # Create Admin client
+    a = AdminClient({'bootstrap.servers': broker})
+
+    opsmap = {'create_topics': kafka_create_topics,
+              'delete_topics': kafka_delete_topics,
+              'create_partitions': kafka_create_partitions,
+              'describe_configs': kafka_describe_configs,
+              'alter_configs': kafka_alter_configs,
+              'delta_alter_configs': kafka_delta_alter_configs,
+              'list': kafka_list}
+
+    if operation not in opsmap:
+        sys.stderr.write('Unknown operation: %s\n' % operation)
+        sys.exit(1)
+
+    opsmap[operation](a, args)
diff --git a/voltha/adapters/openolt/openolt_kafka_consumer.py b/voltha/adapters/openolt/openolt_kafka_consumer.py
index 6d6e1f1..aa53db7 100644
--- a/voltha/adapters/openolt/openolt_kafka_consumer.py
+++ b/voltha/adapters/openolt/openolt_kafka_consumer.py
@@ -32,7 +32,7 @@
             self.kafka_endpoint = kafka_proxy.kafka_endpoint
             log.debug('kafka-proxy-available', endpoint=self.kafka_endpoint)
         else:
-            self.log.error('kafka-proxy-unavailable')
+            log.error('kafka-proxy-unavailable')
 
         conf = {'bootstrap.servers': self.kafka_endpoint,
                 'group.id': "mygroup"}