blob: 508ce5cb0b28d01b4c0e4cf2d93786c75a987e11 [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18The gRPC client layer for the OpenFlow agent
19"""
20from Queue import Queue, Empty
21import os
khenaidoo43aa6bd2019-05-29 13:35:13 -040022import uuid
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050023
24from grpc import StatusCode
25from grpc._channel import _Rendezvous
26from structlog import get_logger
27from twisted.internet import reactor
28from twisted.internet import threads
29from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
30
William Kurkianfc0dcda2019-04-08 16:54:36 -040031from voltha_protos.voltha_pb2_grpc import VolthaServiceStub
32from voltha_protos.voltha_pb2 import ID, FlowTableUpdate, \
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050033 FlowGroupTableUpdate, PacketOut
William Kurkianfc0dcda2019-04-08 16:54:36 -040034from voltha_protos.logical_device_pb2 import LogicalPortId
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050035from google.protobuf import empty_pb2
36
37
38log = get_logger()
39
40
41class GrpcClient(object):
42
khenaidoo43aa6bd2019-05-29 13:35:13 -040043 def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key, core_transaction_key):
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050044
45 self.connection_manager = connection_manager
46 self.channel = channel
47 self.grpc_timeout = grpc_timeout
khenaidoo43aa6bd2019-05-29 13:35:13 -040048 self.grpc_stub = VolthaServiceStub(channel)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050049
Richard Jankowski46464e92019-03-05 11:53:55 -050050 # This is the rw-core cluster to which an OFAgent is bound.
Richard Jankowski2755adf2019-01-17 17:16:48 -050051 # It is the affinity router that forwards all OFAgent
Richard Jankowski46464e92019-03-05 11:53:55 -050052 # requests to a specific rw-core in this back-end cluster.
Richard Jankowski2755adf2019-01-17 17:16:48 -050053 self.core_group_id = ''
Richard Jankowski46464e92019-03-05 11:53:55 -050054 self.core_group_id_key = core_binding_key
Richard Jankowski2755adf2019-01-17 17:16:48 -050055
khenaidoo43aa6bd2019-05-29 13:35:13 -040056 # Since the api-router binds an OFAgent to two RW Cores in a pair and
57 # transparently forward requests between the two then the onus is on
58 # the OFAgent to fulfill part of the function of the api-server which
59 # involves sending a transaction key to both RW Cores for the latter
60 # to figure out which Core will handle the transaction. To prevent
61 # collision between the api-server ID and the one from OFAgent then the
62 # OFAgent ID will be prefixed with "O-".
63 self.core_transaction_key = core_transaction_key
64
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050065 self.stopped = False
66
67 self.packet_out_queue = Queue() # queue to send out PacketOut msgs
68 self.packet_in_queue = DeferredQueue() # queue to receive PacketIn
69 self.change_event_queue = DeferredQueue() # queue change events
70
71 def start(self):
Richard Jankowski46464e92019-03-05 11:53:55 -050072 log.debug('starting', grpc_timeout=self.grpc_timeout,
khenaidoo43aa6bd2019-05-29 13:35:13 -040073 core_binding_key=self.core_group_id_key,
74 core_transaction_key=self.core_transaction_key)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050075 self.start_packet_out_stream()
76 self.start_packet_in_stream()
77 self.start_change_event_in_stream()
78 reactor.callLater(0, self.packet_in_forwarder_loop)
79 reactor.callLater(0, self.change_event_processing_loop)
80 log.info('started')
81 return self
82
83 def stop(self):
84 log.debug('stopping')
85 self.stopped = True
86 log.info('stopped')
87
khenaidoo43aa6bd2019-05-29 13:35:13 -040088 def get_core_transaction_metadata(self):
89 return (self.core_transaction_key, "O-" + uuid.uuid4().hex)
90
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050091 def start_packet_out_stream(self):
92
93 def packet_generator():
94 while 1:
95 try:
96 packet = self.packet_out_queue.get(block=True, timeout=1.0)
97 except Empty:
98 if self.stopped:
99 return
100 else:
101 yield packet
102
103 def stream_packets_out():
104 generator = packet_generator()
105 try:
khenaidoo43aa6bd2019-05-29 13:35:13 -0400106 self.grpc_stub.StreamPacketsOut(generator,
107 metadata=((self.core_group_id_key, self.core_group_id),
108 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500109 except _Rendezvous, e:
110 log.error('grpc-exception', status=e.code())
111 if e.code() == StatusCode.UNAVAILABLE:
112 os.system("kill -15 {}".format(os.getpid()))
113
114 reactor.callInThread(stream_packets_out)
115
116 def start_packet_in_stream(self):
117
118 def receive_packet_in_stream():
khenaidoo43aa6bd2019-05-29 13:35:13 -0400119 streaming_rpc_method = self.grpc_stub.ReceivePacketsIn
Richard Jankowskidec93172019-02-12 14:59:28 -0500120 iterator = streaming_rpc_method(empty_pb2.Empty(),
khenaidoo43aa6bd2019-05-29 13:35:13 -0400121 metadata=((self.core_group_id_key, self.core_group_id),
122 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500123 try:
124 for packet_in in iterator:
125 reactor.callFromThread(self.packet_in_queue.put,
126 packet_in)
Richard Jankowskidec93172019-02-12 14:59:28 -0500127 log.debug('enqueued-packet-in',
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500128 packet_in=packet_in,
129 queue_len=len(self.packet_in_queue.pending))
130 except _Rendezvous, e:
131 log.error('grpc-exception', status=e.code())
132 if e.code() == StatusCode.UNAVAILABLE:
133 os.system("kill -15 {}".format(os.getpid()))
134
135 reactor.callInThread(receive_packet_in_stream)
136
137 def start_change_event_in_stream(self):
138
139 def receive_change_events():
khenaidoo43aa6bd2019-05-29 13:35:13 -0400140 streaming_rpc_method = self.grpc_stub.ReceiveChangeEvents
Richard Jankowskidec93172019-02-12 14:59:28 -0500141 iterator = streaming_rpc_method(empty_pb2.Empty(),
khenaidoo43aa6bd2019-05-29 13:35:13 -0400142 metadata=((self.core_group_id_key, self.core_group_id),
143 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500144 try:
145 for event in iterator:
146 reactor.callFromThread(self.change_event_queue.put, event)
Richard Jankowskidec93172019-02-12 14:59:28 -0500147 log.debug('enqueued-change-event',
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500148 change_event=event,
149 queue_len=len(self.change_event_queue.pending))
150 except _Rendezvous, e:
151 log.error('grpc-exception', status=e.code())
152 if e.code() == StatusCode.UNAVAILABLE:
153 os.system("kill -15 {}".format(os.getpid()))
154
155 reactor.callInThread(receive_change_events)
156
157 @inlineCallbacks
158 def change_event_processing_loop(self):
159 while True:
160 try:
161 event = yield self.change_event_queue.get()
162 device_id = event.id
163 self.connection_manager.forward_change_event(device_id, event)
164 except Exception, e:
165 log.exception('failed-in-packet-in-handler', e=e)
166 if self.stopped:
167 break
168
169 @inlineCallbacks
170 def packet_in_forwarder_loop(self):
171 while True:
172 packet_in = yield self.packet_in_queue.get()
173 device_id = packet_in.id
174 ofp_packet_in = packet_in.packet_in
175 self.connection_manager.forward_packet_in(device_id, ofp_packet_in)
176 if self.stopped:
177 break
178
179 def send_packet_out(self, device_id, packet_out):
180 packet_out = PacketOut(id=device_id, packet_out=packet_out)
181 self.packet_out_queue.put(packet_out)
182
183 @inlineCallbacks
184 def get_port(self, device_id, port_id):
185 req = LogicalPortId(id=device_id, port_id=port_id)
186 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400187 self.grpc_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
188 metadata=((self.core_group_id_key, self.core_group_id),
189 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500190 returnValue(res)
191
192 @inlineCallbacks
193 def get_port_list(self, device_id):
194 req = ID(id=device_id)
195 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400196 self.grpc_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
197 metadata=((self.core_group_id_key, self.core_group_id),
198 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500199 returnValue(res.items)
200
201 @inlineCallbacks
202 def enable_port(self, device_id, port_id):
203 req = LogicalPortId(
204 id=device_id,
205 port_id=port_id
206 )
207 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400208 self.grpc_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
209 metadata=((self.core_group_id_key, self.core_group_id),
210 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500211 returnValue(res)
212
213 @inlineCallbacks
214 def disable_port(self, device_id, port_id):
215 req = LogicalPortId(
216 id=device_id,
217 port_id=port_id
218 )
219 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400220 self.grpc_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
221 metadata=((self.core_group_id_key, self.core_group_id),
222 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500223 returnValue(res)
224
225 @inlineCallbacks
226 def get_device_info(self, device_id):
227 req = ID(id=device_id)
228 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400229 self.grpc_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
230 metadata=((self.core_group_id_key, self.core_group_id),
231 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500232 returnValue(res)
233
234 @inlineCallbacks
235 def update_flow_table(self, device_id, flow_mod):
236 req = FlowTableUpdate(
237 id=device_id,
238 flow_mod=flow_mod
239 )
240 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400241 self.grpc_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
242 metadata=((self.core_group_id_key, self.core_group_id),
243 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500244 returnValue(res)
245
246 @inlineCallbacks
247 def update_group_table(self, device_id, group_mod):
248 req = FlowGroupTableUpdate(
249 id=device_id,
250 group_mod=group_mod
251 )
252 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400253 self.grpc_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
254 metadata=((self.core_group_id_key, self.core_group_id),
255 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500256 returnValue(res)
257
258 @inlineCallbacks
259 def list_flows(self, device_id):
260 req = ID(id=device_id)
261 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400262 self.grpc_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
263 metadata=((self.core_group_id_key, self.core_group_id),
264 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500265 returnValue(res.items)
266
267 @inlineCallbacks
268 def list_groups(self, device_id):
269 req = ID(id=device_id)
270 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400271 self.grpc_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
272 metadata=((self.core_group_id_key, self.core_group_id),
273 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500274 returnValue(res.items)
275
276 @inlineCallbacks
277 def list_ports(self, device_id):
278 req = ID(id=device_id)
279 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400280 self.grpc_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
281 metadata=((self.core_group_id_key, self.core_group_id),
282 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500283 returnValue(res.items)
284
285 @inlineCallbacks
286 def list_logical_devices(self):
287 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400288 self.grpc_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
289 metadata=((self.core_group_id_key, self.core_group_id),
290 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500291 returnValue(res.items)
292
293 @inlineCallbacks
294 def subscribe(self, subscriber):
Richard Jankowski2755adf2019-01-17 17:16:48 -0500295 res, call = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400296 self.grpc_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
297 metadata=((self.core_group_id_key, self.core_group_id),
298 self.get_core_transaction_metadata(),))
Richard Jankowski2755adf2019-01-17 17:16:48 -0500299 returned_metadata = call.initial_metadata()
300
301 # Update the core_group_id if present in the returned metadata
302 if returned_metadata is None:
303 log.debug('header-metadata-missing')
304 else:
305 log.debug('metadata-returned', metadata=returned_metadata)
306 for pair in returned_metadata:
Richard Jankowski46464e92019-03-05 11:53:55 -0500307 if pair[0] == self.core_group_id_key:
Richard Jankowski2755adf2019-01-17 17:16:48 -0500308 self.core_group_id = pair[1]
Richard Jankowski46464e92019-03-05 11:53:55 -0500309 log.debug('core-binding', core_group=self.core_group_id)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500310 returnValue(res)