blob: 6c1163504028b08b43e7f9c2cc20f23df3f25f45 [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#!/usr/bin/env python
Zack Williams41513bf2018-07-07 20:08:35 -07002# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070015
Zsolt Harasztie39523b2016-10-16 19:30:34 -070016import grpc
17from concurrent import futures
18from concurrent.futures import Future
19from twisted.internet import reactor
20from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
21
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070022from common.utils.asleep import asleep
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070023from google.protobuf.empty_pb2 import Empty
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070024
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070025from common.utils.grpc_utils import twisted_async
Zsolt Harasztie39523b2016-10-16 19:30:34 -070026from streaming_pb2 import add_ExperimentalServiceServicer_to_server, \
27 AsyncEvent, ExperimentalServiceServicer, Echo
28
29
class ShutDown(object):
    """Holder for the process-wide stop flag.

    Long-running loops poll ``ShutDown.stop`` and are expected to exit once
    it has been set to ``True``.
    """

    # Shared class-level flag; stays False until a shutdown is requested.
    stop = False
32
33
class ShuttingDown(Exception):
    """Exception type signalling that the service is shutting down."""
35
36
class Service(ExperimentalServiceServicer):
    """Experimental gRPC servicer exercising unary and streaming calls."""

    def __init__(self):
        # Sequence number stamped on the next AsyncEvent that is produced.
        self.event_seq = 0

    @twisted_async
    @inlineCallbacks
    def GetEcho(self, request, context):
        """Echo ``request.msg`` back with ' <<' appended.

        Waits on ``asleep(request.delay)`` before building the reply
        (presumably a non-blocking sleep on the reactor -- confirm against
        common.utils.asleep).
        """
        print('got Echo({}) request'.format(request.msg))
        yield asleep(request.delay)
        reply_text = request.msg + ' <<'
        print(' Echo({}) reply'.format(reply_text))
        returnValue(Echo(msg=reply_text))

    @twisted_async
    @inlineCallbacks
    def get_next_event(self):
        """called on the twisted thread

        Produces one AsyncEvent carrying the current sequence number and
        then advances the counter.
        """
        yield asleep(0.000001)
        seq = self.event_seq
        next_event = AsyncEvent(seq=seq, details='foo')
        # Only advance the counter once the event has been built.
        self.event_seq = seq + 1
        returnValue(next_event)

    def ReceiveStreamedEvents(self, request, context):
        """called on a thread-pool thread

        Generator that keeps yielding events from get_next_event() until
        ShutDown.stop is set.
        """
        print('got ReceiveStreamedEvents request')
        while not ShutDown.stop:
            yield self.get_next_event()

    def ReceivePackets(self, request, context):
        """Placeholder handler; currently does nothing and returns None."""
        return None

    def SendPackets(self, request, context):
        """Drain the incoming ``request`` iterator and reply with Empty.

        Prints a progress line each time another 1000 items have been
        consumed.
        """
        for received, _packet in enumerate(request, 1):
            if received % 1000 == 0:
                print('%s got %d packets' % (20 * ' ', received))
        return Empty()
Zsolt Harasztie39523b2016-10-16 19:30:34 -070078
79
if __name__ == '__main__':
    # gRPC handlers run on this pool; the Twisted reactor owns the main thread.
    executor = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(executor)
    add_ExperimentalServiceServicer_to_server(Service(), grpc_server)
    grpc_server.add_insecure_port('[::]:50050')
    grpc_server.start()

    def _on_reactor_shutdown():
        """Raise the stop flag, drain the worker pool, then stop gRPC."""
        ShutDown.stop = True
        # NOTE(review): this blocks the calling thread until every worker
        # finishes, and only afterwards stops the server -- confirm that
        # in-flight handlers can actually complete while we wait here.
        executor.shutdown(wait=True)
        grpc_server.stop(0)

    reactor.addSystemEventTrigger('before', 'shutdown', _on_reactor_shutdown)
    reactor.run()