blob: 8e7b3937de4094a898f8c87261a389429dfca385 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
17"""
18The gRPC client layer for the OpenFlow agent
19"""
20from Queue import Queue, Empty
21import os
khenaidoo43aa6bd2019-05-29 13:35:13 -040022import uuid
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050023
24from grpc import StatusCode
25from grpc._channel import _Rendezvous
26from structlog import get_logger
27from twisted.internet import reactor
28from twisted.internet import threads
29from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
30
William Kurkianfc0dcda2019-04-08 16:54:36 -040031from voltha_protos.voltha_pb2_grpc import VolthaServiceStub
Manikkaraj kb1a10922019-07-29 12:10:34 -040032from voltha_protos.voltha_pb2 import ID, FlowTableUpdate, MeterModUpdate, \
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050033 FlowGroupTableUpdate, PacketOut
William Kurkianfc0dcda2019-04-08 16:54:36 -040034from voltha_protos.logical_device_pb2 import LogicalPortId
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050035from google.protobuf import empty_pb2
Matteo Scandolo360605d2019-11-05 18:29:17 -080036from binascii import hexlify
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050037
38
39log = get_logger()
40
41
class GrpcClient(object):
    """The gRPC client layer for the OpenFlow agent.

    Wraps a ``VolthaServiceStub`` built on the supplied channel and exposes:

    * three long-lived streams (packet-out, packet-in, change events), each
      serviced by a blocking function handed to ``reactor.callInThread``;
    * two forwarding loops that drain the inbound ``DeferredQueue``s and hand
      the items to the connection manager;
    * a set of unary RPC wrappers that run the blocking stub call through
      ``threads.deferToThread`` and therefore return Deferreds.

    Every RPC is sent with two metadata pairs: the core binding pair
    ``(core_group_id_key, core_group_id)`` and a fresh transaction pair from
    ``get_core_transaction_metadata()``.
    """

    def __init__(self, connection_manager, channel, grpc_timeout,
                 core_binding_key, core_transaction_key):
        """Store collaborators and create the stub and the message queues.

        :param connection_manager: object receiving forwarded packet-ins and
            change events, and notified via ``grpc_client_terminated()`` when
            this client stops.
        :param channel: gRPC channel used to build the ``VolthaServiceStub``.
        :param grpc_timeout: timeout passed to every unary RPC.
        :param core_binding_key: metadata key carrying the core group id.
        :param core_transaction_key: metadata key carrying the per-call
            transaction id.
        """
        self.connection_manager = connection_manager
        self.channel = channel
        self.grpc_timeout = grpc_timeout
        self.grpc_stub = VolthaServiceStub(channel)

        # This is the rw-core cluster to which an OFAgent is bound.
        # It is the affinity router that forwards all OFAgent
        # requests to a specific rw-core in this back-end cluster.
        self.core_group_id = ''
        self.core_group_id_key = core_binding_key

        # Since the api-router binds an OFAgent to two RW Cores in a pair and
        # transparently forwards requests between the two, the onus is on
        # the OFAgent to fulfill part of the function of the api-server, which
        # involves sending a transaction key to both RW Cores for the latter
        # to figure out which Core will handle the transaction. To prevent
        # collision between the api-server ID and the one from OFAgent, the
        # OFAgent ID is prefixed with "O-".
        self.core_transaction_key = core_transaction_key

        self.stopped = False

        self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
        self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
        self.change_event_queue = DeferredQueue()  # queue change events

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _rpc_metadata(self):
        """Return the metadata tuple attached to every RPC."""
        return (
            (self.core_group_id_key, self.core_group_id),
            self.get_core_transaction_metadata(),
        )

    def _call_in_thread(self, rpc_method, request):
        """Run a blocking unary stub call in a thread; return its Deferred."""
        return threads.deferToThread(
            rpc_method, request,
            timeout=self.grpc_timeout,
            metadata=self._rpc_metadata())

    def _on_stream_error(self, error):
        """Log a streaming RPC failure and stop the client if UNAVAILABLE."""
        status = error.code()
        log.error('grpc-exception', status=status)
        if status == StatusCode.UNAVAILABLE:
            self.stop()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start(self):
        """Start the three streams and schedule both forwarding loops.

        :return: this instance, to allow call chaining.
        """
        log.debug('starting', grpc_timeout=self.grpc_timeout,
                  core_binding_key=self.core_group_id_key,
                  core_transaction_key=self.core_transaction_key)
        self.start_packet_out_stream()
        self.start_packet_in_stream()
        self.start_change_event_in_stream()
        reactor.callLater(0, self.packet_in_forwarder_loop)
        reactor.callLater(0, self.change_event_processing_loop)
        log.info('started')
        return self

    def stop(self):
        """Mark the client stopped and notify the connection manager.

        Calling it again once ``stopped`` is set is a no-op.
        NOTE(review): the stream worker functions call this from their own
        threads and the flag check is not locked - confirm that
        ``grpc_client_terminated()`` tolerates that.
        """
        log.debug('stop requested')
        if self.stopped:
            log.debug('already stopped, no action taken')
            return
        log.debug('stopping')
        self.stopped = True
        self.connection_manager.grpc_client_terminated()
        log.info('stopped')

    def get_core_transaction_metadata(self):
        """Return ``(core_transaction_key, "O-" + <random uuid4 hex>)``."""
        return (self.core_transaction_key, "O-" + uuid.uuid4().hex)

    # ------------------------------------------------------------------
    # Streams
    # ------------------------------------------------------------------

    def start_packet_out_stream(self):
        """Feed queued PacketOut messages to the StreamPacketsOut RPC."""

        def packet_generator():
            while True:
                try:
                    # Wake up every second so a stop request is noticed even
                    # when no packet is waiting.
                    packet = self.packet_out_queue.get(block=True,
                                                       timeout=1.0)
                except Empty:
                    if self.stopped:
                        return
                else:
                    yield packet

        def stream_packets_out():
            generator = packet_generator()
            try:
                self.grpc_stub.StreamPacketsOut(
                    generator, metadata=self._rpc_metadata())
            except _Rendezvous as e:
                self._on_stream_error(e)

        reactor.callInThread(stream_packets_out)

    def start_packet_in_stream(self):
        """Consume the ReceivePacketsIn stream into ``packet_in_queue``."""

        def receive_packet_in_stream():
            streaming_rpc_method = self.grpc_stub.ReceivePacketsIn
            iterator = streaming_rpc_method(empty_pb2.Empty(),
                                            metadata=self._rpc_metadata())
            try:
                for packet_in in iterator:
                    # DeferredQueue must be touched from the reactor thread.
                    reactor.callFromThread(self.packet_in_queue.put,
                                           packet_in)
                    log.debug('enqueued-packet-in',
                              packet_in=packet_in,
                              queue_len=len(self.packet_in_queue.pending),
                              packet=hexlify(packet_in.packet_in.data))
            except _Rendezvous as e:
                self._on_stream_error(e)

        reactor.callInThread(receive_packet_in_stream)

    def start_change_event_in_stream(self):
        """Consume the ReceiveChangeEvents stream into ``change_event_queue``."""

        def receive_change_events():
            streaming_rpc_method = self.grpc_stub.ReceiveChangeEvents
            iterator = streaming_rpc_method(empty_pb2.Empty(),
                                            metadata=self._rpc_metadata())
            try:
                for event in iterator:
                    # DeferredQueue must be touched from the reactor thread.
                    reactor.callFromThread(self.change_event_queue.put, event)
                    log.debug('enqueued-change-event',
                              change_event=event,
                              queue_len=len(self.change_event_queue.pending))
            except _Rendezvous as e:
                self._on_stream_error(e)

        reactor.callInThread(receive_change_events)

    # ------------------------------------------------------------------
    # Forwarding loops
    # ------------------------------------------------------------------

    @inlineCallbacks
    def change_event_processing_loop(self):
        """Forward queued change events to the connection manager.

        A failure while handling one event is logged and the loop carries on;
        the loop ends after the first event handled once ``stopped`` is set.
        """
        while True:
            try:
                event = yield self.change_event_queue.get()
                device_id = event.id
                self.connection_manager.forward_change_event(device_id, event)
            except Exception as e:
                log.exception('failed-in-change-event-handler', e=e)
            if self.stopped:
                break

    @inlineCallbacks
    def packet_in_forwarder_loop(self):
        """Forward queued packet-ins to the connection manager.

        A failure while handling one packet is logged and the loop carries on,
        so a single bad packet cannot end packet-in forwarding; the loop ends
        after the first packet handled once ``stopped`` is set.
        """
        while True:
            try:
                packet_in = yield self.packet_in_queue.get()
                device_id = packet_in.id
                ofp_packet_in = packet_in.packet_in
                log.debug('grpc client to send packet-in',
                          packet=hexlify(ofp_packet_in.data))
                self.connection_manager.forward_packet_in(device_id,
                                                          ofp_packet_in)
            except Exception as e:
                log.exception('failed-in-packet-in-handler', e=e)
            if self.stopped:
                break

    def send_packet_out(self, device_id, packet_out):
        """Wrap ``packet_out`` in a PacketOut for ``device_id`` and queue it."""
        log.debug('grpc client to send packet-out',
                  packet=hexlify(packet_out.data))
        packet_out = PacketOut(id=device_id, packet_out=packet_out)
        self.packet_out_queue.put(packet_out)

    # ------------------------------------------------------------------
    # Unary RPC wrappers (each returns a Deferred)
    # ------------------------------------------------------------------

    @inlineCallbacks
    def get_port(self, device_id, port_id):
        """Call GetLogicalDevicePort; fires with the RPC response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._call_in_thread(
            self.grpc_stub.GetLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_port_list(self, device_id):
        """Call ListLogicalDevicePorts; fires with the response ``items``."""
        req = ID(id=device_id)
        res = yield self._call_in_thread(
            self.grpc_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def enable_port(self, device_id, port_id):
        """Call EnableLogicalDevicePort; fires with the RPC response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._call_in_thread(
            self.grpc_stub.EnableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def disable_port(self, device_id, port_id):
        """Call DisableLogicalDevicePort; fires with the RPC response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._call_in_thread(
            self.grpc_stub.DisableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_device_info(self, device_id):
        """Call GetLogicalDevice; fires with the RPC response."""
        req = ID(id=device_id)
        res = yield self._call_in_thread(
            self.grpc_stub.GetLogicalDevice, req)
        returnValue(res)

    @inlineCallbacks
    def update_flow_table(self, device_id, flow_mod):
        """Call UpdateLogicalDeviceFlowTable; fires with the RPC response."""
        req = FlowTableUpdate(id=device_id, flow_mod=flow_mod)
        res = yield self._call_in_thread(
            self.grpc_stub.UpdateLogicalDeviceFlowTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_meter_mod_table(self, device_id, meter_mod):
        """Call UpdateLogicalDeviceMeterTable; fires with the RPC response."""
        log.debug('In update_meter_mod_table grpc')
        req = MeterModUpdate(id=device_id, meter_mod=meter_mod)
        res = yield self._call_in_thread(
            self.grpc_stub.UpdateLogicalDeviceMeterTable, req)
        log.debug('update_meter_mod_table grpc done')
        returnValue(res)

    @inlineCallbacks
    def update_group_table(self, device_id, group_mod):
        """Call UpdateLogicalDeviceFlowGroupTable; fires with the response."""
        req = FlowGroupTableUpdate(id=device_id, group_mod=group_mod)
        res = yield self._call_in_thread(
            self.grpc_stub.UpdateLogicalDeviceFlowGroupTable, req)
        returnValue(res)

    @inlineCallbacks
    def list_flows(self, device_id):
        """Call ListLogicalDeviceFlows; fires with the response ``items``."""
        req = ID(id=device_id)
        res = yield self._call_in_thread(
            self.grpc_stub.ListLogicalDeviceFlows, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_groups(self, device_id):
        """Call ListLogicalDeviceFlowGroups; fires with the response ``items``."""
        req = ID(id=device_id)
        res = yield self._call_in_thread(
            self.grpc_stub.ListLogicalDeviceFlowGroups, req)
        returnValue(res.items)

    def list_ports(self, device_id):
        """Alias of ``get_port_list``: same RPC, same result."""
        return self.get_port_list(device_id)

    @inlineCallbacks
    def list_logical_devices(self):
        """Call ListLogicalDevices; fires with the response ``items``."""
        res = yield self._call_in_thread(
            self.grpc_stub.ListLogicalDevices, empty_pb2.Empty())
        returnValue(res.items)

    @inlineCallbacks
    def subscribe(self, subscriber):
        """Call Subscribe and adopt the core group id returned in metadata.

        Uses ``with_call`` so the call object, and through it the initial
        metadata sent back by the server, is available. When that metadata
        contains ``core_group_id_key``, its value replaces
        ``self.core_group_id`` and is sent on all later RPCs.

        :return: Deferred firing with the Subscribe response.
        """
        res, call = yield self._call_in_thread(
            self.grpc_stub.Subscribe.with_call, subscriber)
        returned_metadata = call.initial_metadata()

        # Update the core_group_id if present in the returned metadata
        if returned_metadata is None:
            log.debug('header-metadata-missing')
        else:
            log.debug('metadata-returned', metadata=returned_metadata)
            for pair in returned_metadata:
                if pair[0] == self.core_group_id_key:
                    self.core_group_id = pair[1]
                    log.debug('core-binding', core_group=self.core_group_id)
        returnValue(res)

    @inlineCallbacks
    def list_meters(self, device_id):
        """Call ListLogicalDeviceMeters; fires with the response ``items``."""
        log.debug('list_meters')
        req = ID(id=device_id)
        res = yield self._call_in_thread(
            self.grpc_stub.ListLogicalDeviceMeters, req)
        log.debug('done stat query', resp=res)
        returnValue(res.items)