blob: 6e9765f42ec902a2d7c7e98a045c3630c4141a21 [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18The gRPC client layer for the OpenFlow agent
19"""
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070020from Queue import Queue, Empty
alshabib06b449c2017-01-15 17:33:16 -060021import os
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070022
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070023from grpc import StatusCode
24from grpc._channel import _Rendezvous
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070025from structlog import get_logger
26from twisted.internet import reactor
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070027from twisted.internet import threads
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070028from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070029
Scott Bakerd865fa22018-11-07 11:45:28 -080030from protos.voltha_pb2 import ID, FlowTableUpdate, MeterModUpdate, \
Jonathan Hart398e4072018-05-30 16:54:00 -070031 FlowGroupTableUpdate, PacketOut
Scott Bakerd865fa22018-11-07 11:45:28 -080032from protos.voltha_pb2_grpc import VolthaLocalServiceStub
Jonathan Hart398e4072018-05-30 16:54:00 -070033from protos.logical_device_pb2 import LogicalPortId
Zsolt Haraszti7eeb2b32016-11-06 14:04:55 -080034from google.protobuf import empty_pb2
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070035
36
class GrpcClient(object):
    """
    gRPC client used by the OpenFlow agent to talk to the Voltha local
    service through a ``VolthaLocalServiceStub``.

    Three streaming RPCs (``StreamPacketsOut``, ``ReceivePacketsIn`` and
    ``ReceiveChangeEvents``) are each driven from a worker started with
    ``reactor.callInThread``.  Messages cross between those workers and
    the reactor through queues:

    * ``packet_out_queue``   -- blocking ``Queue`` drained by the
      packet-out stream generator;
    * ``packet_in_queue``    -- ``DeferredQueue`` filled by the packet-in
      stream and drained by ``packet_in_forwarder_loop``;
    * ``change_event_queue`` -- ``DeferredQueue`` filled by the
      change-event stream and drained by ``change_event_processing_loop``.

    All unary RPC wrappers run the blocking stub call via
    ``threads.deferToThread`` and therefore return a Deferred.
    """

    def __init__(self, connection_manager, channel, grpc_timeout):
        """
        :param connection_manager: object providing ``forward_packet_in``
            and ``forward_change_event``, to which received messages are
            handed.
        :param channel: gRPC channel the service stub is built on.
        :param grpc_timeout: value passed as ``timeout=`` to every unary
            RPC call.
        """
        self.log = get_logger()

        self.connection_manager = connection_manager
        self.channel = channel
        self.grpc_timeout = grpc_timeout
        self.local_stub = VolthaLocalServiceStub(channel)

        self.stopped = False

        self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
        self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
        self.change_event_queue = DeferredQueue()  # queue change events

        # Running totals, reported in the debug logs only.
        self.count_pkt_in = 0
        self.count_pkt_out = 0

    def start(self):
        """
        Start the three streaming workers and schedule the two forwarding
        loops on the reactor.

        :return: ``self``, so the call can be chained.
        """
        self.log.debug('starting', grpc_timeout=self.grpc_timeout)
        self.start_packet_out_stream()
        self.start_packet_in_stream()
        self.start_change_event_in_stream()
        reactor.callLater(0, self.packet_in_forwarder_loop)
        reactor.callLater(0, self.change_event_processing_loop)
        self.log.info('started')
        return self

    def stop(self):
        """
        Flag the client as stopped.

        The flag is polled by the packet-out generator (when its queue is
        idle) and by the forwarding loops (after each processed item).
        """
        self.log.debug('stopping')
        self.stopped = True
        self.log.info('stopped')

    # ~~~~~~~~~~~~~~~~~~~~~~~~~ private helpers ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def _handle_stream_error(self, e):
        """
        Log a failed streaming RPC and, if the status is ``UNAVAILABLE``,
        send SIGTERM to this process.

        The signal is sent with ``os.kill`` rather than by shelling out to
        ``kill``: the effect is the same, but no child shell is forked
        from this multi-threaded process.
        """
        status = e.code()
        self.log.error('grpc-exception', status=status)
        if status == StatusCode.UNAVAILABLE:
            import signal  # local import: keeps this block self-contained
            os.kill(os.getpid(), signal.SIGTERM)

    def _call_unary(self, rpc_method, request):
        """
        Run a blocking unary stub call in a thread with the configured
        timeout and return the resulting Deferred.
        """
        return threads.deferToThread(
            rpc_method, request, timeout=self.grpc_timeout)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~ streaming workers ~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def start_packet_out_stream(self):
        """
        Start the worker that feeds queued ``PacketOut`` messages into the
        ``StreamPacketsOut`` RPC.
        """

        def packet_generator():
            while True:
                try:
                    # Wake up once a second so a stop request is noticed
                    # even when there is nothing to send.
                    packet = self.packet_out_queue.get(block=True,
                                                       timeout=1.0)
                except Empty:
                    if self.stopped:
                        return
                else:
                    self.count_pkt_out += 1
                    self.log.debug('counters grpc_client OUT - {}'.format(
                        self.count_pkt_out))
                    yield packet

        def stream_packets_out():
            generator = packet_generator()
            try:
                self.local_stub.StreamPacketsOut(generator)
            except _Rendezvous as e:
                self._handle_stream_error(e)

        reactor.callInThread(stream_packets_out)

    def start_packet_in_stream(self):
        """
        Start the worker that reads the ``ReceivePacketsIn`` stream and
        puts every received message on ``packet_in_queue``.
        """

        def receive_packet_in_stream():
            streaming_rpc_method = self.local_stub.ReceivePacketsIn
            iterator = streaming_rpc_method(empty_pb2.Empty())
            try:
                for packet_in in iterator:
                    # The put is scheduled onto the reactor via
                    # callFromThread rather than done from this worker.
                    reactor.callFromThread(self.packet_in_queue.put,
                                           packet_in)
                    self.log.debug(
                        'enqued-packet-in',
                        packet_in=packet_in,
                        queue_len=len(self.packet_in_queue.pending))
                    self.count_pkt_in += 1
                    self.log.debug('counters grpc_client IN - {}'.format(
                        self.count_pkt_in))
            except _Rendezvous as e:
                self._handle_stream_error(e)

        reactor.callInThread(receive_packet_in_stream)

    def start_change_event_in_stream(self):
        """
        Start the worker that reads the ``ReceiveChangeEvents`` stream and
        puts every received event on ``change_event_queue``.
        """

        def receive_change_events():
            streaming_rpc_method = self.local_stub.ReceiveChangeEvents
            iterator = streaming_rpc_method(empty_pb2.Empty())
            try:
                for event in iterator:
                    reactor.callFromThread(self.change_event_queue.put,
                                           event)
                    self.log.debug(
                        'enqued-change-event',
                        change_event=event,
                        queue_len=len(self.change_event_queue.pending))
            except _Rendezvous as e:
                self._handle_stream_error(e)

        reactor.callInThread(receive_change_events)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~ forwarding loops ~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    @inlineCallbacks
    def change_event_processing_loop(self):
        """
        Take change events off ``change_event_queue`` one at a time and
        pass each to ``connection_manager.forward_change_event``.

        A failure while handling one event is logged and does not end the
        loop.  The loop exits after the first iteration that completes
        once ``stopped`` is set.
        """
        while True:
            try:
                event = yield self.change_event_queue.get()
                device_id = event.id
                self.connection_manager.forward_change_event(device_id,
                                                             event)
            except Exception as e:
                self.log.exception('failed-in-change-event-handler', e=e)
            if self.stopped:
                break

    @inlineCallbacks
    def packet_in_forwarder_loop(self):
        """
        Take packet-in messages off ``packet_in_queue`` one at a time and
        pass each to ``connection_manager.forward_packet_in``.

        A failure while handling one packet is logged and does not end
        the loop (same policy as ``change_event_processing_loop``).  The
        loop exits after the first iteration that completes once
        ``stopped`` is set.
        """
        while True:
            try:
                packet_in = yield self.packet_in_queue.get()
                device_id = packet_in.id
                ofp_packet_in = packet_in.packet_in
                self.log.debug('grpc client to send packet-in')
                self.connection_manager.forward_packet_in(device_id,
                                                          ofp_packet_in)
            except Exception as e:
                self.log.exception('failed-in-packet-in-handler', e=e)
            if self.stopped:
                break

    def send_packet_out(self, device_id, packet_out):
        """
        Wrap ``packet_out`` in a ``PacketOut`` message for ``device_id``
        and queue it for the packet-out stream.
        """
        self.log.debug('grpc client to send packet-out')
        packet_out = PacketOut(id=device_id, packet_out=packet_out)
        self.packet_out_queue.put(packet_out)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ unary RPCs ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    @inlineCallbacks
    def get_port(self, device_id, port_id):
        """Call ``GetLogicalDevicePort``; fires with the RPC response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._call_unary(
            self.local_stub.GetLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_port_list(self, device_id):
        """Call ``ListLogicalDevicePorts``; fires with ``response.items``."""
        req = ID(id=device_id)
        res = yield self._call_unary(
            self.local_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def enable_port(self, device_id, port_id):
        """Call ``EnableLogicalDevicePort``; fires with the RPC response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._call_unary(
            self.local_stub.EnableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def disable_port(self, device_id, port_id):
        """Call ``DisableLogicalDevicePort``; fires with the RPC response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._call_unary(
            self.local_stub.DisableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_device_info(self, device_id):
        """Call ``GetLogicalDevice``; fires with the RPC response."""
        req = ID(id=device_id)
        res = yield self._call_unary(self.local_stub.GetLogicalDevice, req)
        returnValue(res)

    @inlineCallbacks
    def update_flow_table(self, device_id, flow_mod):
        """
        Call ``UpdateLogicalDeviceFlowTable`` with ``flow_mod``; fires with
        the RPC response.
        """
        req = FlowTableUpdate(id=device_id, flow_mod=flow_mod)
        res = yield self._call_unary(
            self.local_stub.UpdateLogicalDeviceFlowTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_meter_mod_table(self, device_id, meter_mod):
        """
        Call ``UpdateLogicalDeviceMeterTable`` with ``meter_mod``; fires
        with the RPC response.
        """
        req = MeterModUpdate(id=device_id, meter_mod=meter_mod)
        res = yield self._call_unary(
            self.local_stub.UpdateLogicalDeviceMeterTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_group_table(self, device_id, group_mod):
        """
        Call ``UpdateLogicalDeviceFlowGroupTable`` with ``group_mod``;
        fires with the RPC response.
        """
        req = FlowGroupTableUpdate(id=device_id, group_mod=group_mod)
        res = yield self._call_unary(
            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req)
        returnValue(res)

    @inlineCallbacks
    def list_flows(self, device_id):
        """Call ``ListLogicalDeviceFlows``; fires with ``response.items``."""
        req = ID(id=device_id)
        res = yield self._call_unary(
            self.local_stub.ListLogicalDeviceFlows, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_groups(self, device_id):
        """
        Call ``ListLogicalDeviceFlowGroups``; fires with
        ``response.items``.
        """
        req = ID(id=device_id)
        res = yield self._call_unary(
            self.local_stub.ListLogicalDeviceFlowGroups, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_meters(self, device_id):
        """Call ``ListLogicalDeviceMeters``; fires with ``response.items``."""
        req = ID(id=device_id)
        res = yield self._call_unary(
            self.local_stub.ListLogicalDeviceMeters, req)
        returnValue(res.items)

    def list_ports(self, device_id):
        """
        Same request as ``get_port_list`` (``ListLogicalDevicePorts``);
        kept as a separate name for existing callers.
        """
        return self.get_port_list(device_id)

    @inlineCallbacks
    def list_logical_devices(self):
        """Call ``ListLogicalDevices``; fires with ``response.items``."""
        res = yield self._call_unary(
            self.local_stub.ListLogicalDevices, empty_pb2.Empty())
        returnValue(res.items)

    @inlineCallbacks
    def list_reachable_logical_devices(self):
        """
        Call ``ListReachableLogicalDevices``; fires with
        ``response.items``.
        """
        res = yield self._call_unary(
            self.local_stub.ListReachableLogicalDevices, empty_pb2.Empty())
        returnValue(res.items)

    @inlineCallbacks
    def subscribe(self, subscriber):
        """Call ``Subscribe`` with ``subscriber``; fires with the response."""
        res = yield self._call_unary(self.local_stub.Subscribe, subscriber)
        returnValue(res)