blob: c8b2580ef0ff336f180cfa96fda5e24dd0e2f8c8 [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18The gRPC client layer for the OpenFlow agent
19"""
20from Queue import Queue, Empty
21import os
khenaidoo43aa6bd2019-05-29 13:35:13 -040022import uuid
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050023
24from grpc import StatusCode
25from grpc._channel import _Rendezvous
26from structlog import get_logger
27from twisted.internet import reactor
28from twisted.internet import threads
29from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
30
William Kurkianfc0dcda2019-04-08 16:54:36 -040031from voltha_protos.voltha_pb2_grpc import VolthaServiceStub
Manikkaraj kb1a10922019-07-29 12:10:34 -040032from voltha_protos.voltha_pb2 import ID, FlowTableUpdate, MeterModUpdate, \
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050033 FlowGroupTableUpdate, PacketOut
William Kurkianfc0dcda2019-04-08 16:54:36 -040034from voltha_protos.logical_device_pb2 import LogicalPortId
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050035from google.protobuf import empty_pb2
Matteo Scandolo360605d2019-11-05 18:29:17 -080036from binascii import hexlify
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050037
38
39log = get_logger()
40
41
42class GrpcClient(object):
43
khenaidoo43aa6bd2019-05-29 13:35:13 -040044 def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key, core_transaction_key):
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050045
46 self.connection_manager = connection_manager
47 self.channel = channel
48 self.grpc_timeout = grpc_timeout
khenaidoo43aa6bd2019-05-29 13:35:13 -040049 self.grpc_stub = VolthaServiceStub(channel)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050050
Richard Jankowski46464e92019-03-05 11:53:55 -050051 # This is the rw-core cluster to which an OFAgent is bound.
Richard Jankowski2755adf2019-01-17 17:16:48 -050052 # It is the affinity router that forwards all OFAgent
Richard Jankowski46464e92019-03-05 11:53:55 -050053 # requests to a specific rw-core in this back-end cluster.
Richard Jankowski2755adf2019-01-17 17:16:48 -050054 self.core_group_id = ''
Richard Jankowski46464e92019-03-05 11:53:55 -050055 self.core_group_id_key = core_binding_key
Richard Jankowski2755adf2019-01-17 17:16:48 -050056
khenaidoo43aa6bd2019-05-29 13:35:13 -040057 # Since the api-router binds an OFAgent to two RW Cores in a pair and
58 # transparently forward requests between the two then the onus is on
59 # the OFAgent to fulfill part of the function of the api-server which
60 # involves sending a transaction key to both RW Cores for the latter
61 # to figure out which Core will handle the transaction. To prevent
62 # collision between the api-server ID and the one from OFAgent then the
63 # OFAgent ID will be prefixed with "O-".
64 self.core_transaction_key = core_transaction_key
65
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050066 self.stopped = False
67
68 self.packet_out_queue = Queue() # queue to send out PacketOut msgs
69 self.packet_in_queue = DeferredQueue() # queue to receive PacketIn
70 self.change_event_queue = DeferredQueue() # queue change events
71
72 def start(self):
Richard Jankowski46464e92019-03-05 11:53:55 -050073 log.debug('starting', grpc_timeout=self.grpc_timeout,
khenaidoo43aa6bd2019-05-29 13:35:13 -040074 core_binding_key=self.core_group_id_key,
75 core_transaction_key=self.core_transaction_key)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050076 self.start_packet_out_stream()
77 self.start_packet_in_stream()
78 self.start_change_event_in_stream()
79 reactor.callLater(0, self.packet_in_forwarder_loop)
80 reactor.callLater(0, self.change_event_processing_loop)
81 log.info('started')
82 return self
83
84 def stop(self):
85 log.debug('stopping')
86 self.stopped = True
87 log.info('stopped')
88
khenaidoo43aa6bd2019-05-29 13:35:13 -040089 def get_core_transaction_metadata(self):
90 return (self.core_transaction_key, "O-" + uuid.uuid4().hex)
91
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050092 def start_packet_out_stream(self):
93
94 def packet_generator():
95 while 1:
96 try:
97 packet = self.packet_out_queue.get(block=True, timeout=1.0)
98 except Empty:
99 if self.stopped:
100 return
101 else:
102 yield packet
103
104 def stream_packets_out():
105 generator = packet_generator()
106 try:
khenaidoo43aa6bd2019-05-29 13:35:13 -0400107 self.grpc_stub.StreamPacketsOut(generator,
108 metadata=((self.core_group_id_key, self.core_group_id),
109 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500110 except _Rendezvous, e:
111 log.error('grpc-exception', status=e.code())
112 if e.code() == StatusCode.UNAVAILABLE:
113 os.system("kill -15 {}".format(os.getpid()))
114
115 reactor.callInThread(stream_packets_out)
116
117 def start_packet_in_stream(self):
118
119 def receive_packet_in_stream():
khenaidoo43aa6bd2019-05-29 13:35:13 -0400120 streaming_rpc_method = self.grpc_stub.ReceivePacketsIn
Richard Jankowskidec93172019-02-12 14:59:28 -0500121 iterator = streaming_rpc_method(empty_pb2.Empty(),
khenaidoo43aa6bd2019-05-29 13:35:13 -0400122 metadata=((self.core_group_id_key, self.core_group_id),
123 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500124 try:
125 for packet_in in iterator:
126 reactor.callFromThread(self.packet_in_queue.put,
127 packet_in)
Richard Jankowskidec93172019-02-12 14:59:28 -0500128 log.debug('enqueued-packet-in',
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500129 packet_in=packet_in,
Matteo Scandolo360605d2019-11-05 18:29:17 -0800130 queue_len=len(self.packet_in_queue.pending),
131 packet=hexlify(packet_in.packet_in.data))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500132 except _Rendezvous, e:
133 log.error('grpc-exception', status=e.code())
134 if e.code() == StatusCode.UNAVAILABLE:
135 os.system("kill -15 {}".format(os.getpid()))
136
137 reactor.callInThread(receive_packet_in_stream)
138
139 def start_change_event_in_stream(self):
140
141 def receive_change_events():
khenaidoo43aa6bd2019-05-29 13:35:13 -0400142 streaming_rpc_method = self.grpc_stub.ReceiveChangeEvents
Richard Jankowskidec93172019-02-12 14:59:28 -0500143 iterator = streaming_rpc_method(empty_pb2.Empty(),
khenaidoo43aa6bd2019-05-29 13:35:13 -0400144 metadata=((self.core_group_id_key, self.core_group_id),
145 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500146 try:
147 for event in iterator:
148 reactor.callFromThread(self.change_event_queue.put, event)
Richard Jankowskidec93172019-02-12 14:59:28 -0500149 log.debug('enqueued-change-event',
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500150 change_event=event,
151 queue_len=len(self.change_event_queue.pending))
152 except _Rendezvous, e:
153 log.error('grpc-exception', status=e.code())
154 if e.code() == StatusCode.UNAVAILABLE:
155 os.system("kill -15 {}".format(os.getpid()))
156
157 reactor.callInThread(receive_change_events)
158
159 @inlineCallbacks
160 def change_event_processing_loop(self):
161 while True:
162 try:
163 event = yield self.change_event_queue.get()
164 device_id = event.id
165 self.connection_manager.forward_change_event(device_id, event)
166 except Exception, e:
167 log.exception('failed-in-packet-in-handler', e=e)
168 if self.stopped:
169 break
170
171 @inlineCallbacks
172 def packet_in_forwarder_loop(self):
173 while True:
174 packet_in = yield self.packet_in_queue.get()
175 device_id = packet_in.id
176 ofp_packet_in = packet_in.packet_in
Matteo Scandolo360605d2019-11-05 18:29:17 -0800177 log.debug('grpc client to send packet-in', packet=hexlify(packet_in.packet_in.data))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500178 self.connection_manager.forward_packet_in(device_id, ofp_packet_in)
179 if self.stopped:
180 break
181
182 def send_packet_out(self, device_id, packet_out):
Matteo Scandolo360605d2019-11-05 18:29:17 -0800183 log.debug('grpc client to send packet-out', packet=hexlify(packet_out.data))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500184 packet_out = PacketOut(id=device_id, packet_out=packet_out)
185 self.packet_out_queue.put(packet_out)
186
187 @inlineCallbacks
188 def get_port(self, device_id, port_id):
189 req = LogicalPortId(id=device_id, port_id=port_id)
190 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400191 self.grpc_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
192 metadata=((self.core_group_id_key, self.core_group_id),
193 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500194 returnValue(res)
195
196 @inlineCallbacks
197 def get_port_list(self, device_id):
198 req = ID(id=device_id)
199 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400200 self.grpc_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
201 metadata=((self.core_group_id_key, self.core_group_id),
202 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500203 returnValue(res.items)
204
205 @inlineCallbacks
206 def enable_port(self, device_id, port_id):
207 req = LogicalPortId(
208 id=device_id,
209 port_id=port_id
210 )
211 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400212 self.grpc_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
213 metadata=((self.core_group_id_key, self.core_group_id),
214 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500215 returnValue(res)
216
217 @inlineCallbacks
218 def disable_port(self, device_id, port_id):
219 req = LogicalPortId(
220 id=device_id,
221 port_id=port_id
222 )
223 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400224 self.grpc_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
225 metadata=((self.core_group_id_key, self.core_group_id),
226 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500227 returnValue(res)
228
229 @inlineCallbacks
230 def get_device_info(self, device_id):
231 req = ID(id=device_id)
232 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400233 self.grpc_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
234 metadata=((self.core_group_id_key, self.core_group_id),
235 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500236 returnValue(res)
237
238 @inlineCallbacks
239 def update_flow_table(self, device_id, flow_mod):
240 req = FlowTableUpdate(
241 id=device_id,
242 flow_mod=flow_mod
243 )
244 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400245 self.grpc_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
246 metadata=((self.core_group_id_key, self.core_group_id),
247 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500248 returnValue(res)
249
250 @inlineCallbacks
Manikkaraj kb1a10922019-07-29 12:10:34 -0400251 def update_meter_mod_table(self, device_id, meter_mod):
252 log.debug('In update_meter_mod_table grpc')
253 req = MeterModUpdate(
254 id=device_id,
255 meter_mod=meter_mod
256 )
257 res = yield threads.deferToThread(
258 self.grpc_stub.UpdateLogicalDeviceMeterTable, req, timeout=self.grpc_timeout,
259 metadata=((self.core_group_id_key, self.core_group_id),
260 self.get_core_transaction_metadata(),))
261 log.debug('update_meter_mod_table grpc done')
262 returnValue(res)
263
264 @inlineCallbacks
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500265 def update_group_table(self, device_id, group_mod):
266 req = FlowGroupTableUpdate(
267 id=device_id,
268 group_mod=group_mod
269 )
270 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400271 self.grpc_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
272 metadata=((self.core_group_id_key, self.core_group_id),
273 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500274 returnValue(res)
275
276 @inlineCallbacks
277 def list_flows(self, device_id):
278 req = ID(id=device_id)
279 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400280 self.grpc_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
281 metadata=((self.core_group_id_key, self.core_group_id),
282 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500283 returnValue(res.items)
284
285 @inlineCallbacks
286 def list_groups(self, device_id):
287 req = ID(id=device_id)
288 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400289 self.grpc_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
290 metadata=((self.core_group_id_key, self.core_group_id),
291 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500292 returnValue(res.items)
293
294 @inlineCallbacks
295 def list_ports(self, device_id):
296 req = ID(id=device_id)
297 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400298 self.grpc_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
299 metadata=((self.core_group_id_key, self.core_group_id),
300 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500301 returnValue(res.items)
302
303 @inlineCallbacks
304 def list_logical_devices(self):
305 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400306 self.grpc_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
307 metadata=((self.core_group_id_key, self.core_group_id),
308 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500309 returnValue(res.items)
310
311 @inlineCallbacks
312 def subscribe(self, subscriber):
Richard Jankowski2755adf2019-01-17 17:16:48 -0500313 res, call = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400314 self.grpc_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
315 metadata=((self.core_group_id_key, self.core_group_id),
316 self.get_core_transaction_metadata(),))
Richard Jankowski2755adf2019-01-17 17:16:48 -0500317 returned_metadata = call.initial_metadata()
318
319 # Update the core_group_id if present in the returned metadata
320 if returned_metadata is None:
321 log.debug('header-metadata-missing')
322 else:
323 log.debug('metadata-returned', metadata=returned_metadata)
324 for pair in returned_metadata:
Richard Jankowski46464e92019-03-05 11:53:55 -0500325 if pair[0] == self.core_group_id_key:
Richard Jankowski2755adf2019-01-17 17:16:48 -0500326 self.core_group_id = pair[1]
Richard Jankowski46464e92019-03-05 11:53:55 -0500327 log.debug('core-binding', core_group=self.core_group_id)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500328 returnValue(res)
Manikkaraj kb1a10922019-07-29 12:10:34 -0400329
330 @inlineCallbacks
331 def list_meters(self, device_id):
332 log.debug('list_meters')
333 req = ID(id=device_id)
334 res = yield threads.deferToThread(
335 self.grpc_stub.ListLogicalDeviceMeters, req, timeout=self.grpc_timeout,
336 metadata=((self.core_group_id_key, self.core_group_id),
337 self.get_core_transaction_metadata(),))
338 log.debug('done stat query', resp=res)
339 returnValue(res.items)