blob: a1678c5f1792c981033fa46375d9a1279ff6970d [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
"""
The gRPC client layer for the OpenFlow agent
"""
import os
import signal
from Queue import Queue, Empty

from google.protobuf import empty_pb2
from grpc import StatusCode
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue

from protos.logical_device_pb2 import LogicalPortId
from protos.voltha_pb2 import ID, FlowTableUpdate, MeterModUpdate, \
    FlowGroupTableUpdate, PacketOut
from protos.voltha_pb2_grpc import VolthaLocalServiceStub
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070035
36
# Module-level structlog logger used by all code in this file.
log = get_logger()
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070038
39
class GrpcClient(object):
    """gRPC client used by the OpenFlow agent.

    Wraps a ``VolthaLocalServiceStub`` built on the supplied channel and
    exposes:

    * three long-lived streams (packet-out, packet-in, change events), each
      serviced by a blocking function run via ``reactor.callInThread``;
    * unary RPC wrappers that run the blocking stub call through
      ``threads.deferToThread`` and return a Deferred.

    Inbound stream items are handed to the reactor thread through
    ``DeferredQueue`` instances and forwarded to ``connection_manager``.
    """

    def __init__(self, connection_manager, channel, grpc_timeout):
        """Store collaborators and create the stub and the three queues.

        :param connection_manager: object providing ``forward_packet_in`` and
            ``forward_change_event``.
        :param channel: gRPC channel the stub is created on.
        :param grpc_timeout: value passed as ``timeout=`` to every unary RPC.
        """
        self.connection_manager = connection_manager
        self.channel = channel
        self.grpc_timeout = grpc_timeout
        self.local_stub = VolthaLocalServiceStub(channel)

        self.stopped = False

        self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
        self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
        self.change_event_queue = DeferredQueue()  # queue change events

    def start(self):
        """Start the three streams and schedule both forwarding loops.

        :return: ``self``, so the call can be chained.
        """
        log.debug('starting', grpc_timeout=self.grpc_timeout)
        self.start_packet_out_stream()
        self.start_packet_in_stream()
        self.start_change_event_in_stream()
        reactor.callLater(0, self.packet_in_forwarder_loop)
        reactor.callLater(0, self.change_event_processing_loop)
        log.info('started')
        return self

    def stop(self):
        """Flag the client as stopped.

        The packet-out generator notices the flag the next time its queue
        read times out; the forwarding loops notice it only after handling
        their next item.
        """
        log.debug('stopping')
        self.stopped = True
        log.info('stopped')

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~ private helpers ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def _handle_stream_error(self, error):
        """Log a failed streaming RPC; send SIGTERM to ourselves if the
        server is UNAVAILABLE.

        NOTE(review): presumably a supervisor restarts the process after the
        SIGTERM -- confirm against the deployment.
        """
        status = error.code()
        log.error('grpc-exception', status=status)
        if status == StatusCode.UNAVAILABLE:
            # Signal the current process directly instead of shelling out
            # to `kill -15 <pid>`.
            os.kill(os.getpid(), signal.SIGTERM)

    def _start_inbound_stream(self, rpc_method, queue, log_event, item_key):
        """Consume a server-streaming RPC in a worker thread.

        Each received item is put on ``queue`` from the reactor thread (via
        ``reactor.callFromThread``) and logged under ``log_event`` with the
        item attached as keyword ``item_key``.
        """

        def receive():
            iterator = rpc_method(empty_pb2.Empty())
            try:
                for item in iterator:
                    reactor.callFromThread(queue.put, item)
                    log.debug(log_event,
                              queue_len=len(queue.pending),
                              **{item_key: item})
            except _Rendezvous as e:
                self._handle_stream_error(e)

        reactor.callInThread(receive)

    def _invoke(self, rpc_method, request):
        """Run a blocking unary RPC via ``deferToThread`` with the configured
        timeout; returns a Deferred firing with the response."""
        return threads.deferToThread(
            rpc_method, request, timeout=self.grpc_timeout)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ streams ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def start_packet_out_stream(self):
        """Start the thread that streams queued PacketOut messages out."""

        def packet_generator():
            while True:
                try:
                    # Wake up once a second so the stopped flag gets checked
                    # even when no packets are being sent.
                    packet = self.packet_out_queue.get(block=True,
                                                       timeout=1.0)
                except Empty:
                    if self.stopped:
                        return
                else:
                    yield packet

        def stream_packets_out():
            generator = packet_generator()
            try:
                self.local_stub.StreamPacketsOut(generator)
            except _Rendezvous as e:
                self._handle_stream_error(e)

        reactor.callInThread(stream_packets_out)

    def start_packet_in_stream(self):
        """Start the thread that receives PacketIn messages."""
        self._start_inbound_stream(
            self.local_stub.ReceivePacketsIn,
            self.packet_in_queue,
            'enqued-packet-in',
            'packet_in')

    def start_change_event_in_stream(self):
        """Start the thread that receives change events."""
        self._start_inbound_stream(
            self.local_stub.ReceiveChangeEvents,
            self.change_event_queue,
            'enqued-change-event',
            'change_event')

    @inlineCallbacks
    def change_event_processing_loop(self):
        """Forward queued change events to the connection manager.

        A failure while handling one event is logged and the loop carries on.
        """
        while True:
            try:
                event = yield self.change_event_queue.get()
                device_id = event.id
                self.connection_manager.forward_change_event(device_id, event)
            except Exception as e:
                log.exception('failed-in-change-event-handler', e=e)
            if self.stopped:
                break

    @inlineCallbacks
    def packet_in_forwarder_loop(self):
        """Forward queued PacketIn messages to the connection manager.

        A failure while handling one packet is logged and the loop carries
        on, so a single bad packet cannot stop packet-in forwarding.
        """
        while True:
            try:
                packet_in = yield self.packet_in_queue.get()
                device_id = packet_in.id
                ofp_packet_in = packet_in.packet_in
                self.connection_manager.forward_packet_in(device_id,
                                                          ofp_packet_in)
            except Exception as e:
                log.exception('failed-in-packet-in-handler', e=e)
            if self.stopped:
                break

    def send_packet_out(self, device_id, packet_out):
        """Wrap ``packet_out`` in a PacketOut for ``device_id`` and queue it
        for the outbound stream."""
        packet_out = PacketOut(id=device_id, packet_out=packet_out)
        self.packet_out_queue.put(packet_out)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ unary RPCs ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    @inlineCallbacks
    def get_port(self, device_id, port_id):
        """Call GetLogicalDevicePort; fires with the response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._invoke(self.local_stub.GetLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_port_list(self, device_id):
        """Call ListLogicalDevicePorts; fires with the response's ``items``."""
        req = ID(id=device_id)
        res = yield self._invoke(self.local_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def enable_port(self, device_id, port_id):
        """Call EnableLogicalDevicePort; fires with the response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._invoke(self.local_stub.EnableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def disable_port(self, device_id, port_id):
        """Call DisableLogicalDevicePort; fires with the response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._invoke(self.local_stub.DisableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_device_info(self, device_id):
        """Call GetLogicalDevice; fires with the response."""
        req = ID(id=device_id)
        res = yield self._invoke(self.local_stub.GetLogicalDevice, req)
        returnValue(res)

    @inlineCallbacks
    def update_flow_table(self, device_id, flow_mod):
        """Call UpdateLogicalDeviceFlowTable; fires with the response."""
        req = FlowTableUpdate(id=device_id, flow_mod=flow_mod)
        res = yield self._invoke(
            self.local_stub.UpdateLogicalDeviceFlowTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_meter_mod_table(self, device_id, meter_mod):
        """Call UpdateLogicalDeviceMeterTable; fires with the response.

        Uses the configured timeout like every other unary RPC here.
        """
        req = MeterModUpdate(id=device_id, meter_mod=meter_mod)
        res = yield self._invoke(
            self.local_stub.UpdateLogicalDeviceMeterTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_group_table(self, device_id, group_mod):
        """Call UpdateLogicalDeviceFlowGroupTable; fires with the response."""
        req = FlowGroupTableUpdate(id=device_id, group_mod=group_mod)
        res = yield self._invoke(
            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req)
        returnValue(res)

    @inlineCallbacks
    def list_flows(self, device_id):
        """Call ListLogicalDeviceFlows; fires with the response's ``items``."""
        req = ID(id=device_id)
        res = yield self._invoke(self.local_stub.ListLogicalDeviceFlows, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_groups(self, device_id):
        """Call ListLogicalDeviceFlowGroups; fires with the response's
        ``items``."""
        req = ID(id=device_id)
        res = yield self._invoke(
            self.local_stub.ListLogicalDeviceFlowGroups, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_ports(self, device_id):
        """Call ListLogicalDevicePorts; fires with the response's ``items``.

        Issues the same RPC as ``get_port_list``.
        """
        req = ID(id=device_id)
        res = yield self._invoke(self.local_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_logical_devices(self):
        """Call ListLogicalDevices; fires with the response's ``items``."""
        res = yield self._invoke(
            self.local_stub.ListLogicalDevices, empty_pb2.Empty())
        returnValue(res.items)

    @inlineCallbacks
    def list_reachable_logical_devices(self):
        """Call ListReachableLogicalDevices; fires with the response's
        ``items``."""
        res = yield self._invoke(
            self.local_stub.ListReachableLogicalDevices, empty_pb2.Empty())
        returnValue(res.items)

    @inlineCallbacks
    def subscribe(self, subscriber):
        """Call Subscribe with ``subscriber``; fires with the response."""
        res = yield self._invoke(self.local_stub.Subscribe, subscriber)
        returnValue(res)

    @inlineCallbacks
    def get_meter_stats(self, device_id):
        """Call GetMeterStatsOfLogicalDevice; fires with the response's
        ``items``.

        Uses the configured timeout like every other unary RPC here.
        """
        req = ID(id=device_id)
        res = yield self._invoke(
            self.local_stub.GetMeterStatsOfLogicalDevice, req)
        returnValue(res.items)