blob: c6f0fbc5c6d3eb2d74f0346f2fa83d5e07c959b8 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""gRPC server endpoint"""
import os
import uuid
from Queue import Queue
from collections import OrderedDict
from os.path import abspath, basename, dirname, join, walk
import grpc
from concurrent import futures
from structlog import get_logger
import zlib
from zope.interface import implementer
from common.utils.grpc_utils import twisted_async
from voltha.core.logical_device_agent import LogicalDeviceAgent
from voltha.protos import voltha_pb2, schema_pb2
from google.protobuf.empty_pb2 import Empty
from voltha.registry import IComponent
log = get_logger()
class SchemaService(schema_pb2.SchemaServiceServicer):
def __init__(self, thread_pool):
self.thread_pool = thread_pool
protos = self._load_schema()
self.schemas = schema_pb2.Schemas(protos=protos,
swagger_from='voltha.proto',
yang_from='voltha.proto')
def stop(self):
pass
def _load_schema(self):
"""Pre-load schema file so that we can serve it up (file sizes
are small enough to do so
"""
proto_dir = abspath(join(dirname(__file__), '../../protos'))
def find_files(dir, suffix):
proto_files = [
join(dir, fname) for fname in os.listdir(dir)
if fname.endswith(suffix)
]
return proto_files
proto_map = OrderedDict() # to have deterministic data
for proto_file in find_files(proto_dir, '.proto'):
with open(proto_file, 'r') as f:
proto_content = f.read()
fname = basename(proto_file)
# assure no two files have the same basename
assert fname not in proto_map
desc_file = proto_file.replace('.proto', '.desc')
with open(desc_file, 'r') as f:
descriptor_content = zlib.compress(f.read())
proto_map[fname] = schema_pb2.ProtoFile(
file_name=fname,
proto=proto_content,
descriptor=descriptor_content
)
return proto_map.values()
def GetSchema(self, request, context):
"""Return current schema files and descriptor"""
return self.schemas
class HealthService(voltha_pb2.HealthServiceServicer):
def __init__(self, thread_pool):
self.thread_pool = thread_pool
def stop(self):
pass
def GetHealthStatus(self, request, context):
"""Return current health status of a Voltha instance
"""
log.info('get-health-status', request=request)
res = voltha_pb2.HealthStatus(
state=voltha_pb2.HealthStatus.HEALTHY
)
return res
'''
class VolthaLogicalLayer(voltha_pb2.VolthaLogicalLayerServicer):
# TODO still a mock
def __init__(self, threadpool):
self.threadpool = threadpool
self.devices = [LogicalDeviceAgent(self, 1)]
self.devices_map = dict((d.info.id, d) for d in self.devices)
self.packet_in_queue = Queue()
self.stopped = False
def stop(self):
self.stopped = True
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Note that all GRPC method implementations are not called on the main
# (twisted) thread. We shall contain them here and not spread calls on
# "foreign" threads elsewhere in the application. So all calls out from
# these methods shall be called with the callFromThread pattern.
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@twisted_async
def ListLogicalDevices(self, request, context):
return voltha_pb2.LogicalDevices(
items=[voltha_pb2.LogicalDevice(
id=d.info.id,
datapath_id=d.info.datapath_id,
desc=d.info.desc
) for d in self.devices])
@twisted_async
def GetLogicalDevice(self, request, context):
return self.devices_map[request.id].info
@twisted_async
def ListLogicalDevicePorts(self, request, context):
device_model = self.devices_map[request.id]
return voltha_pb2.LogicalPorts(items=device_model.ports)
@twisted_async
def UpdateFlowTable(self, request, context):
device_model = self.devices_map[request.id]
device_model.update_flow_table(request.flow_mod)
return Empty()
@twisted_async
def ListDeviceFlows(self, request, context):
device_model = self.devices_map[request.id]
flows = device_model.list_flows()
return voltha_pb2.Flows(items=flows)
@twisted_async
def UpdateGroupTable(self, request, context):
device_model = self.devices_map[request.id]
device_model.update_group_table(request.group_mod)
return Empty()
@twisted_async
def ListDeviceFlowGroups(self, request, context):
device_model = self.devices_map[request.id]
groups = device_model.list_groups()
return voltha_pb2.FlowGroups(items=groups)
def StreamPacketsOut(self, request_iterator, context):
@twisted_async
def forward_packet_out(packet_out):
device_model = self.devices_map[packet_out.id]
device_model.packet_out(packet_out.packet_out)
for request in request_iterator:
forward_packet_out(packet_out=request)
return Empty()
def ReceivePacketsIn(self, request, context):
while 1:
packet_in = self.packet_in_queue.get()
yield packet_in
def send_packet_in(self, device_id, ofp_packet_in):
"""Must be called on the twisted thread"""
packet_in = voltha_pb2.PacketIn(id=device_id, packet_in=ofp_packet_in)
self.packet_in_queue.put(packet_in)
'''
@implementer(IComponent)
class VolthaGrpcServer(object):
def __init__(self, port=50055):
self.port = port
log.info('init-grpc-server', port=self.port)
self.thread_pool = futures.ThreadPoolExecutor(max_workers=10)
self.server = grpc.server(self.thread_pool)
self.services = []
def start(self):
log.debug('starting')
# add each service unit to the server and also to the list
for activator_func, service_class in (
(schema_pb2.add_SchemaServiceServicer_to_server, SchemaService),
(voltha_pb2.add_HealthServiceServicer_to_server, HealthService),
):
service = service_class(self.thread_pool)
self.register(activator_func, service)
# open port
self.server.add_insecure_port('[::]:%s' % self.port)
# strat the server
self.server.start()
log.info('started')
return self
def stop(self, grace=0):
log.debug('stopping')
for service in self.services:
service.stop()
self.server.stop(grace)
log.debug('stopped')
def register(self, activator_func, service):
"""
Allow late registration of gRPC servicers
:param activator_func: The gRPC "add_XYZServicer_to_server method
autogenerated by protoc.
:param service: The object implementing the service.
:return: None
"""
self.services.append(service)
activator_func(service, self.server)
# This is to allow running the GRPC server in stand-alone mode
if __name__ == '__main__':
server = VolthaGrpcServer().start()
import time
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
try:
while 1:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop()