blob: 8e7b3937de4094a898f8c87261a389429dfca385 [file] [log] [blame]
Zack Williams9731cdc2019-11-22 15:42:30 -07001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18The gRPC client layer for the OpenFlow agent
19"""
20from Queue import Queue, Empty
21import os
22import uuid
23
24from grpc import StatusCode
25from grpc._channel import _Rendezvous
26from structlog import get_logger
27from twisted.internet import reactor
28from twisted.internet import threads
29from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
30
31from voltha_protos.voltha_pb2_grpc import VolthaServiceStub
32from voltha_protos.voltha_pb2 import ID, FlowTableUpdate, MeterModUpdate, \
33 FlowGroupTableUpdate, PacketOut
34from voltha_protos.logical_device_pb2 import LogicalPortId
35from google.protobuf import empty_pb2
36from binascii import hexlify
37
38
39log = get_logger()
40
41
class GrpcClient(object):
    """
    gRPC client used by the OpenFlow agent.

    Wraps a ``VolthaServiceStub`` built on the supplied channel and exposes:

    * three long-lived streams (packet-out, packet-in, change-events), each
      serviced by a function handed to ``reactor.callInThread``;
    * two reactor-side loops that drain the packet-in and change-event
      queues and hand the items to the connection manager;
    * a set of unary RPC wrappers that run the blocking stub call through
      ``threads.deferToThread`` and return a Deferred.

    Every RPC carries two metadata pairs: the core binding
    ``(core_group_id_key, core_group_id)`` and a fresh transaction id.
    """

    def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key, core_transaction_key):
        """
        :param connection_manager: object receiving forwarded packet-ins and
            change events, and notified through ``grpc_client_terminated()``
            when this client stops.
        :param channel: gRPC channel the ``VolthaServiceStub`` is built on.
        :param grpc_timeout: timeout passed to every unary RPC.
        :param core_binding_key: metadata key used for the core binding.
        :param core_transaction_key: metadata key used for the transaction id.
        """
        self.connection_manager = connection_manager
        self.channel = channel
        self.grpc_timeout = grpc_timeout
        self.grpc_stub = VolthaServiceStub(channel)

        # This is the rw-core cluster to which an OFAgent is bound.
        # It is the affinity router that forwards all OFAgent
        # requests to a specific rw-core in this back-end cluster.
        # It starts empty and is filled in by subscribe().
        self.core_group_id = ''
        self.core_group_id_key = core_binding_key

        # Since the api-router binds an OFAgent to two RW Cores in a pair and
        # transparently forwards requests between the two, the onus is on
        # the OFAgent to fulfill part of the function of the api-server, which
        # involves sending a transaction key to both RW Cores for the latter
        # to figure out which Core will handle the transaction. To prevent
        # collision between the api-server ID and the one from OFAgent, the
        # OFAgent ID is prefixed with "O-".
        self.core_transaction_key = core_transaction_key

        self.stopped = False

        self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
        self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
        self.change_event_queue = DeferredQueue()  # queue change events

    def start(self):
        """
        Start the three gRPC streams and schedule the two forwarding loops.

        :return: this instance.
        """
        log.debug('starting', grpc_timeout=self.grpc_timeout,
                  core_binding_key=self.core_group_id_key,
                  core_transaction_key=self.core_transaction_key)
        self.start_packet_out_stream()
        self.start_packet_in_stream()
        self.start_change_event_in_stream()
        reactor.callLater(0, self.packet_in_forwarder_loop)
        reactor.callLater(0, self.change_event_processing_loop)
        log.info('started')
        return self

    def stop(self):
        """
        Mark the client as stopped and notify the connection manager.

        Calling this more than once is harmless: only the first call sets
        the flag and invokes ``grpc_client_terminated()``.
        """
        log.debug('stop requested')
        if self.stopped:
            log.debug('already stopped, no action taken')
            return
        log.debug('stopping')
        self.stopped = True
        self.connection_manager.grpc_client_terminated()
        log.info('stopped')

    def get_core_transaction_metadata(self):
        """
        Build the transaction metadata pair for one RPC.

        :return: ``(core_transaction_key, "O-" + <random uuid4 hex>)``.
        """
        return (self.core_transaction_key, "O-" + uuid.uuid4().hex)

    def _call_metadata(self):
        """Return the metadata tuple (core binding, transaction id) for one RPC."""
        return ((self.core_group_id_key, self.core_group_id),
                self.get_core_transaction_metadata(),)

    def _unary_call(self, rpc, request):
        """
        Run a blocking unary stub call in a worker thread.

        :param rpc: callable taken from ``self.grpc_stub``.
        :param request: request message passed to ``rpc``.
        :return: Deferred from ``threads.deferToThread`` for the call result.
        """
        return threads.deferToThread(
            rpc, request, timeout=self.grpc_timeout,
            metadata=self._call_metadata())

    def _handle_stream_error(self, error):
        """
        Log a stream failure and stop the client if the core is unavailable.

        NOTE(review): this is invoked from the functions passed to
        ``reactor.callInThread``, so ``stop()`` runs off the reactor thread;
        confirm ``grpc_client_terminated()`` is safe to call from there.
        """
        status = error.code()
        log.error('grpc-exception', status=status)
        if status == StatusCode.UNAVAILABLE:
            self.stop()

    def start_packet_out_stream(self):
        """Start the thread that streams queued PacketOut messages to the core."""

        def packet_generator():
            # Poll with a timeout so the generator can notice `stopped`
            # and end the stream when the queue is idle.
            while True:
                try:
                    packet = self.packet_out_queue.get(block=True, timeout=1.0)
                except Empty:
                    if self.stopped:
                        return
                else:
                    yield packet

        def stream_packets_out():
            generator = packet_generator()
            try:
                self.grpc_stub.StreamPacketsOut(
                    generator, metadata=self._call_metadata())
            except _Rendezvous as e:
                self._handle_stream_error(e)

        reactor.callInThread(stream_packets_out)

    def start_packet_in_stream(self):
        """Start the thread that receives PacketIn messages from the core."""

        def receive_packet_in_stream():
            iterator = self.grpc_stub.ReceivePacketsIn(
                empty_pb2.Empty(), metadata=self._call_metadata())
            try:
                for packet_in in iterator:
                    # DeferredQueue must be touched from the reactor thread.
                    reactor.callFromThread(self.packet_in_queue.put,
                                           packet_in)
                    log.debug('enqueued-packet-in',
                              packet_in=packet_in,
                              queue_len=len(self.packet_in_queue.pending),
                              packet=hexlify(packet_in.packet_in.data))
            except _Rendezvous as e:
                self._handle_stream_error(e)

        reactor.callInThread(receive_packet_in_stream)

    def start_change_event_in_stream(self):
        """Start the thread that receives change events from the core."""

        def receive_change_events():
            iterator = self.grpc_stub.ReceiveChangeEvents(
                empty_pb2.Empty(), metadata=self._call_metadata())
            try:
                for event in iterator:
                    # DeferredQueue must be touched from the reactor thread.
                    reactor.callFromThread(self.change_event_queue.put, event)
                    log.debug('enqueued-change-event',
                              change_event=event,
                              queue_len=len(self.change_event_queue.pending))
            except _Rendezvous as e:
                self._handle_stream_error(e)

        reactor.callInThread(receive_change_events)

    @inlineCallbacks
    def change_event_processing_loop(self):
        """
        Forward queued change events to the connection manager.

        A failure while handling one event is logged and the loop carries on.
        The ``stopped`` flag is only checked after an event has been handled.
        """
        while True:
            try:
                event = yield self.change_event_queue.get()
                device_id = event.id
                self.connection_manager.forward_change_event(device_id, event)
            except Exception as e:
                log.exception('failed-in-change-event-handler', e=e)
            if self.stopped:
                break

    @inlineCallbacks
    def packet_in_forwarder_loop(self):
        """
        Forward queued packet-ins to the connection manager.

        A failure while handling one packet is logged and the loop carries
        on, so a single bad packet does not silently end packet-in delivery.
        The ``stopped`` flag is only checked after a packet has been handled.
        """
        while True:
            try:
                packet_in = yield self.packet_in_queue.get()
                device_id = packet_in.id
                ofp_packet_in = packet_in.packet_in
                log.debug('grpc client to send packet-in',
                          packet=hexlify(ofp_packet_in.data))
                self.connection_manager.forward_packet_in(device_id,
                                                          ofp_packet_in)
            except Exception as e:
                log.exception('failed-in-packet-in-handler', e=e)
            if self.stopped:
                break

    def send_packet_out(self, device_id, packet_out):
        """
        Queue a packet-out for the packet-out stream.

        :param device_id: id placed in the ``PacketOut`` wrapper.
        :param packet_out: message whose ``data`` is logged and which is
            wrapped into a ``PacketOut``.
        """
        log.debug('grpc client to send packet-out',
                  packet=hexlify(packet_out.data))
        wrapped = PacketOut(id=device_id, packet_out=packet_out)
        self.packet_out_queue.put(wrapped)

    @inlineCallbacks
    def get_port(self, device_id, port_id):
        """Call GetLogicalDevicePort; the Deferred fires with the response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._unary_call(self.grpc_stub.GetLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_port_list(self, device_id):
        """Call ListLogicalDevicePorts; the Deferred fires with ``items``."""
        req = ID(id=device_id)
        res = yield self._unary_call(self.grpc_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def enable_port(self, device_id, port_id):
        """Call EnableLogicalDevicePort; the Deferred fires with the response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._unary_call(self.grpc_stub.EnableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def disable_port(self, device_id, port_id):
        """Call DisableLogicalDevicePort; the Deferred fires with the response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._unary_call(self.grpc_stub.DisableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_device_info(self, device_id):
        """Call GetLogicalDevice; the Deferred fires with the response."""
        req = ID(id=device_id)
        res = yield self._unary_call(self.grpc_stub.GetLogicalDevice, req)
        returnValue(res)

    @inlineCallbacks
    def update_flow_table(self, device_id, flow_mod):
        """Call UpdateLogicalDeviceFlowTable; the Deferred fires with the response."""
        req = FlowTableUpdate(id=device_id, flow_mod=flow_mod)
        res = yield self._unary_call(
            self.grpc_stub.UpdateLogicalDeviceFlowTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_meter_mod_table(self, device_id, meter_mod):
        """Call UpdateLogicalDeviceMeterTable; the Deferred fires with the response."""
        log.debug('In update_meter_mod_table grpc')
        req = MeterModUpdate(id=device_id, meter_mod=meter_mod)
        res = yield self._unary_call(
            self.grpc_stub.UpdateLogicalDeviceMeterTable, req)
        log.debug('update_meter_mod_table grpc done')
        returnValue(res)

    @inlineCallbacks
    def update_group_table(self, device_id, group_mod):
        """Call UpdateLogicalDeviceFlowGroupTable; the Deferred fires with the response."""
        req = FlowGroupTableUpdate(id=device_id, group_mod=group_mod)
        res = yield self._unary_call(
            self.grpc_stub.UpdateLogicalDeviceFlowGroupTable, req)
        returnValue(res)

    @inlineCallbacks
    def list_flows(self, device_id):
        """Call ListLogicalDeviceFlows; the Deferred fires with ``items``."""
        req = ID(id=device_id)
        res = yield self._unary_call(self.grpc_stub.ListLogicalDeviceFlows, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_groups(self, device_id):
        """Call ListLogicalDeviceFlowGroups; the Deferred fires with ``items``."""
        req = ID(id=device_id)
        res = yield self._unary_call(
            self.grpc_stub.ListLogicalDeviceFlowGroups, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_ports(self, device_id):
        """Call ListLogicalDevicePorts; the Deferred fires with ``items``."""
        req = ID(id=device_id)
        res = yield self._unary_call(self.grpc_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_logical_devices(self):
        """Call ListLogicalDevices; the Deferred fires with ``items``."""
        res = yield self._unary_call(
            self.grpc_stub.ListLogicalDevices, empty_pb2.Empty())
        returnValue(res.items)

    @inlineCallbacks
    def subscribe(self, subscriber):
        """
        Call Subscribe and record the core binding returned by the core.

        If the call's initial metadata contains ``core_group_id_key``, its
        value replaces ``self.core_group_id`` and is sent with later RPCs.

        :return: Deferred firing with the Subscribe response.
        """
        res, call = yield self._unary_call(
            self.grpc_stub.Subscribe.with_call, subscriber)
        returned_metadata = call.initial_metadata()

        # Update the core_group_id if present in the returned metadata
        if returned_metadata is None:
            log.debug('header-metadata-missing')
        else:
            log.debug('metadata-returned', metadata=returned_metadata)
            for pair in returned_metadata:
                if pair[0] == self.core_group_id_key:
                    self.core_group_id = pair[1]
                    log.debug('core-binding', core_group=self.core_group_id)
        returnValue(res)

    @inlineCallbacks
    def list_meters(self, device_id):
        """Call ListLogicalDeviceMeters; the Deferred fires with ``items``."""
        log.debug('list_meters')
        req = ID(id=device_id)
        res = yield self._unary_call(self.grpc_stub.ListLogicalDeviceMeters, req)
        log.debug('done stat query', resp=res)
        returnValue(res.items)