Nomralize component start()/stop()
Also fixed the /schema swagger/rest entry. It did not work
because the 3rdparty protobuf_to_dict library cannot handle
Map fields. Changed the two map fields to a single list
entry.
Change-Id: Ib25a528701b67d58d32451687724c8247da6efa5
diff --git a/ofagent/agent.py b/ofagent/agent.py
index ab1b868..cabcc6c 100644
--- a/ofagent/agent.py
+++ b/ofagent/agent.py
@@ -55,13 +55,22 @@
def get_device_id(self):
return self.device_id
- def run(self):
+ def start(self):
+ log.debug('starting')
if self.running:
return
self.running = True
reactor.callLater(0, self.keep_connected)
+ log.info('started')
return self
+ def stop(self):
+ log.debug('stopping')
+ self.connected = False
+ self.exiting = True
+ self.connector.disconnect()
+ log.info('stopped')
+
def resolve_endpoint(self, endpoint):
# TODO allow resolution via consul
host, port = endpoint.split(':', 2)
@@ -79,24 +88,19 @@
log.debug('reconnect', after_delay=self.retry_interval)
yield asleep(self.retry_interval)
- def stop(self):
- self.connected = False
- self.exiting = True
- self.connector.disconnect()
- log.info('stopped')
-
def enter_disconnected(self, event, reason):
"""Internally signal entering disconnected state"""
- log.error(event, reason=reason)
self.connected = False
- self.d_disconnected.callback(None)
+ if not self.exiting:
+ log.error(event, reason=reason)
+ self.d_disconnected.callback(None)
def enter_connected(self):
"""Handle transitioning from disconnected to connected state"""
log.info('connected')
self.connected = True
self.read_buffer = None
- reactor.callLater(0, self.proto_handler.run)
+ reactor.callLater(0, self.proto_handler.start)
# protocol.ClientFactory methods
@@ -110,8 +114,9 @@
self.enter_disconnected('connection-failed', reason)
def clientConnectionLost(self, connector, reason):
- log.error('client-connection-lost',
- reason=reason, connector=connector)
+ if not self.exiting:
+ log.error('client-connection-lost',
+ reason=reason, connector=connector)
def forward_packet_in(self, ofp_packet_in):
self.proto_handler.forward_packet_in(ofp_packet_in)
@@ -147,7 +152,7 @@
return ports
stub = MockRpc()
- agents = [Agent('localhost:6633', 256 + i, stub).run() for i in range(n)]
+ agents = [Agent('localhost:6633', 256 + i, stub).start() for i in range(n)]
def shutdown():
[a.stop() for a in agents]
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 13fb6de..0bb2f89 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -23,13 +23,14 @@
from common.utils.consulhelpers import get_endpoint_from_consul
from structlog import get_logger
import grpc
+from ofagent.protos import third_party
from protos import voltha_pb2
from grpc_client import GrpcClient
from agent import Agent
log = get_logger()
-
+_ = third_party
class ConnectionManager(object):
@@ -52,12 +53,12 @@
self.running = False
- def run(self):
+ def start(self):
if self.running:
return
- log.info('run-connection-manager')
+ log.debug('starting')
self.running = True
@@ -65,19 +66,24 @@
self.channel = self.get_grpc_channel_with_voltha()
# Create shared gRPC API object
- self.grpc_client = GrpcClient(self, self.channel)
+ self.grpc_client = GrpcClient(self, self.channel).start()
# Start monitoring logical devices and manage agents accordingly
reactor.callLater(0, self.monitor_logical_devices)
+ log.info('started')
+
return self
- def shutdown(self):
+ def stop(self):
+ log.debug('stopping')
# clean up all controller connections
- for _, value in enumerate(self.agent_map):
- value.stop()
+ for agent in self.agent_map.itervalues():
+ agent.stop()
self.running = False
- # TODO: close grpc connection to voltha
+ self.grpc_client.stop()
+ del self.channel
+ log.info('stopped')
def resolve_endpoint(self, endpoint):
ip_port_endpoint = endpoint
@@ -170,7 +176,7 @@
device_id = device.id
agent = Agent(self.controller_endpoint, datapath_id,
device_id, self.grpc_client)
- agent.run()
+ agent.start()
self.agent_map[datapath_id] = agent
self.device_id_to_datapath_id_map[device_id] = datapath_id
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index efbc038..5b6b0a8 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -17,8 +17,10 @@
"""
The gRPC client layer for the OpenFlow agent
"""
-from Queue import Queue
+from Queue import Queue, Empty
+from grpc import StatusCode
+from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet import threads
@@ -39,18 +41,35 @@
self.channel = channel
self.logical_stub = VolthaLogicalLayerStub(channel)
+ self.stopped = False
+
self.packet_out_queue = Queue() # queue to send out PacketOut msgs
self.packet_in_queue = DeferredQueue() # queue to receive PacketIn
+
+ def start(self):
+ log.debug('starting')
self.start_packet_out_stream()
self.start_packet_in_stream()
reactor.callLater(0, self.packet_in_forwarder_loop)
+ log.info('started')
+ return self
+
+ def stop(self):
+ log.debug('stopping')
+ self.stopped = True
+ log.info('stopped')
def start_packet_out_stream(self):
def packet_generator():
while 1:
- packet = self.packet_out_queue.get(block=True)
- yield packet
+ try:
+ packet = self.packet_out_queue.get(block=True, timeout=1.0)
+ except Empty:
+ if self.stopped:
+ return
+ else:
+ yield packet
def stream_packets_out():
generator = packet_generator()
@@ -61,8 +80,11 @@
def start_packet_in_stream(self):
def receive_packet_in_stream():
- for packet_in in self.logical_stub.ReceivePacketsIn(NullMessage()):
- reactor.callFromThread(self.packet_in_queue.put, packet_in)
+ streaming_rpc_method = self.logical_stub.ReceivePacketsIn
+ iterator = streaming_rpc_method(NullMessage())
+ for packet_in in iterator:
+ reactor.callFromThread(self.packet_in_queue.put,
+ packet_in)
log.debug('enqued-packet-in',
packet_in=packet_in,
queue_len=len(self.packet_in_queue.pending))
@@ -76,6 +98,8 @@
device_id = packet_in.id
ofp_packet_in = packet_in.packet_in
self.connection_manager.forward_packet_in(device_id, ofp_packet_in)
+ if self.stopped:
+ break
def send_packet_out(self, device_id, packet_out):
packet_out = PacketOut(id=device_id, packet_out=packet_out)
diff --git a/ofagent/loxi/connection.py b/ofagent/loxi/connection.py
index f74ff20..0882ac4 100644
--- a/ofagent/loxi/connection.py
+++ b/ofagent/loxi/connection.py
@@ -45,7 +45,7 @@
self.finished = False
self.read_buffer = None
- def run(self):
+ def start(self):
while not self.finished:
try:
rd, wr, err = select.select([self.sock, self.wakeup_rd], [], [])
diff --git a/ofagent/main.py b/ofagent/main.py
index 5b21692..1d6149e 100755
--- a/ofagent/main.py
+++ b/ofagent/main.py
@@ -16,14 +16,10 @@
#
import argparse
import os
-import sys
import yaml
+from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
-base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-sys.path.append(base_dir)
-sys.path.append(os.path.join(base_dir, '/ofagent/protos/third_party'))
-
from common.utils.dockerhelpers import get_my_containers_name
from common.utils.nethelpers import get_my_primary_local_ipv4
from common.utils.structlog_setup import setup_logging
@@ -216,7 +212,7 @@
self.log.info('starting-internal-components')
args = self.args
self.connection_manager = yield ConnectionManager(
- args.consul, args.grpc_endpoint, args.controller).run()
+ args.consul, args.grpc_endpoint, args.controller).start()
self.log.info('started-internal-services')
@inlineCallbacks
@@ -225,10 +221,9 @@
self.log.info('exiting-on-keyboard-interrupt')
self.exiting = True
if self.connection_manager is not None:
- yield self.connection_manager.shutdown()
+ yield self.connection_manager.stop()
def start_reactor(self):
- from twisted.internet import reactor
reactor.callWhenRunning(
lambda: self.log.info('twisted-reactor-started'))
diff --git a/ofagent/of_protocol_handler.py b/ofagent/of_protocol_handler.py
index e109782..1c284eb 100644
--- a/ofagent/of_protocol_handler.py
+++ b/ofagent/of_protocol_handler.py
@@ -14,7 +14,7 @@
# limitations under the License.
#
import structlog
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
import loxi.of13 as ofp
from converter import to_loxi, pb2dict, to_grpc
@@ -44,9 +44,11 @@
self.rpc = rpc
@inlineCallbacks
- def run(self):
+ def start(self):
"""A new call is made after a fresh reconnect"""
+ log.debug('starting')
+
try:
# send initial hello message
self.cxn.send(ofp.message.hello())
@@ -67,6 +69,14 @@
except Exception, e:
log.exception('exception', e=e)
+ log.info('started')
+ returnValue(self)
+
+ def stop(self):
+ log.debug('stopping')
+ pass # nothing to do yet
+ log.info('stopped')
+
def handle_echo_request(self, req):
self.cxn.send(ofp.message.echo_reply(xid=req.xid))
diff --git a/ofagent/protos/schema_pb2.py b/ofagent/protos/schema_pb2.py
index f8c5e46..9f54c3a 100644
--- a/ofagent/protos/schema_pb2.py
+++ b/ofagent/protos/schema_pb2.py
@@ -20,7 +20,7 @@
name='schema.proto',
package='schema',
syntax='proto3',
- serialized_pb=_b('\n\x0cschema.proto\x12\x06schema\x1a\x1cgoogle/api/annotations.proto\"\xcd\x01\n\x06Schema\x12*\n\x06protos\x18\x01 \x03(\x0b\x32\x1a.schema.Schema.ProtosEntry\x12\x34\n\x0b\x64\x65scriptors\x18\x02 \x03(\x0b\x32\x1f.schema.Schema.DescriptorsEntry\x1a-\n\x0bProtosEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x32\n\x10\x44\x65scriptorsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"\r\n\x0bNullMessage2R\n\rSchemaService\x12\x41\n\tGetSchema\x12\x13.schema.NullMessage\x1a\x0e.schema.Schema\"\x0f\x82\xd3\xe4\x93\x02\t\x12\x07/schemab\x06proto3')
+ serialized_pb=_b('\n\x0cschema.proto\x12\x06schema\x1a\x1cgoogle/api/annotations.proto\"A\n\tProtoFile\x12\x11\n\tfile_name\x18\x01 \x01(\t\x12\r\n\x05proto\x18\x02 \x01(\t\x12\x12\n\ndescriptor\x18\x03 \x01(\x0c\",\n\x07Schemas\x12!\n\x06protos\x18\x01 \x03(\x0b\x32\x11.schema.ProtoFile\"\r\n\x0bNullMessage2S\n\rSchemaService\x12\x42\n\tGetSchema\x12\x13.schema.NullMessage\x1a\x0f.schema.Schemas\"\x0f\x82\xd3\xe4\x93\x02\t\x12\x07/schemab\x06proto3')
,
dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,])
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
@@ -28,60 +28,30 @@
-_SCHEMA_PROTOSENTRY = _descriptor.Descriptor(
- name='ProtosEntry',
- full_name='schema.Schema.ProtosEntry',
+_PROTOFILE = _descriptor.Descriptor(
+ name='ProtoFile',
+ full_name='schema.ProtoFile',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
- name='key', full_name='schema.Schema.ProtosEntry.key', index=0,
+ name='file_name', full_name='schema.ProtoFile.file_name', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
- name='value', full_name='schema.Schema.ProtosEntry.value', index=1,
+ name='proto', full_name='schema.ProtoFile.proto', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
- is_extendable=False,
- syntax='proto3',
- extension_ranges=[],
- oneofs=[
- ],
- serialized_start=163,
- serialized_end=208,
-)
-
-_SCHEMA_DESCRIPTORSENTRY = _descriptor.Descriptor(
- name='DescriptorsEntry',
- full_name='schema.Schema.DescriptorsEntry',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
_descriptor.FieldDescriptor(
- name='key', full_name='schema.Schema.DescriptorsEntry.key', index=0,
- number=1, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- _descriptor.FieldDescriptor(
- name='value', full_name='schema.Schema.DescriptorsEntry.value', index=1,
- number=2, type=12, cpp_type=9, label=1,
+ name='descriptor', full_name='schema.ProtoFile.descriptor', index=2,
+ number=3, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
@@ -92,41 +62,35 @@
nested_types=[],
enum_types=[
],
- options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+ options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
- serialized_start=210,
- serialized_end=260,
+ serialized_start=54,
+ serialized_end=119,
)
-_SCHEMA = _descriptor.Descriptor(
- name='Schema',
- full_name='schema.Schema',
+
+_SCHEMAS = _descriptor.Descriptor(
+ name='Schemas',
+ full_name='schema.Schemas',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
- name='protos', full_name='schema.Schema.protos', index=0,
+ name='protos', full_name='schema.Schemas.protos', index=0,
number=1, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
- _descriptor.FieldDescriptor(
- name='descriptors', full_name='schema.Schema.descriptors', index=1,
- number=2, type=11, cpp_type=10, label=3,
- has_default_value=False, default_value=[],
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
],
extensions=[
],
- nested_types=[_SCHEMA_PROTOSENTRY, _SCHEMA_DESCRIPTORSENTRY, ],
+ nested_types=[],
enum_types=[
],
options=None,
@@ -135,8 +99,8 @@
extension_ranges=[],
oneofs=[
],
- serialized_start=55,
- serialized_end=260,
+ serialized_start=121,
+ serialized_end=165,
)
@@ -159,39 +123,28 @@
extension_ranges=[],
oneofs=[
],
- serialized_start=262,
- serialized_end=275,
+ serialized_start=167,
+ serialized_end=180,
)
-_SCHEMA_PROTOSENTRY.containing_type = _SCHEMA
-_SCHEMA_DESCRIPTORSENTRY.containing_type = _SCHEMA
-_SCHEMA.fields_by_name['protos'].message_type = _SCHEMA_PROTOSENTRY
-_SCHEMA.fields_by_name['descriptors'].message_type = _SCHEMA_DESCRIPTORSENTRY
-DESCRIPTOR.message_types_by_name['Schema'] = _SCHEMA
+_SCHEMAS.fields_by_name['protos'].message_type = _PROTOFILE
+DESCRIPTOR.message_types_by_name['ProtoFile'] = _PROTOFILE
+DESCRIPTOR.message_types_by_name['Schemas'] = _SCHEMAS
DESCRIPTOR.message_types_by_name['NullMessage'] = _NULLMESSAGE
-Schema = _reflection.GeneratedProtocolMessageType('Schema', (_message.Message,), dict(
-
- ProtosEntry = _reflection.GeneratedProtocolMessageType('ProtosEntry', (_message.Message,), dict(
- DESCRIPTOR = _SCHEMA_PROTOSENTRY,
- __module__ = 'schema_pb2'
- # @@protoc_insertion_point(class_scope:schema.Schema.ProtosEntry)
- ))
- ,
-
- DescriptorsEntry = _reflection.GeneratedProtocolMessageType('DescriptorsEntry', (_message.Message,), dict(
- DESCRIPTOR = _SCHEMA_DESCRIPTORSENTRY,
- __module__ = 'schema_pb2'
- # @@protoc_insertion_point(class_scope:schema.Schema.DescriptorsEntry)
- ))
- ,
- DESCRIPTOR = _SCHEMA,
+ProtoFile = _reflection.GeneratedProtocolMessageType('ProtoFile', (_message.Message,), dict(
+ DESCRIPTOR = _PROTOFILE,
__module__ = 'schema_pb2'
- # @@protoc_insertion_point(class_scope:schema.Schema)
+ # @@protoc_insertion_point(class_scope:schema.ProtoFile)
))
-_sym_db.RegisterMessage(Schema)
-_sym_db.RegisterMessage(Schema.ProtosEntry)
-_sym_db.RegisterMessage(Schema.DescriptorsEntry)
+_sym_db.RegisterMessage(ProtoFile)
+
+Schemas = _reflection.GeneratedProtocolMessageType('Schemas', (_message.Message,), dict(
+ DESCRIPTOR = _SCHEMAS,
+ __module__ = 'schema_pb2'
+ # @@protoc_insertion_point(class_scope:schema.Schemas)
+ ))
+_sym_db.RegisterMessage(Schemas)
NullMessage = _reflection.GeneratedProtocolMessageType('NullMessage', (_message.Message,), dict(
DESCRIPTOR = _NULLMESSAGE,
@@ -201,10 +154,6 @@
_sym_db.RegisterMessage(NullMessage)
-_SCHEMA_PROTOSENTRY.has_options = True
-_SCHEMA_PROTOSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
-_SCHEMA_DESCRIPTORSENTRY.has_options = True
-_SCHEMA_DESCRIPTORSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
import grpc
from grpc.beta import implementations as beta_implementations
from grpc.beta import interfaces as beta_interfaces
@@ -225,7 +174,7 @@
self.GetSchema = channel.unary_unary(
'/schema.SchemaService/GetSchema',
request_serializer=NullMessage.SerializeToString,
- response_deserializer=Schema.FromString,
+ response_deserializer=Schemas.FromString,
)
@@ -246,7 +195,7 @@
'GetSchema': grpc.unary_unary_rpc_method_handler(
servicer.GetSchema,
request_deserializer=NullMessage.FromString,
- response_serializer=Schema.SerializeToString,
+ response_serializer=Schemas.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
@@ -278,7 +227,7 @@
('schema.SchemaService', 'GetSchema'): NullMessage.FromString,
}
response_serializers = {
- ('schema.SchemaService', 'GetSchema'): Schema.SerializeToString,
+ ('schema.SchemaService', 'GetSchema'): Schemas.SerializeToString,
}
method_implementations = {
('schema.SchemaService', 'GetSchema'): face_utilities.unary_unary_inline(servicer.GetSchema),
@@ -292,7 +241,7 @@
('schema.SchemaService', 'GetSchema'): NullMessage.SerializeToString,
}
response_deserializers = {
- ('schema.SchemaService', 'GetSchema'): Schema.FromString,
+ ('schema.SchemaService', 'GetSchema'): Schemas.FromString,
}
cardinalities = {
'GetSchema': cardinality.Cardinality.UNARY_UNARY,
diff --git a/ofagent/protos/third_party/__init__.py b/ofagent/protos/third_party/__init__.py
index 6dab4e7..0608e8c 100644
--- a/ofagent/protos/third_party/__init__.py
+++ b/ofagent/protos/third_party/__init__.py
@@ -38,7 +38,7 @@
def load_module(self, name):
if name in sys.modules:
return sys.modules[name]
- full_name = 'ofagent.protos.third_party.' + name
+ full_name = 'voltha.protos.third_party.' + name
import_module(full_name)
module = sys.modules[full_name]
sys.modules[name] = module