Normalize component start()/stop()

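All components now follow one lifecycle convention, sketched below
(illustrative only; each real component fills in its own startup and
teardown work):

    import structlog

    log = structlog.get_logger()


    class Component(object):
        """Illustrative sketch of the normalized lifecycle."""

        def start(self):
            log.debug('starting')
            # component-specific startup goes here
            log.info('started')
            return self  # returning self allows Component().start() chaining

        def stop(self):
            log.debug('stopping')
            # component-specific teardown goes here
            log.info('stopped')
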
Also fixed the /schema swagger/REST entry. It did not work
because the third-party protobuf_to_dict library cannot handle
map fields. Replaced the two map fields with a single repeated
ProtoFile field.
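
For illustration, a minimal sketch of the new shape (assumes the
Schemas/ProtoFile messages introduced below and the protobuf_to_dict
call already used in gw_gen.py; the file name and content are
hypothetical):

    from protobuf_to_dict import protobuf_to_dict
    from chameleon.protos import schema_pb2

    # With the old map<string, string>/map<string, bytes> fields,
    # protobuf_to_dict choked on the synthetic map-entry messages.
    # The repeated ProtoFile field converts cleanly:
    schemas = schema_pb2.Schemas()
    entry = schemas.protos.add()
    entry.file_name = 'example.proto'   # hypothetical proto file
    entry.proto = 'syntax = "proto3";'
    print protobuf_to_dict(schemas, use_enum_labels=True)
    # prints roughly:
    # {'protos': [{'file_name': 'example.proto', 'proto': 'syntax = "proto3";'}]}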

Change-Id: Ib25a528701b67d58d32451687724c8247da6efa5
diff --git a/chameleon/grpc_client/grpc_client.py b/chameleon/grpc_client/grpc_client.py
index 9129b2e..75c78eb 100644
--- a/chameleon/grpc_client/grpc_client.py
+++ b/chameleon/grpc_client/grpc_client.py
@@ -47,31 +47,39 @@
     """
     RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
 
-    def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055'):
+    def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055',
+                 reconnect_callback=None):
         self.consul_endpoint = consul_endpoint
         self.endpoint = endpoint
         self.work_dir = work_dir
+        self.reconnect_callback = reconnect_callback
+
         self.plugin_dir = os.path.abspath(os.path.join(
             os.path.dirname(__file__), '../protoc_plugins'))
 
         self.channel = None
         self.schema = None
         self.retries = 0
-        self.on_reconnect = None
         self.shutting_down = False
         self.connected = False
 
-    def run(self, on_reconnect=None):
-        self.on_reconnect = on_reconnect
+    def start(self):
+        log.debug('starting')
         if not self.connected:
             reactor.callLater(0, self.connect)
+        log.info('started')
         return self
 
-    def shutdown(self):
+    def stop(self):
+        log.debug('stopping')
         if self.shutting_down:
             return
         self.shutting_down = True
-        pass
+        log.info('stopped')
+
+    def set_reconnect_callback(self, reconnect_callback):
+        self.reconnect_callback = reconnect_callback
+        return self
 
     @inlineCallbacks
     def connect(self):
@@ -96,8 +104,8 @@
             self._clear_backoff()
 
             self.connected = True
-            if self.on_reconnect is not None:
-                reactor.callLater(0, self.on_reconnect)
+            if self.reconnect_callback is not None:
+                reactor.callLater(0, self.reconnect_callback)
 
             return
 
@@ -157,7 +165,7 @@
         assert isinstance(self.channel, grpc.Channel)
         stub = SchemaServiceStub(self.channel)
         # try:
-        schema = stub.GetSchema(NullMessage())
+        schemas = stub.GetSchema(NullMessage())
         # except _Rendezvous, e:
         #     if e.code == grpc.StatusCode.UNAVAILABLE:
         #
@@ -168,19 +176,20 @@
         os.system('rm -fr /tmp/%s/*' %
                   self.work_dir.replace('/tmp/', ''))  # safer
 
-        for fname in schema.protos:
-            content = schema.protos[fname]
-            log.debug('saving-proto',
-                      fname=fname, dir=self.work_dir, length=len(content))
-            with open(os.path.join(self.work_dir, fname), 'w') as f:
-                f.write(content)
+        for proto_file in schemas.protos:
+            proto_fname = proto_file.file_name
+            proto_content = proto_file.proto
+            log.debug('saving-proto', fname=proto_fname, dir=self.work_dir,
+                      length=len(proto_content))
+            with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
+                f.write(proto_content)
 
-        for fname in schema.descriptors:
-            content = decompress(schema.descriptors[fname])
-            log.debug('saving-descriptor',
-                      fname=fname, dir=self.work_dir, length=len(content))
-            with open(os.path.join(self.work_dir, fname), 'wb') as f:
-                f.write(content)
+            desc_content = decompress(proto_file.descriptor)
+            desc_fname = proto_fname.replace('.proto', '.desc')
+            log.debug('saving-descriptor', fname=desc_fname, dir=self.work_dir,
+                      length=len(desc_content))
+            with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
+                f.write(desc_content)
 
     def _compile_proto_files(self):
         """
diff --git a/chameleon/main.py b/chameleon/main.py
index 14c854b..0065b66 100755
--- a/chameleon/main.py
+++ b/chameleon/main.py
@@ -224,9 +224,9 @@
         self.grpc_client = yield \
             GrpcClient(args.consul, args.work_dir, args.grpc_endpoint)
         self.web_server = yield \
-            WebServer(args.rest_port, args.work_dir, self.grpc_client).run()
-        self.grpc_client.run(
-            on_reconnect=self.web_server.reload_generated_routes)
+            WebServer(args.rest_port, args.work_dir, self.grpc_client).start()
+        self.grpc_client.set_reconnect_callback(
+            self.web_server.reload_generated_routes).start()
         self.log.info('started-internal-services')
 
     @inlineCallbacks
@@ -234,9 +234,9 @@
         """Execute before the reactor is shut down"""
         self.log.info('exiting-on-keyboard-interrupt')
         if self.rest_server is not None:
-            yield self.rest_server.shutdown()
+            yield self.rest_server.stop()
         if self.grpc_client is not None:
-            yield self.grpc_client.shutdown()
+            yield self.grpc_client.stop()
 
     def start_reactor(self):
         from twisted.internet import reactor
diff --git a/chameleon/protoc_plugins/gw_gen.py b/chameleon/protoc_plugins/gw_gen.py
index 5398c32..20249c2 100755
--- a/chameleon/protoc_plugins/gw_gen.py
+++ b/chameleon/protoc_plugins/gw_gen.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 import sys
 
 from google.protobuf.compiler import plugin_pb2 as plugin
@@ -56,11 +57,22 @@
         {% else %}
        raise NotImplementedError('cannot handle specific body field list')
         {% endif %}
-        req = dict_to_protobuf({{ method['input_type'] }}, data)
+        try:
+            req = dict_to_protobuf({{ method['input_type'] }}, data)
+        except Exception, e:
+            log.error('cannot-convert-to-protobuf', e=e, data=data)
+            raise
         res = grpc_client.invoke(
             {{ '.'.join([package, method['service']]) }}Stub,
             '{{ method['method'] }}', req)
-        out_data = protobuf_to_dict(res, use_enum_labels=True)
+        try:
+            out_data = protobuf_to_dict(res, use_enum_labels=True)
+        except AttributeError, e:
+            filename = '/tmp/chameleon_failed_to_convert_data.pbd'
+            with open(filename, 'wb') as f:
+                f.write(res.SerializeToString())
+            log.error('cannot-convert-from-protobuf', e=e,
+                      outdata_saved=filename)
+            raise
         request.setHeader('Content-Type', 'application/json')
         log.debug('{{ method_name }}', **out_data)
         return dumps(out_data)
diff --git a/chameleon/protoc_plugins/schema2dict.py b/chameleon/protoc_plugins/schema2dict.py
new file mode 100644
index 0000000..31c0417
--- /dev/null
+++ b/chameleon/protoc_plugins/schema2dict.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Convert a schema.Schemas message, read as serialized protobuf data from
+standard input, to a Python dictionary and dump it as JSON to standard output.
+"""
+import json
+import sys
+from chameleon.protos.third_party.google.api import annotations_pb2
+_ = annotations_pb2
+from chameleon.protos import schema_pb2
+from protobuf_to_dict import protobuf_to_dict
+
+
+if __name__ == '__main__':
+
+    data = sys.stdin.read()
+    schemas = schema_pb2.Schemas()
+    schemas.ParseFromString(data)
+
+    data = protobuf_to_dict(schemas, use_enum_labels=True)
+    json.dump(data, sys.stdout)
diff --git a/chameleon/protos/schema.proto b/chameleon/protos/schema.proto
index 530d74a..2a1ec08 100644
--- a/chameleon/protos/schema.proto
+++ b/chameleon/protos/schema.proto
@@ -2,14 +2,18 @@
 
 package schema;
 
-// Proto file and compiled descriptor for this interface
-message Schema {
+// Contains the name and content of a *.proto file
+message ProtoFile {
+    string file_name = 1;  // name of proto file
+    string proto = 2;  // content of proto file
+    bytes descriptor = 3;  // compiled descriptor for proto (zlib compressed)
+}
 
-  // file name -> proto file content
-  map<string, string> protos = 1;
+// Proto files and compiled descriptors for this interface
+message Schemas {
 
-  // file name -> gzip compressed protobuf of descriptor
-  map<string, bytes> descriptors = 2;
+    // Proto files
+    repeated ProtoFile protos = 1;
 
 }
 
@@ -19,7 +23,7 @@
 // Schema services
 service SchemaService {
 
-  // Return active grpc schemas
-  rpc GetSchema(NullMessage) returns (Schema) {}
+    // Return active grpc schemas
+    rpc GetSchema(NullMessage) returns (Schemas) {}
 
 }
diff --git a/chameleon/protos/schema_pb2.py b/chameleon/protos/schema_pb2.py
index eab43f5..f5f17d2 100644
--- a/chameleon/protos/schema_pb2.py
+++ b/chameleon/protos/schema_pb2.py
@@ -19,67 +19,37 @@
   name='schema.proto',
   package='schema',
   syntax='proto3',
-  serialized_pb=_b('\n\x0cschema.proto\x12\x06schema\"\xcd\x01\n\x06Schema\x12*\n\x06protos\x18\x01 \x03(\x0b\x32\x1a.schema.Schema.ProtosEntry\x12\x34\n\x0b\x64\x65scriptors\x18\x02 \x03(\x0b\x32\x1f.schema.Schema.DescriptorsEntry\x1a-\n\x0bProtosEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x32\n\x10\x44\x65scriptorsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"\r\n\x0bNullMessage2C\n\rSchemaService\x12\x32\n\tGetSchema\x12\x13.schema.NullMessage\x1a\x0e.schema.Schema\"\x00\x62\x06proto3')
+  serialized_pb=_b('\n\x0cschema.proto\x12\x06schema\"A\n\tProtoFile\x12\x11\n\tfile_name\x18\x01 \x01(\t\x12\r\n\x05proto\x18\x02 \x01(\t\x12\x12\n\ndescriptor\x18\x03 \x01(\x0c\",\n\x07Schemas\x12!\n\x06protos\x18\x01 \x03(\x0b\x32\x11.schema.ProtoFile\"\r\n\x0bNullMessage2D\n\rSchemaService\x12\x33\n\tGetSchema\x12\x13.schema.NullMessage\x1a\x0f.schema.Schemas\"\x00\x62\x06proto3')
 )
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
 
 
 
 
-_SCHEMA_PROTOSENTRY = _descriptor.Descriptor(
-  name='ProtosEntry',
-  full_name='schema.Schema.ProtosEntry',
+_PROTOFILE = _descriptor.Descriptor(
+  name='ProtoFile',
+  full_name='schema.ProtoFile',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='key', full_name='schema.Schema.ProtosEntry.key', index=0,
+      name='file_name', full_name='schema.ProtoFile.file_name', index=0,
       number=1, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None),
     _descriptor.FieldDescriptor(
-      name='value', full_name='schema.Schema.ProtosEntry.value', index=1,
+      name='proto', full_name='schema.ProtoFile.proto', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=133,
-  serialized_end=178,
-)
-
-_SCHEMA_DESCRIPTORSENTRY = _descriptor.Descriptor(
-  name='DescriptorsEntry',
-  full_name='schema.Schema.DescriptorsEntry',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
     _descriptor.FieldDescriptor(
-      name='key', full_name='schema.Schema.DescriptorsEntry.key', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    _descriptor.FieldDescriptor(
-      name='value', full_name='schema.Schema.DescriptorsEntry.value', index=1,
-      number=2, type=12, cpp_type=9, label=1,
+      name='descriptor', full_name='schema.ProtoFile.descriptor', index=2,
+      number=3, type=12, cpp_type=9, label=1,
       has_default_value=False, default_value=_b(""),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
@@ -90,41 +60,35 @@
   nested_types=[],
   enum_types=[
   ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=180,
-  serialized_end=230,
+  serialized_start=24,
+  serialized_end=89,
 )
 
-_SCHEMA = _descriptor.Descriptor(
-  name='Schema',
-  full_name='schema.Schema',
+
+_SCHEMAS = _descriptor.Descriptor(
+  name='Schemas',
+  full_name='schema.Schemas',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='protos', full_name='schema.Schema.protos', index=0,
+      name='protos', full_name='schema.Schemas.protos', index=0,
       number=1, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None),
-    _descriptor.FieldDescriptor(
-      name='descriptors', full_name='schema.Schema.descriptors', index=1,
-      number=2, type=11, cpp_type=10, label=3,
-      has_default_value=False, default_value=[],
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
   ],
   extensions=[
   ],
-  nested_types=[_SCHEMA_PROTOSENTRY, _SCHEMA_DESCRIPTORSENTRY, ],
+  nested_types=[],
   enum_types=[
   ],
   options=None,
@@ -133,8 +97,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=25,
-  serialized_end=230,
+  serialized_start=91,
+  serialized_end=135,
 )
 
 
@@ -157,39 +121,28 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=232,
-  serialized_end=245,
+  serialized_start=137,
+  serialized_end=150,
 )
 
-_SCHEMA_PROTOSENTRY.containing_type = _SCHEMA
-_SCHEMA_DESCRIPTORSENTRY.containing_type = _SCHEMA
-_SCHEMA.fields_by_name['protos'].message_type = _SCHEMA_PROTOSENTRY
-_SCHEMA.fields_by_name['descriptors'].message_type = _SCHEMA_DESCRIPTORSENTRY
-DESCRIPTOR.message_types_by_name['Schema'] = _SCHEMA
+_SCHEMAS.fields_by_name['protos'].message_type = _PROTOFILE
+DESCRIPTOR.message_types_by_name['ProtoFile'] = _PROTOFILE
+DESCRIPTOR.message_types_by_name['Schemas'] = _SCHEMAS
 DESCRIPTOR.message_types_by_name['NullMessage'] = _NULLMESSAGE
 
-Schema = _reflection.GeneratedProtocolMessageType('Schema', (_message.Message,), dict(
-
-  ProtosEntry = _reflection.GeneratedProtocolMessageType('ProtosEntry', (_message.Message,), dict(
-    DESCRIPTOR = _SCHEMA_PROTOSENTRY,
-    __module__ = 'schema_pb2'
-    # @@protoc_insertion_point(class_scope:schema.Schema.ProtosEntry)
-    ))
-  ,
-
-  DescriptorsEntry = _reflection.GeneratedProtocolMessageType('DescriptorsEntry', (_message.Message,), dict(
-    DESCRIPTOR = _SCHEMA_DESCRIPTORSENTRY,
-    __module__ = 'schema_pb2'
-    # @@protoc_insertion_point(class_scope:schema.Schema.DescriptorsEntry)
-    ))
-  ,
-  DESCRIPTOR = _SCHEMA,
+ProtoFile = _reflection.GeneratedProtocolMessageType('ProtoFile', (_message.Message,), dict(
+  DESCRIPTOR = _PROTOFILE,
   __module__ = 'schema_pb2'
-  # @@protoc_insertion_point(class_scope:schema.Schema)
+  # @@protoc_insertion_point(class_scope:schema.ProtoFile)
   ))
-_sym_db.RegisterMessage(Schema)
-_sym_db.RegisterMessage(Schema.ProtosEntry)
-_sym_db.RegisterMessage(Schema.DescriptorsEntry)
+_sym_db.RegisterMessage(ProtoFile)
+
+Schemas = _reflection.GeneratedProtocolMessageType('Schemas', (_message.Message,), dict(
+  DESCRIPTOR = _SCHEMAS,
+  __module__ = 'schema_pb2'
+  # @@protoc_insertion_point(class_scope:schema.Schemas)
+  ))
+_sym_db.RegisterMessage(Schemas)
 
 NullMessage = _reflection.GeneratedProtocolMessageType('NullMessage', (_message.Message,), dict(
   DESCRIPTOR = _NULLMESSAGE,
@@ -199,10 +152,6 @@
 _sym_db.RegisterMessage(NullMessage)
 
 
-_SCHEMA_PROTOSENTRY.has_options = True
-_SCHEMA_PROTOSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
-_SCHEMA_DESCRIPTORSENTRY.has_options = True
-_SCHEMA_DESCRIPTORSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
 import grpc
 from grpc.beta import implementations as beta_implementations
 from grpc.beta import interfaces as beta_interfaces
@@ -223,7 +172,7 @@
     self.GetSchema = channel.unary_unary(
         '/schema.SchemaService/GetSchema',
         request_serializer=NullMessage.SerializeToString,
-        response_deserializer=Schema.FromString,
+        response_deserializer=Schemas.FromString,
         )
 
 
@@ -244,7 +193,7 @@
       'GetSchema': grpc.unary_unary_rpc_method_handler(
           servicer.GetSchema,
           request_deserializer=NullMessage.FromString,
-          response_serializer=Schema.SerializeToString,
+          response_serializer=Schemas.SerializeToString,
       ),
   }
   generic_handler = grpc.method_handlers_generic_handler(
@@ -276,7 +225,7 @@
     ('schema.SchemaService', 'GetSchema'): NullMessage.FromString,
   }
   response_serializers = {
-    ('schema.SchemaService', 'GetSchema'): Schema.SerializeToString,
+    ('schema.SchemaService', 'GetSchema'): Schemas.SerializeToString,
   }
   method_implementations = {
     ('schema.SchemaService', 'GetSchema'): face_utilities.unary_unary_inline(servicer.GetSchema),
@@ -290,7 +239,7 @@
     ('schema.SchemaService', 'GetSchema'): NullMessage.SerializeToString,
   }
   response_deserializers = {
-    ('schema.SchemaService', 'GetSchema'): Schema.FromString,
+    ('schema.SchemaService', 'GetSchema'): Schemas.FromString,
   }
   cardinalities = {
     'GetSchema': cardinality.Cardinality.UNARY_UNARY,
diff --git a/chameleon/web_server/web_server.py b/chameleon/web_server/web_server.py
index c46ad88..c96be0f 100644
--- a/chameleon/web_server/web_server.py
+++ b/chameleon/web_server/web_server.py
@@ -47,11 +47,22 @@
         self.shutting_down = False
 
     @inlineCallbacks
-    def run(self):
+    def start(self):
+        log.debug('starting')
         yield self._open_endpoint()
+        log.info('started')
         returnValue(self)
 
     @inlineCallbacks
+    def stop(self):
+        log.debug('stopping')
+        self.shutting_down = True
+        if self.tcp_port is not None:
+            assert isinstance(self.tcp_port, Port)
+            yield self.tcp_port.socket.close()
+        log.info('stopped')
+
+    @inlineCallbacks
     def _open_endpoint(self):
         endpoint = endpoints.TCP4ServerEndpoint(reactor, self.port)
         self.site = Site(self.app.resource())
@@ -59,13 +70,6 @@
         log.info('web-server-started', port=self.port)
         self.endpoint = endpoint
 
-    @inlineCallbacks
-    def shutdown(self):
-        self.shutting_down = True
-        if self.tcp_port is not None:
-            assert isinstance(self.tcp_port, Port)
-            yield self.tcp_port.socket.close()
-
     def reload_generated_routes(self):
         for fname in os.listdir(self.work_dir):
             if fname.endswith('_gw.py'):
diff --git a/ofagent/agent.py b/ofagent/agent.py
index ab1b868..cabcc6c 100644
--- a/ofagent/agent.py
+++ b/ofagent/agent.py
@@ -55,13 +55,22 @@
     def get_device_id(self):
         return self.device_id
 
-    def run(self):
+    def start(self):
+        log.debug('starting')
         if self.running:
             return
         self.running = True
         reactor.callLater(0, self.keep_connected)
+        log.info('started')
         return self
 
+    def stop(self):
+        log.debug('stopping')
+        self.connected = False
+        self.exiting = True
+        self.connector.disconnect()
+        log.info('stopped')
+
     def resolve_endpoint(self, endpoint):
         # TODO allow resolution via consul
         host, port = endpoint.split(':', 2)
@@ -79,24 +88,19 @@
             log.debug('reconnect', after_delay=self.retry_interval)
             yield asleep(self.retry_interval)
 
-    def stop(self):
-        self.connected = False
-        self.exiting = True
-        self.connector.disconnect()
-        log.info('stopped')
-
     def enter_disconnected(self, event, reason):
         """Internally signal entering disconnected state"""
-        log.error(event, reason=reason)
         self.connected = False
-        self.d_disconnected.callback(None)
+        if not self.exiting:
+            log.error(event, reason=reason)
+            self.d_disconnected.callback(None)
 
     def enter_connected(self):
         """Handle transitioning from disconnected to connected state"""
         log.info('connected')
         self.connected = True
         self.read_buffer = None
-        reactor.callLater(0, self.proto_handler.run)
+        reactor.callLater(0, self.proto_handler.start)
 
     # protocol.ClientFactory methods
 
@@ -110,8 +114,9 @@
         self.enter_disconnected('connection-failed', reason)
 
     def clientConnectionLost(self, connector, reason):
-        log.error('client-connection-lost',
-                  reason=reason, connector=connector)
+        if not self.exiting:
+            log.error('client-connection-lost',
+                      reason=reason, connector=connector)
 
     def forward_packet_in(self, ofp_packet_in):
         self.proto_handler.forward_packet_in(ofp_packet_in)
@@ -147,7 +152,7 @@
             return ports
 
     stub = MockRpc()
-    agents = [Agent('localhost:6633', 256 + i, stub).run() for i in range(n)]
+    agents = [Agent('localhost:6633', 256 + i, stub).start() for i in range(n)]
 
     def shutdown():
         [a.stop() for a in agents]
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 13fb6de..0bb2f89 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -23,13 +23,14 @@
 from common.utils.consulhelpers import get_endpoint_from_consul
 from structlog import get_logger
 import grpc
+from ofagent.protos import third_party
 from protos import voltha_pb2
 from grpc_client import GrpcClient
 
 from agent import Agent
 
 log = get_logger()
-
+_ = third_party
 
 class ConnectionManager(object):
 
@@ -52,12 +53,12 @@
 
         self.running = False
 
-    def run(self):
+    def start(self):
 
         if self.running:
             return
 
-        log.info('run-connection-manager')
+        log.debug('starting')
 
         self.running = True
 
@@ -65,19 +66,24 @@
         self.channel = self.get_grpc_channel_with_voltha()
 
         # Create shared gRPC API object
-        self.grpc_client = GrpcClient(self, self.channel)
+        self.grpc_client = GrpcClient(self, self.channel).start()
 
         # Start monitoring logical devices and manage agents accordingly
         reactor.callLater(0, self.monitor_logical_devices)
 
+        log.info('started')
+
         return self
 
-    def shutdown(self):
+    def stop(self):
+        log.debug('stopping')
         # clean up all controller connections
-        for _, value in enumerate(self.agent_map):
-            value.stop()
+        for agent in self.agent_map.itervalues():
+            agent.stop()
         self.running = False
-        # TODO: close grpc connection to voltha
+        self.grpc_client.stop()
+        del self.channel
+        log.info('stopped')
 
     def resolve_endpoint(self, endpoint):
         ip_port_endpoint = endpoint
@@ -170,7 +176,7 @@
         device_id = device.id
         agent = Agent(self.controller_endpoint, datapath_id,
                       device_id, self.grpc_client)
-        agent.run()
+        agent.start()
         self.agent_map[datapath_id] = agent
         self.device_id_to_datapath_id_map[device_id] = datapath_id
 
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index efbc038..5b6b0a8 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -17,8 +17,10 @@
 """
 The gRPC client layer for the OpenFlow agent
 """
-from Queue import Queue
+from Queue import Queue, Empty
 
+from grpc import StatusCode
+from grpc._channel import _Rendezvous
 from structlog import get_logger
 from twisted.internet import reactor
 from twisted.internet import threads
@@ -39,18 +41,35 @@
         self.channel = channel
         self.logical_stub = VolthaLogicalLayerStub(channel)
 
+        self.stopped = False
+
         self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
         self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
+
+    def start(self):
+        log.debug('starting')
         self.start_packet_out_stream()
         self.start_packet_in_stream()
         reactor.callLater(0, self.packet_in_forwarder_loop)
+        log.info('started')
+        return self
+
+    def stop(self):
+        log.debug('stopping')
+        self.stopped = True
+        log.info('stopped')
 
     def start_packet_out_stream(self):
 
         def packet_generator():
             while 1:
-                packet = self.packet_out_queue.get(block=True)
-                yield packet
+                try:
+                    packet = self.packet_out_queue.get(block=True, timeout=1.0)
+                except Empty:
+                    if self.stopped:
+                        return
+                else:
+                    yield packet
 
         def stream_packets_out():
             generator = packet_generator()
@@ -61,8 +80,11 @@
     def start_packet_in_stream(self):
 
         def receive_packet_in_stream():
-            for packet_in in self.logical_stub.ReceivePacketsIn(NullMessage()):
-                reactor.callFromThread(self.packet_in_queue.put, packet_in)
+            streaming_rpc_method = self.logical_stub.ReceivePacketsIn
+            iterator = streaming_rpc_method(NullMessage())
+            for packet_in in iterator:
+                reactor.callFromThread(self.packet_in_queue.put,
+                                       packet_in)
                 log.debug('enqued-packet-in',
                           packet_in=packet_in,
                           queue_len=len(self.packet_in_queue.pending))
@@ -76,6 +98,8 @@
             device_id = packet_in.id
             ofp_packet_in = packet_in.packet_in
             self.connection_manager.forward_packet_in(device_id, ofp_packet_in)
+            if self.stopped:
+                break
 
     def send_packet_out(self, device_id, packet_out):
         packet_out = PacketOut(id=device_id, packet_out=packet_out)
diff --git a/ofagent/loxi/connection.py b/ofagent/loxi/connection.py
index f74ff20..0882ac4 100644
--- a/ofagent/loxi/connection.py
+++ b/ofagent/loxi/connection.py
@@ -45,7 +45,7 @@
         self.finished = False
         self.read_buffer = None
 
-    def run(self):
+    def start(self):
         while not self.finished:
             try:
                 rd, wr, err = select.select([self.sock, self.wakeup_rd], [], [])
diff --git a/ofagent/main.py b/ofagent/main.py
index 5b21692..1d6149e 100755
--- a/ofagent/main.py
+++ b/ofagent/main.py
@@ -16,14 +16,10 @@
 #
 import argparse
 import os
-import sys
 import yaml
+from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks
 
-base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-sys.path.append(base_dir)
-sys.path.append(os.path.join(base_dir, '/ofagent/protos/third_party'))
-
 from common.utils.dockerhelpers import get_my_containers_name
 from common.utils.nethelpers import get_my_primary_local_ipv4
 from common.utils.structlog_setup import setup_logging
@@ -216,7 +212,7 @@
         self.log.info('starting-internal-components')
         args = self.args
         self.connection_manager = yield ConnectionManager(
-            args.consul, args.grpc_endpoint, args.controller).run()
+            args.consul, args.grpc_endpoint, args.controller).start()
         self.log.info('started-internal-services')
 
     @inlineCallbacks
@@ -225,10 +221,9 @@
         self.log.info('exiting-on-keyboard-interrupt')
         self.exiting = True
         if self.connection_manager is not None:
-            yield self.connection_manager.shutdown()
+            yield self.connection_manager.stop()
 
     def start_reactor(self):
-        from twisted.internet import reactor
         reactor.callWhenRunning(
             lambda: self.log.info('twisted-reactor-started'))
 
diff --git a/ofagent/of_protocol_handler.py b/ofagent/of_protocol_handler.py
index e109782..1c284eb 100644
--- a/ofagent/of_protocol_handler.py
+++ b/ofagent/of_protocol_handler.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 #
 import structlog
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
 
 import loxi.of13 as ofp
 from converter import to_loxi, pb2dict, to_grpc
@@ -44,9 +44,11 @@
         self.rpc = rpc
 
     @inlineCallbacks
-    def run(self):
+    def start(self):
         """A new call is made after a fresh reconnect"""
 
+        log.debug('starting')
+
         try:
             # send initial hello message
             self.cxn.send(ofp.message.hello())
@@ -67,6 +69,14 @@
         except Exception, e:
             log.exception('exception', e=e)
 
+        log.info('started')
+        returnValue(self)
+
+    def stop(self):
+        log.debug('stopping')
+        # nothing to do yet
+        log.info('stopped')
+
     def handle_echo_request(self, req):
         self.cxn.send(ofp.message.echo_reply(xid=req.xid))
 
diff --git a/ofagent/protos/schema_pb2.py b/ofagent/protos/schema_pb2.py
index f8c5e46..9f54c3a 100644
--- a/ofagent/protos/schema_pb2.py
+++ b/ofagent/protos/schema_pb2.py
@@ -20,7 +20,7 @@
   name='schema.proto',
   package='schema',
   syntax='proto3',
-  serialized_pb=_b('\n\x0cschema.proto\x12\x06schema\x1a\x1cgoogle/api/annotations.proto\"\xcd\x01\n\x06Schema\x12*\n\x06protos\x18\x01 \x03(\x0b\x32\x1a.schema.Schema.ProtosEntry\x12\x34\n\x0b\x64\x65scriptors\x18\x02 \x03(\x0b\x32\x1f.schema.Schema.DescriptorsEntry\x1a-\n\x0bProtosEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x32\n\x10\x44\x65scriptorsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"\r\n\x0bNullMessage2R\n\rSchemaService\x12\x41\n\tGetSchema\x12\x13.schema.NullMessage\x1a\x0e.schema.Schema\"\x0f\x82\xd3\xe4\x93\x02\t\x12\x07/schemab\x06proto3')
+  serialized_pb=_b('\n\x0cschema.proto\x12\x06schema\x1a\x1cgoogle/api/annotations.proto\"A\n\tProtoFile\x12\x11\n\tfile_name\x18\x01 \x01(\t\x12\r\n\x05proto\x18\x02 \x01(\t\x12\x12\n\ndescriptor\x18\x03 \x01(\x0c\",\n\x07Schemas\x12!\n\x06protos\x18\x01 \x03(\x0b\x32\x11.schema.ProtoFile\"\r\n\x0bNullMessage2S\n\rSchemaService\x12\x42\n\tGetSchema\x12\x13.schema.NullMessage\x1a\x0f.schema.Schemas\"\x0f\x82\xd3\xe4\x93\x02\t\x12\x07/schemab\x06proto3')
   ,
   dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,])
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
@@ -28,60 +28,30 @@
 
 
 
-_SCHEMA_PROTOSENTRY = _descriptor.Descriptor(
-  name='ProtosEntry',
-  full_name='schema.Schema.ProtosEntry',
+_PROTOFILE = _descriptor.Descriptor(
+  name='ProtoFile',
+  full_name='schema.ProtoFile',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='key', full_name='schema.Schema.ProtosEntry.key', index=0,
+      name='file_name', full_name='schema.ProtoFile.file_name', index=0,
       number=1, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None),
     _descriptor.FieldDescriptor(
-      name='value', full_name='schema.Schema.ProtosEntry.value', index=1,
+      name='proto', full_name='schema.ProtoFile.proto', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=163,
-  serialized_end=208,
-)
-
-_SCHEMA_DESCRIPTORSENTRY = _descriptor.Descriptor(
-  name='DescriptorsEntry',
-  full_name='schema.Schema.DescriptorsEntry',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
     _descriptor.FieldDescriptor(
-      name='key', full_name='schema.Schema.DescriptorsEntry.key', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    _descriptor.FieldDescriptor(
-      name='value', full_name='schema.Schema.DescriptorsEntry.value', index=1,
-      number=2, type=12, cpp_type=9, label=1,
+      name='descriptor', full_name='schema.ProtoFile.descriptor', index=2,
+      number=3, type=12, cpp_type=9, label=1,
       has_default_value=False, default_value=_b(""),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
@@ -92,41 +62,35 @@
   nested_types=[],
   enum_types=[
   ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=210,
-  serialized_end=260,
+  serialized_start=54,
+  serialized_end=119,
 )
 
-_SCHEMA = _descriptor.Descriptor(
-  name='Schema',
-  full_name='schema.Schema',
+
+_SCHEMAS = _descriptor.Descriptor(
+  name='Schemas',
+  full_name='schema.Schemas',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='protos', full_name='schema.Schema.protos', index=0,
+      name='protos', full_name='schema.Schemas.protos', index=0,
       number=1, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None),
-    _descriptor.FieldDescriptor(
-      name='descriptors', full_name='schema.Schema.descriptors', index=1,
-      number=2, type=11, cpp_type=10, label=3,
-      has_default_value=False, default_value=[],
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
   ],
   extensions=[
   ],
-  nested_types=[_SCHEMA_PROTOSENTRY, _SCHEMA_DESCRIPTORSENTRY, ],
+  nested_types=[],
   enum_types=[
   ],
   options=None,
@@ -135,8 +99,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=55,
-  serialized_end=260,
+  serialized_start=121,
+  serialized_end=165,
 )
 
 
@@ -159,39 +123,28 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=262,
-  serialized_end=275,
+  serialized_start=167,
+  serialized_end=180,
 )
 
-_SCHEMA_PROTOSENTRY.containing_type = _SCHEMA
-_SCHEMA_DESCRIPTORSENTRY.containing_type = _SCHEMA
-_SCHEMA.fields_by_name['protos'].message_type = _SCHEMA_PROTOSENTRY
-_SCHEMA.fields_by_name['descriptors'].message_type = _SCHEMA_DESCRIPTORSENTRY
-DESCRIPTOR.message_types_by_name['Schema'] = _SCHEMA
+_SCHEMAS.fields_by_name['protos'].message_type = _PROTOFILE
+DESCRIPTOR.message_types_by_name['ProtoFile'] = _PROTOFILE
+DESCRIPTOR.message_types_by_name['Schemas'] = _SCHEMAS
 DESCRIPTOR.message_types_by_name['NullMessage'] = _NULLMESSAGE
 
-Schema = _reflection.GeneratedProtocolMessageType('Schema', (_message.Message,), dict(
-
-  ProtosEntry = _reflection.GeneratedProtocolMessageType('ProtosEntry', (_message.Message,), dict(
-    DESCRIPTOR = _SCHEMA_PROTOSENTRY,
-    __module__ = 'schema_pb2'
-    # @@protoc_insertion_point(class_scope:schema.Schema.ProtosEntry)
-    ))
-  ,
-
-  DescriptorsEntry = _reflection.GeneratedProtocolMessageType('DescriptorsEntry', (_message.Message,), dict(
-    DESCRIPTOR = _SCHEMA_DESCRIPTORSENTRY,
-    __module__ = 'schema_pb2'
-    # @@protoc_insertion_point(class_scope:schema.Schema.DescriptorsEntry)
-    ))
-  ,
-  DESCRIPTOR = _SCHEMA,
+ProtoFile = _reflection.GeneratedProtocolMessageType('ProtoFile', (_message.Message,), dict(
+  DESCRIPTOR = _PROTOFILE,
   __module__ = 'schema_pb2'
-  # @@protoc_insertion_point(class_scope:schema.Schema)
+  # @@protoc_insertion_point(class_scope:schema.ProtoFile)
   ))
-_sym_db.RegisterMessage(Schema)
-_sym_db.RegisterMessage(Schema.ProtosEntry)
-_sym_db.RegisterMessage(Schema.DescriptorsEntry)
+_sym_db.RegisterMessage(ProtoFile)
+
+Schemas = _reflection.GeneratedProtocolMessageType('Schemas', (_message.Message,), dict(
+  DESCRIPTOR = _SCHEMAS,
+  __module__ = 'schema_pb2'
+  # @@protoc_insertion_point(class_scope:schema.Schemas)
+  ))
+_sym_db.RegisterMessage(Schemas)
 
 NullMessage = _reflection.GeneratedProtocolMessageType('NullMessage', (_message.Message,), dict(
   DESCRIPTOR = _NULLMESSAGE,
@@ -201,10 +154,6 @@
 _sym_db.RegisterMessage(NullMessage)
 
 
-_SCHEMA_PROTOSENTRY.has_options = True
-_SCHEMA_PROTOSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
-_SCHEMA_DESCRIPTORSENTRY.has_options = True
-_SCHEMA_DESCRIPTORSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
 import grpc
 from grpc.beta import implementations as beta_implementations
 from grpc.beta import interfaces as beta_interfaces
@@ -225,7 +174,7 @@
     self.GetSchema = channel.unary_unary(
         '/schema.SchemaService/GetSchema',
         request_serializer=NullMessage.SerializeToString,
-        response_deserializer=Schema.FromString,
+        response_deserializer=Schemas.FromString,
         )
 
 
@@ -246,7 +195,7 @@
       'GetSchema': grpc.unary_unary_rpc_method_handler(
           servicer.GetSchema,
           request_deserializer=NullMessage.FromString,
-          response_serializer=Schema.SerializeToString,
+          response_serializer=Schemas.SerializeToString,
       ),
   }
   generic_handler = grpc.method_handlers_generic_handler(
@@ -278,7 +227,7 @@
     ('schema.SchemaService', 'GetSchema'): NullMessage.FromString,
   }
   response_serializers = {
-    ('schema.SchemaService', 'GetSchema'): Schema.SerializeToString,
+    ('schema.SchemaService', 'GetSchema'): Schemas.SerializeToString,
   }
   method_implementations = {
     ('schema.SchemaService', 'GetSchema'): face_utilities.unary_unary_inline(servicer.GetSchema),
@@ -292,7 +241,7 @@
     ('schema.SchemaService', 'GetSchema'): NullMessage.SerializeToString,
   }
   response_deserializers = {
-    ('schema.SchemaService', 'GetSchema'): Schema.FromString,
+    ('schema.SchemaService', 'GetSchema'): Schemas.FromString,
   }
   cardinalities = {
     'GetSchema': cardinality.Cardinality.UNARY_UNARY,
diff --git a/ofagent/protos/third_party/__init__.py b/ofagent/protos/third_party/__init__.py
index 6dab4e7..0608e8c 100644
--- a/ofagent/protos/third_party/__init__.py
+++ b/ofagent/protos/third_party/__init__.py
@@ -38,7 +38,7 @@
     def load_module(self, name):
         if name in sys.modules:
             return sys.modules[name]
-        full_name = 'ofagent.protos.third_party.' + name
+        full_name = 'voltha.protos.third_party.' + name
         import_module(full_name)
         module = sys.modules[full_name]
         sys.modules[name] = module
diff --git a/voltha/adapters/microsemi/chat.py b/voltha/adapters/microsemi/chat.py
index 484f6c3..80b026b 100755
--- a/voltha/adapters/microsemi/chat.py
+++ b/voltha/adapters/microsemi/chat.py
@@ -628,7 +628,7 @@
         self.iface = iface
         self.finished = False
 
-    def run(self):
+    def start(self):
         self.sock = s = conf.L2listen( type=ETH_P_ALL, iface=self.iface, filter='inbound')
         while not self.finished:
             try:
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 1c8c57b..39836b8 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -28,6 +28,8 @@
 from common.utils.asleep import asleep
 from worker import Worker
 
+log = get_logger()
+
 
 class StaleMembershipEntryException(Exception):
     pass
@@ -48,11 +50,6 @@
 
     CONNECT_RETRY_INTERVAL_SEC = 1
     RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
-    #LEADER_KEY = 'service/voltha/leader'
-
-    #MEMBERSHIP_PREFIX = 'service/voltha/members/'
-    #ASSIGNMENT_PREFIX = 'service/voltha/assignments/'
-    #WORKLOAD_PREFIX = 'service/voltha/work/'
 
     # Public methods:
 
@@ -62,15 +59,12 @@
                  instance_id,
                  rest_port,
                  config,
-                 consul='localhost:8500',
-                 leader_class=Leader):
+                 consul='localhost:8500'):
 
-        self.log = get_logger()
-        self.log.info('initializing-coordinator')
+        log.info('initializing-coordinator')
         self.config = config['coordinator']
         self.worker_config = config['worker']
         self.leader_config = config['leader']
-        #self.log.info('config: %r' % self.config)
         self.membership_watch_relatch_delay = config.get(
             'membership_watch_relatch_delay', 0.1)
         self.tracking_loop_delay = config.get(
@@ -106,19 +100,24 @@
         # TODO need to handle reconnect events properly
         self.consul = Consul(host=host, port=port)
 
-        reactor.callLater(0, self._async_init)
-        self.log.info('initialized-coordinator')
-
         self.wait_for_leader_deferreds = []
 
+    def start(self):
+        log.debug('starting')
+        reactor.callLater(0, self._async_init)
+        log.info('started')
+        return self
+
     @inlineCallbacks
-    def shutdown(self):
+    def stop(self):
+        log.debug('stopping')
         self.shutting_down = True
         yield self._delete_session()  # this will delete the leader lock too
-        yield self.worker.halt()
+        yield self.worker.stop()
         if self.leader is not None:
-            yield self.leader.halt()
+            yield self.leader.stop()
             self.leader = None
+        log.info('stopped')
 
     def wait_for_a_leader(self):
         """
@@ -158,12 +156,12 @@
         wait_time = self.RETRY_BACKOFF[min(self.retries,
                                            len(self.RETRY_BACKOFF) - 1)]
         self.retries += 1
-        self.log.error(msg, retry_in=wait_time)
+        log.error(msg, retry_in=wait_time)
         return asleep(wait_time)
 
     def _clear_backoff(self):
         if self.retries:
-            self.log.info('reconnected-to-consul', after_retries=self.retries)
+            log.info('reconnected-to-consul', after_retries=self.retries)
             self.retries = 0
 
     @inlineCallbacks
@@ -174,9 +172,9 @@
             try:
                 result = yield self.consul.session.renew(
                     session_id=self.session_id)
-                self.log.debug('just renewed session', result=result)
+                log.debug('just renewed session', result=result)
             except Exception, e:
-                self.log.exception('could-not-renew-session', e=e)
+                log.exception('could-not-renew-session', e=e)
 
         @inlineCallbacks
         def _create_session():
@@ -184,7 +182,7 @@
             # create consul session
             self.session_id = yield self.consul.session.create(
                 behavior='delete', ttl=10, lock_delay=1)
-            self.log.info('created-consul-session', session_id=self.session_id)
+            log.info('created-consul-session', session_id=self.session_id)
 
             # start renewing session it 3 times within the ttl
             lc = LoopingCall(_renew_session)
@@ -217,14 +215,14 @@
                 (index, record) = yield self._retry(self.consul.kv.get,
                                                     self.membership_record_key,
                                                     index=index)
-                self.log.debug('membership-record-change-detected',
+                log.debug('membership-record-change-detected',
                                index=index, record=record)
                 if record is None or record['Session'] != self.session_id:
-                    self.log.debug('remaking-membership-record')
+                    log.debug('remaking-membership-record')
                     yield self._retry(self._do_create_membership_record)
 
         except Exception, e:
-            self.log.exception('unexpected-error-leader-trackin', e=e)
+            log.exception('unexpected-error-membership-tracking', e=e)
 
         finally:
             # except in shutdown, the loop must continue (after a short delay)
@@ -245,7 +243,7 @@
             # is then the value under the leader key service/voltha/leader.
 
             # attempt acquire leader lock
-            self.log.debug('leadership-attempt')
+            log.debug('leadership-attempt')
             result = yield self._retry(self.consul.kv.put,
                                        self.leader_prefix,
                                        self.instance_id,
@@ -258,7 +256,7 @@
             # the returned record can be None. Handle it.
             (index, record) = yield self._retry(self.consul.kv.get,
                                                 self.leader_prefix)
-            self.log.debug('leadership-key',
+            log.debug('leadership-key',
                            i_am_leader=result, index=index, record=record)
 
             if record is not None:
@@ -280,7 +278,7 @@
                 (index, updated) = yield self._retry(self.consul.kv.get,
                                                      self.leader_prefix,
                                                      index=index)
-                self.log.debug('leader-key-change',
+                log.debug('leader-key-change',
                                index=index, updated=updated)
                 if updated is None or updated != last:
                     # leadership has changed or vacated (or forcefully
@@ -289,7 +287,7 @@
                 last = updated
 
         except Exception, e:
-            self.log.exception('unexpected-error-leader-trackin', e=e)
+            log.exception('unexpected-error-leader-tracking', e=e)
 
         finally:
             # except in shutdown, the loop must continue (after a short delay)
@@ -324,16 +322,16 @@
             d.callback(leader_id)
 
     def _just_gained_leadership(self):
-        self.log.info('became-leader')
+        log.info('became-leader')
         self.leader = Leader(self)
         return self.leader.start()
 
     def _just_lost_leadership(self):
-        self.log.info('lost-leadership')
+        log.info('lost-leadership')
         return self._halt_leader()
 
     def _halt_leader(self):
-        d = self.leader.halt()
+        d = self.leader.stop()
         self.leader = None
         return d
 
@@ -352,7 +350,7 @@
                 yield self._backoff('stale-membership-record-in-the-way')
             except Exception, e:
                 if not self.shutting_down:
-                    self.log.exception(e)
+                    log.exception(e)
                 yield self._backoff('unknown-error')
 
         returnValue(result)
diff --git a/voltha/core/device_model.py b/voltha/core/device_model.py
index 3a25e7f..262e18f 100644
--- a/voltha/core/device_model.py
+++ b/voltha/core/device_model.py
@@ -22,11 +22,12 @@
 
 import structlog
 
+from voltha.protos import third_party
 from voltha.protos import voltha_pb2
 from voltha.protos import openflow_13_pb2 as ofp
 
 log = structlog.get_logger()
-
+_ = third_party
 
 def mac_str_to_tuple(mac):
     return tuple(int(d, 16) for d in mac.split(':'))
@@ -406,11 +407,12 @@
         print threading.current_thread().name
         print 'PACKET_OUT:', ofp_packet_out
         # TODO for debug purposes, lets turn this around and send it back
-        # self.packet_in(ofp.ofp_packet_in(
-        #     buffer_id=ofp_packet_out.buffer_id,
-        #     reason=ofp.OFPR_NO_MATCH,
-        #     data=ofp_packet_out.data
-        # ))
+        if 0:
+            self.packet_in(ofp.ofp_packet_in(
+                buffer_id=ofp_packet_out.buffer_id,
+                reason=ofp.OFPR_NO_MATCH,
+                data=ofp_packet_out.data
+            ))
 
 
 
diff --git a/voltha/leader.py b/voltha/leader.py
index 7f1e177..ddb15ee 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -24,6 +24,8 @@
 
 from common.utils.asleep import asleep
 
+log = get_logger()
+
 
 class Leader(object):
     """
@@ -36,8 +38,6 @@
     ID_EXTRACTOR = '^(%s)([^/]+)$'
     ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'
 
-    log = get_logger()
-
     # Public methods:
 
     def __init__(self, coordinator):
@@ -61,13 +61,14 @@
 
     @inlineCallbacks
     def start(self):
-        self.log.info('leader-started')
+        log.debug('starting')
         yield self._validate_workload()
         yield self._start_tracking_assignments()
+        log.info('started')
 
-    def halt(self):
+    def stop(self):
         """Suspend leadership duties immediately"""
-        self.log.info('leader-halted')
+        log.debug('stopping')
         self.halted = True
 
         # any active cancellations, releases, etc., should happen here
@@ -75,6 +76,8 @@
             if not self.reassignment_soak_timer.called:
                 self.reassignment_soak_timer.cancel()
 
+        log.info('stopped')
+
     # Private methods:
 
     @inlineCallbacks
@@ -114,14 +117,14 @@
             workload = [m.group(2) for m in matches if m is not None]
 
             if workload != self.workload:
-                self.log.info('workload-changed',
+                log.info('workload-changed',
                               old_workload_count=len(self.workload),
                               new_workload_count=len(workload))
                 self.workload = workload
                 self._restart_reassignment_soak_timer()
 
         except Exception, e:
-            self.log.exception('workload-track-error', e=e)
+            log.exception('workload-track-error', e=e)
             yield asleep(
                 self.coord.leader_config.get(
                     self.coord.leader_config[
@@ -143,14 +146,14 @@
             members = [m.group(2) for m in matches if m is not None]
 
             if members != self.members:
-                self.log.info('membership-changed',
+                log.info('membership-changed',
                               old_members_count=len(self.members),
                               new_members_count=len(members))
                 self.members = members
                 self._restart_reassignment_soak_timer()
 
         except Exception, e:
-            self.log.exception('members-track-error', e=e)
+            log.exception('members-track-error', e=e)
             yield asleep(
                 self.coord.leader_config.get(
                     self.coord.leader_config[
@@ -174,7 +177,7 @@
     @inlineCallbacks
     def _reassign_work(self):
 
-        self.log.info('reassign-work')
+        log.info('reassign-work')
 
         # Plan
         #
@@ -207,7 +210,7 @@
                 for work in self.workload
             ]
             for (member, work) in sorted(wanted_assignments.iteritems()):
-                self.log.info('assignment',
+                log.info('assignment',
                               member=member, work_count=len(work))
 
             # Step 2: discover current assignment (from consul)
@@ -254,5 +257,5 @@
                         + member_id + '/' + work_id, '')
 
         except Exception, e:
-            self.log.exception('failed-reassignment', e=e)
+            log.exception('failed-reassignment', e=e)
             self._restart_reassignment_soak_timer()  # try again in a while
diff --git a/voltha/main.py b/voltha/main.py
index 3d03653..a9a0157 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -25,10 +25,6 @@
 import yaml
 from twisted.internet.defer import inlineCallbacks
 
-base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-sys.path.append(base_dir)
-sys.path.append(os.path.join(base_dir, '/voltha/protos/third_party'))
-
 from voltha.coordinator import Coordinator
 from common.utils.dockerhelpers import get_my_containers_name
 from common.utils.nethelpers import get_my_primary_interface, \
@@ -243,10 +239,10 @@
             rest_port=self.args.rest_port,
             instance_id=self.args.instance_id,
             config=self.config,
-            consul=self.args.consul)
+            consul=self.args.consul).start()
         init_rest_service(self.args.rest_port)
 
-        self.grpc_server = VolthaGrpcServer(self.args.grpc_port).run()
+        self.grpc_server = VolthaGrpcServer(self.args.grpc_port).start()
 
         # initialize kafka proxy singleton
         self.kafka_proxy = KafkaProxy(self.args.consul, self.args.kafka)
@@ -258,9 +254,9 @@
         """Execute before the reactor is shut down"""
         self.log.info('exiting-on-keyboard-interrupt')
         if self.coordinator is not None:
-            yield self.coordinator.shutdown()
+            yield self.coordinator.stop()
         if self.grpc_server is not None:
-            yield self.grpc_server.shutdown()
+            yield self.grpc_server.stop()
 
     def start_reactor(self):
         from twisted.internet import reactor
diff --git a/voltha/northbound/grpc/grpc_server.py b/voltha/northbound/grpc/grpc_server.py
index 489e0ff..9c0417f 100644
--- a/voltha/northbound/grpc/grpc_server.py
+++ b/voltha/northbound/grpc/grpc_server.py
@@ -18,6 +18,7 @@
 import os
 import uuid
 from Queue import Queue
+from collections import OrderedDict
 from os.path import abspath, basename, dirname, join, walk
 import grpc
 from concurrent import futures
@@ -33,12 +34,13 @@
 
 class SchemaService(schema_pb2.SchemaServiceServicer):
 
-    def __init__(self):
-        proto_map, descriptor_map = self._load_schema()
-        self.schema = schema_pb2.Schema(
-            protos=proto_map,
-            descriptors=descriptor_map
-        )
+    def __init__(self, thread_pool):
+        self.thread_pool = thread_pool
+        protos = self._load_schema()
+        self.schemas = schema_pb2.Schemas(protos=protos)
+
+    def stop(self):
+        pass
 
     def _load_schema(self):
         """Pre-load schema file so that we can serve it up (file sizes
@@ -53,29 +55,29 @@
             ]
             return proto_files
 
-        proto_map = {}
-        for fpath in find_files(proto_dir, '.proto'):
-            with open(fpath, 'r') as f:
-                content = f.read()
-            fname = basename(fpath)
+        proto_map = OrderedDict()  # to have deterministic data
+        for proto_file in find_files(proto_dir, '.proto'):
+            with open(proto_file, 'r') as f:
+                proto_content = f.read()
+            fname = basename(proto_file)
             # assure no two files have the same basename
             assert fname not in proto_map
-            proto_map[fname] = content
 
-        descriptor_map = {}
-        for fpath in find_files(proto_dir, '.desc'):
-            with open(fpath, 'r') as f:
-                content = f.read()
-            fname = basename(fpath)
-            # assure no two files have the same basename
-            assert fname not in descriptor_map
-            descriptor_map[fname] = zlib.compress(content)
+            desc_file = proto_file.replace('.proto', '.desc')
+            with open(desc_file, 'r') as f:
+                descriptor_content = zlib.compress(f.read())
 
-        return proto_map, descriptor_map
+            proto_map[fname] = schema_pb2.ProtoFile(
+                file_name=fname,
+                proto=proto_content,
+                descriptor=descriptor_content
+            )
+
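+        # e.g. one entry: ProtoFile(file_name='voltha.proto',
+        #                           proto='<proto source text>',
+        #                           descriptor=<zlib-compressed bytes>)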
+        return proto_map.values()
 
     def GetSchema(self, request, context):
         """Return current schema files and descriptor"""
-        return self.schema
+        return self.schemas
 
 
 class HealthService(voltha_pb2.HealthServiceServicer):
@@ -83,6 +85,9 @@
     def __init__(self, thread_pool):
         self.thread_pool = thread_pool
 
+    def stop(self):
+        pass
+
     def GetHealthStatus(self, request, context):
         """Return current health status of a Voltha instance
         """
@@ -107,6 +112,9 @@
         )) for id in (uuid.uuid5(uuid.NAMESPACE_OID, str(i)).get_hex()
                       for i in xrange(1000, 1005)))
 
+    def stop(self):
+        pass
+
     def GetAddress(self, request, context):
         log.info('get-address', request=request)
         return self.db[request.id]
@@ -146,6 +154,11 @@
         self.devices_map = dict((d.info.id, d) for d in self.devices)
         self.packet_in_queue = Queue()
 
+        self.stopped = False
+
+    def stop(self):
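+        # long-lived streaming handlers may poll this flag to wind down
+        # cleanly on shutdown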
+        self.stopped = True
+
     # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     # Note that all GRPC method implementations are not called on the main
     # (twisted) thread. We shall contain them here and not spread calls on
@@ -205,6 +218,8 @@
         for request in request_iterator:
             forward_packet_out(packet_out=request)
 
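+        # request stream is exhausted; return an empty message so the
+        # unary response completes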
+        return voltha_pb2.NullMessage()
+
     def ReceivePacketsIn(self, request, context):
         while 1:
             packet_in = self.packet_in_queue.get()
@@ -223,32 +238,44 @@
         log.info('init-grpc-server', port=self.port)
         self.thread_pool = futures.ThreadPoolExecutor(max_workers=10)
         self.server = grpc.server(self.thread_pool)
+        self.services = []
 
-        schema_pb2.add_SchemaServiceServicer_to_server(
-            SchemaService(), self.server)
-        voltha_pb2.add_HealthServiceServicer_to_server(
-            HealthService(self.thread_pool), self.server)
-        voltha_pb2.add_ExampleServiceServicer_to_server(
-            ExampleService(self.thread_pool), self.server)
-        voltha_pb2.add_VolthaLogicalLayerServicer_to_server(
-            VolthaLogicalLayer(self.thread_pool), self.server)
+    def start(self):
+        log.debug('starting')
 
+        # instantiate each service unit, register it with the server, and
+        # keep it in a list so stop() can tear them all down
+        for activator_func, service_class in (
+            (schema_pb2.add_SchemaServiceServicer_to_server, SchemaService),
+            (voltha_pb2.add_HealthServiceServicer_to_server, HealthService),
+            (voltha_pb2.add_ExampleServiceServicer_to_server, ExampleService),
+            (voltha_pb2.add_VolthaLogicalLayerServicer_to_server, VolthaLogicalLayer)
+        ):
+            service = service_class(self.thread_pool)
+            self.services.append(service)
+            activator_func(service, self.server)
+
+        # open port
         self.server.add_insecure_port('[::]:%s' % self.port)
 
-    def run(self):
-        log.info('starting-grpc-server')
+        # start the server
         self.server.start()
+
+        log.info('started')
         return self
 
-    def shutdown(self, grace=0):
+    def stop(self, grace=0):
+        log.debug('stopping')
+        for service in self.services:
+            service.stop()
         self.server.stop(grace)
+        log.info('stopped')
 
 
 # This is to allow running the GRPC server in stand-alone mode
 
 if __name__ == '__main__':
 
-    server = VolthaGrpcServer().run()
+    server = VolthaGrpcServer().start()
 
     import time
     _ONE_DAY_IN_SECONDS = 60 * 60 * 24
@@ -256,6 +283,6 @@
         while 1:
             time.sleep(_ONE_DAY_IN_SECONDS)
     except KeyboardInterrupt:
-        server.shutdown()
+        server.stop()
 
 
diff --git a/voltha/protos/schema.desc b/voltha/protos/schema.desc
index 578b0e0..bb16d0d 100644
--- a/voltha/protos/schema.desc
+++ b/voltha/protos/schema.desc
Binary files differ
diff --git a/voltha/protos/schema.proto b/voltha/protos/schema.proto
index 8f9b643..9e97c7f 100644
--- a/voltha/protos/schema.proto
+++ b/voltha/protos/schema.proto
@@ -4,14 +4,18 @@
 
 import "google/api/annotations.proto";
 
-// Proto file and compiled descriptor for this interface
-message Schema {
+// Contains the name and content of a *.proto file
+message ProtoFile {
+    string file_name = 1;  // name of proto file
+    string proto = 2;  // content of proto file
+    bytes descriptor = 3;  // compiled descriptor for proto (zlib compressed)
+}
 
-  // file name -> proto file content
-  map<string, string> protos = 1;
+// Proto files and compiled descriptors for this interface
+message Schemas {
 
-  // file name -> gzip compressed protobuf of descriptor
-  map<string, bytes> descriptors = 2;
+    // Proto files
+    repeated ProtoFile protos = 1;
 
 }
 
@@ -21,10 +25,11 @@
 // Schema services
 service SchemaService {
 
-  // Return active grpc schemas
-  rpc GetSchema(NullMessage) returns (Schema) {
-    option (google.api.http) = {
-      get: "/schema"
-    };
-  }
+    // Return active grpc schemas
+    rpc GetSchema(NullMessage) returns (Schemas) {
+        option (google.api.http) = {
+            get: "/schema"
+        };
+    }
+
 }
diff --git a/voltha/protos/schema_pb2.py b/voltha/protos/schema_pb2.py
index f8c5e46..9f54c3a 100644
--- a/voltha/protos/schema_pb2.py
+++ b/voltha/protos/schema_pb2.py
@@ -20,7 +20,7 @@
   name='schema.proto',
   package='schema',
   syntax='proto3',
-  serialized_pb=_b('\n\x0cschema.proto\x12\x06schema\x1a\x1cgoogle/api/annotations.proto\"\xcd\x01\n\x06Schema\x12*\n\x06protos\x18\x01 \x03(\x0b\x32\x1a.schema.Schema.ProtosEntry\x12\x34\n\x0b\x64\x65scriptors\x18\x02 \x03(\x0b\x32\x1f.schema.Schema.DescriptorsEntry\x1a-\n\x0bProtosEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x32\n\x10\x44\x65scriptorsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"\r\n\x0bNullMessage2R\n\rSchemaService\x12\x41\n\tGetSchema\x12\x13.schema.NullMessage\x1a\x0e.schema.Schema\"\x0f\x82\xd3\xe4\x93\x02\t\x12\x07/schemab\x06proto3')
+  serialized_pb=_b('\n\x0cschema.proto\x12\x06schema\x1a\x1cgoogle/api/annotations.proto\"A\n\tProtoFile\x12\x11\n\tfile_name\x18\x01 \x01(\t\x12\r\n\x05proto\x18\x02 \x01(\t\x12\x12\n\ndescriptor\x18\x03 \x01(\x0c\",\n\x07Schemas\x12!\n\x06protos\x18\x01 \x03(\x0b\x32\x11.schema.ProtoFile\"\r\n\x0bNullMessage2S\n\rSchemaService\x12\x42\n\tGetSchema\x12\x13.schema.NullMessage\x1a\x0f.schema.Schemas\"\x0f\x82\xd3\xe4\x93\x02\t\x12\x07/schemab\x06proto3')
   ,
   dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,])
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
@@ -28,60 +28,30 @@
 
 
 
-_SCHEMA_PROTOSENTRY = _descriptor.Descriptor(
-  name='ProtosEntry',
-  full_name='schema.Schema.ProtosEntry',
+_PROTOFILE = _descriptor.Descriptor(
+  name='ProtoFile',
+  full_name='schema.ProtoFile',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='key', full_name='schema.Schema.ProtosEntry.key', index=0,
+      name='file_name', full_name='schema.ProtoFile.file_name', index=0,
       number=1, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None),
     _descriptor.FieldDescriptor(
-      name='value', full_name='schema.Schema.ProtosEntry.value', index=1,
+      name='proto', full_name='schema.ProtoFile.proto', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=163,
-  serialized_end=208,
-)
-
-_SCHEMA_DESCRIPTORSENTRY = _descriptor.Descriptor(
-  name='DescriptorsEntry',
-  full_name='schema.Schema.DescriptorsEntry',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
     _descriptor.FieldDescriptor(
-      name='key', full_name='schema.Schema.DescriptorsEntry.key', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    _descriptor.FieldDescriptor(
-      name='value', full_name='schema.Schema.DescriptorsEntry.value', index=1,
-      number=2, type=12, cpp_type=9, label=1,
+      name='descriptor', full_name='schema.ProtoFile.descriptor', index=2,
+      number=3, type=12, cpp_type=9, label=1,
       has_default_value=False, default_value=_b(""),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
@@ -92,41 +62,35 @@
   nested_types=[],
   enum_types=[
   ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=210,
-  serialized_end=260,
+  serialized_start=54,
+  serialized_end=119,
 )
 
-_SCHEMA = _descriptor.Descriptor(
-  name='Schema',
-  full_name='schema.Schema',
+
+_SCHEMAS = _descriptor.Descriptor(
+  name='Schemas',
+  full_name='schema.Schemas',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='protos', full_name='schema.Schema.protos', index=0,
+      name='protos', full_name='schema.Schemas.protos', index=0,
       number=1, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None),
-    _descriptor.FieldDescriptor(
-      name='descriptors', full_name='schema.Schema.descriptors', index=1,
-      number=2, type=11, cpp_type=10, label=3,
-      has_default_value=False, default_value=[],
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
   ],
   extensions=[
   ],
-  nested_types=[_SCHEMA_PROTOSENTRY, _SCHEMA_DESCRIPTORSENTRY, ],
+  nested_types=[],
   enum_types=[
   ],
   options=None,
@@ -135,8 +99,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=55,
-  serialized_end=260,
+  serialized_start=121,
+  serialized_end=165,
 )
 
 
@@ -159,39 +123,28 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=262,
-  serialized_end=275,
+  serialized_start=167,
+  serialized_end=180,
 )
 
-_SCHEMA_PROTOSENTRY.containing_type = _SCHEMA
-_SCHEMA_DESCRIPTORSENTRY.containing_type = _SCHEMA
-_SCHEMA.fields_by_name['protos'].message_type = _SCHEMA_PROTOSENTRY
-_SCHEMA.fields_by_name['descriptors'].message_type = _SCHEMA_DESCRIPTORSENTRY
-DESCRIPTOR.message_types_by_name['Schema'] = _SCHEMA
+_SCHEMAS.fields_by_name['protos'].message_type = _PROTOFILE
+DESCRIPTOR.message_types_by_name['ProtoFile'] = _PROTOFILE
+DESCRIPTOR.message_types_by_name['Schemas'] = _SCHEMAS
 DESCRIPTOR.message_types_by_name['NullMessage'] = _NULLMESSAGE
 
-Schema = _reflection.GeneratedProtocolMessageType('Schema', (_message.Message,), dict(
-
-  ProtosEntry = _reflection.GeneratedProtocolMessageType('ProtosEntry', (_message.Message,), dict(
-    DESCRIPTOR = _SCHEMA_PROTOSENTRY,
-    __module__ = 'schema_pb2'
-    # @@protoc_insertion_point(class_scope:schema.Schema.ProtosEntry)
-    ))
-  ,
-
-  DescriptorsEntry = _reflection.GeneratedProtocolMessageType('DescriptorsEntry', (_message.Message,), dict(
-    DESCRIPTOR = _SCHEMA_DESCRIPTORSENTRY,
-    __module__ = 'schema_pb2'
-    # @@protoc_insertion_point(class_scope:schema.Schema.DescriptorsEntry)
-    ))
-  ,
-  DESCRIPTOR = _SCHEMA,
+ProtoFile = _reflection.GeneratedProtocolMessageType('ProtoFile', (_message.Message,), dict(
+  DESCRIPTOR = _PROTOFILE,
   __module__ = 'schema_pb2'
-  # @@protoc_insertion_point(class_scope:schema.Schema)
+  # @@protoc_insertion_point(class_scope:schema.ProtoFile)
   ))
-_sym_db.RegisterMessage(Schema)
-_sym_db.RegisterMessage(Schema.ProtosEntry)
-_sym_db.RegisterMessage(Schema.DescriptorsEntry)
+_sym_db.RegisterMessage(ProtoFile)
+
+Schemas = _reflection.GeneratedProtocolMessageType('Schemas', (_message.Message,), dict(
+  DESCRIPTOR = _SCHEMAS,
+  __module__ = 'schema_pb2'
+  # @@protoc_insertion_point(class_scope:schema.Schemas)
+  ))
+_sym_db.RegisterMessage(Schemas)
 
 NullMessage = _reflection.GeneratedProtocolMessageType('NullMessage', (_message.Message,), dict(
   DESCRIPTOR = _NULLMESSAGE,
@@ -201,10 +154,6 @@
 _sym_db.RegisterMessage(NullMessage)
 
 
-_SCHEMA_PROTOSENTRY.has_options = True
-_SCHEMA_PROTOSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
-_SCHEMA_DESCRIPTORSENTRY.has_options = True
-_SCHEMA_DESCRIPTORSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
 import grpc
 from grpc.beta import implementations as beta_implementations
 from grpc.beta import interfaces as beta_interfaces
@@ -225,7 +174,7 @@
     self.GetSchema = channel.unary_unary(
         '/schema.SchemaService/GetSchema',
         request_serializer=NullMessage.SerializeToString,
-        response_deserializer=Schema.FromString,
+        response_deserializer=Schemas.FromString,
         )
 
 
@@ -246,7 +195,7 @@
       'GetSchema': grpc.unary_unary_rpc_method_handler(
           servicer.GetSchema,
           request_deserializer=NullMessage.FromString,
-          response_serializer=Schema.SerializeToString,
+          response_serializer=Schemas.SerializeToString,
       ),
   }
   generic_handler = grpc.method_handlers_generic_handler(
@@ -278,7 +227,7 @@
     ('schema.SchemaService', 'GetSchema'): NullMessage.FromString,
   }
   response_serializers = {
-    ('schema.SchemaService', 'GetSchema'): Schema.SerializeToString,
+    ('schema.SchemaService', 'GetSchema'): Schemas.SerializeToString,
   }
   method_implementations = {
     ('schema.SchemaService', 'GetSchema'): face_utilities.unary_unary_inline(servicer.GetSchema),
@@ -292,7 +241,7 @@
     ('schema.SchemaService', 'GetSchema'): NullMessage.SerializeToString,
   }
   response_deserializers = {
-    ('schema.SchemaService', 'GetSchema'): Schema.FromString,
+    ('schema.SchemaService', 'GetSchema'): Schemas.FromString,
   }
   cardinalities = {
     'GetSchema': cardinality.Cardinality.UNARY_UNARY,
diff --git a/voltha/worker.py b/voltha/worker.py
index 8d0206c..d8b11a9 100644
--- a/voltha/worker.py
+++ b/voltha/worker.py
@@ -18,10 +18,12 @@
 from structlog import get_logger
 from twisted.internet import reactor
 from twisted.internet.base import DelayedCall
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
 
 from common.utils.asleep import asleep
 
+log = get_logger()
+
 
 class Worker(object):
     """
@@ -32,8 +34,6 @@
 
     ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'
 
-    log = get_logger()
-
     # Public methods:
 
     def __init__(self, instance_id, coordinator):
@@ -53,14 +53,17 @@
 
     @inlineCallbacks
     def start(self):
-        self.log.info('worker-started')
+        log.debug('starting')
         yield self._start_tracking_my_assignments()
+        log.info('started')
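+        # return self (via returnValue, since this is an inlineCallbacks
+        # generator) so callers can chain start() as with other components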
+        returnValue(self)
 
-    def halt(self):
-        self.log.info('worker-halted')
+    def stop(self):
+        log.debug('stopping')
         if isinstance(self.assignment_soak_timer, DelayedCall):
             if not self.assignment_soak_timer.called:
                 self.assignment_soak_timer.cancel()
+        log.info('stopped')
 
     # Private methods:
 
@@ -96,7 +99,7 @@
                 self._stash_and_restart_soak_timer(my_workload)
 
         except Exception, e:
-            self.log.exception('assignments-track-error', e=e)
+            log.exception('assignments-track-error', e=e)
             yield asleep(
                 self.coord.worker_config.get(
                     self.coord.worker_config[
@@ -109,7 +112,7 @@
 
     def _stash_and_restart_soak_timer(self, candidate_workload):
 
-        self.log.debug('re-start-assignment-soaking')
+        log.debug('re-start-assignment-soaking')
 
         if self.assignment_soak_timer is not None:
             if not self.assignment_soak_timer.called:
@@ -124,7 +127,7 @@
         Called when finally the dust has settled on our assignments.
         :return: None
         """
-        self.log.info('my-assignments-changed',
-                      old_count=len(self.my_workload),
-                      new_count=len(self.my_candidate_workload))
+        log.info('my-assignments-changed',
+                 old_count=len(self.my_workload),
+                 new_count=len(self.my_candidate_workload))
         self.my_workload, self.my_candidate_workload = \