blob: 7f6d274c168261e5bc4bce50287196ac237c100a [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18The gRPC client layer for the OpenFlow agent
19"""
20from Queue import Queue, Empty
21import os
khenaidoo43aa6bd2019-05-29 13:35:13 -040022import uuid
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050023
24from grpc import StatusCode
25from grpc._channel import _Rendezvous
26from structlog import get_logger
27from twisted.internet import reactor
28from twisted.internet import threads
29from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
30
William Kurkianfc0dcda2019-04-08 16:54:36 -040031from voltha_protos.voltha_pb2_grpc import VolthaServiceStub
32from voltha_protos.voltha_pb2 import ID, FlowTableUpdate, \
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050033 FlowGroupTableUpdate, PacketOut
William Kurkianfc0dcda2019-04-08 16:54:36 -040034from voltha_protos.logical_device_pb2 import LogicalPortId
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050035from google.protobuf import empty_pb2
36
37
38log = get_logger()
39
40
41class GrpcClient(object):
42
khenaidoo43aa6bd2019-05-29 13:35:13 -040043 def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key, core_transaction_key):
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050044
45 self.connection_manager = connection_manager
46 self.channel = channel
47 self.grpc_timeout = grpc_timeout
khenaidoo43aa6bd2019-05-29 13:35:13 -040048 self.grpc_stub = VolthaServiceStub(channel)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050049
Richard Jankowski46464e92019-03-05 11:53:55 -050050 # This is the rw-core cluster to which an OFAgent is bound.
Richard Jankowski2755adf2019-01-17 17:16:48 -050051 # It is the affinity router that forwards all OFAgent
Richard Jankowski46464e92019-03-05 11:53:55 -050052 # requests to a specific rw-core in this back-end cluster.
Richard Jankowski2755adf2019-01-17 17:16:48 -050053 self.core_group_id = ''
Richard Jankowski46464e92019-03-05 11:53:55 -050054 self.core_group_id_key = core_binding_key
Richard Jankowski2755adf2019-01-17 17:16:48 -050055
khenaidoo43aa6bd2019-05-29 13:35:13 -040056 # Since the api-router binds an OFAgent to two RW Cores in a pair and
57 # transparently forward requests between the two then the onus is on
58 # the OFAgent to fulfill part of the function of the api-server which
59 # involves sending a transaction key to both RW Cores for the latter
60 # to figure out which Core will handle the transaction. To prevent
61 # collision between the api-server ID and the one from OFAgent then the
62 # OFAgent ID will be prefixed with "O-".
63 self.core_transaction_key = core_transaction_key
64
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050065 self.stopped = False
66
67 self.packet_out_queue = Queue() # queue to send out PacketOut msgs
68 self.packet_in_queue = DeferredQueue() # queue to receive PacketIn
69 self.change_event_queue = DeferredQueue() # queue change events
70
71 def start(self):
Richard Jankowski46464e92019-03-05 11:53:55 -050072 log.debug('starting', grpc_timeout=self.grpc_timeout,
khenaidoo43aa6bd2019-05-29 13:35:13 -040073 core_binding_key=self.core_group_id_key,
74 core_transaction_key=self.core_transaction_key)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050075 self.start_packet_out_stream()
76 self.start_packet_in_stream()
77 self.start_change_event_in_stream()
78 reactor.callLater(0, self.packet_in_forwarder_loop)
79 reactor.callLater(0, self.change_event_processing_loop)
80 log.info('started')
81 return self
82
83 def stop(self):
84 log.debug('stopping')
85 self.stopped = True
86 log.info('stopped')
87
khenaidoo43aa6bd2019-05-29 13:35:13 -040088 def get_core_transaction_metadata(self):
89 return (self.core_transaction_key, "O-" + uuid.uuid4().hex)
90
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050091 def start_packet_out_stream(self):
92
93 def packet_generator():
94 while 1:
95 try:
96 packet = self.packet_out_queue.get(block=True, timeout=1.0)
97 except Empty:
98 if self.stopped:
99 return
100 else:
101 yield packet
102
103 def stream_packets_out():
104 generator = packet_generator()
105 try:
khenaidoo43aa6bd2019-05-29 13:35:13 -0400106 self.grpc_stub.StreamPacketsOut(generator,
107 metadata=((self.core_group_id_key, self.core_group_id),
108 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500109 except _Rendezvous, e:
110 log.error('grpc-exception', status=e.code())
111 if e.code() == StatusCode.UNAVAILABLE:
112 os.system("kill -15 {}".format(os.getpid()))
113
114 reactor.callInThread(stream_packets_out)
115
116 def start_packet_in_stream(self):
117
118 def receive_packet_in_stream():
khenaidoo43aa6bd2019-05-29 13:35:13 -0400119 streaming_rpc_method = self.grpc_stub.ReceivePacketsIn
Richard Jankowskidec93172019-02-12 14:59:28 -0500120 iterator = streaming_rpc_method(empty_pb2.Empty(),
khenaidoo43aa6bd2019-05-29 13:35:13 -0400121 metadata=((self.core_group_id_key, self.core_group_id),
122 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500123 try:
124 for packet_in in iterator:
125 reactor.callFromThread(self.packet_in_queue.put,
126 packet_in)
Richard Jankowskidec93172019-02-12 14:59:28 -0500127 log.debug('enqueued-packet-in',
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500128 packet_in=packet_in,
129 queue_len=len(self.packet_in_queue.pending))
130 except _Rendezvous, e:
131 log.error('grpc-exception', status=e.code())
132 if e.code() == StatusCode.UNAVAILABLE:
133 os.system("kill -15 {}".format(os.getpid()))
134
135 reactor.callInThread(receive_packet_in_stream)
136
137 def start_change_event_in_stream(self):
138
139 def receive_change_events():
khenaidoo43aa6bd2019-05-29 13:35:13 -0400140 streaming_rpc_method = self.grpc_stub.ReceiveChangeEvents
Richard Jankowskidec93172019-02-12 14:59:28 -0500141 iterator = streaming_rpc_method(empty_pb2.Empty(),
khenaidoo43aa6bd2019-05-29 13:35:13 -0400142 metadata=((self.core_group_id_key, self.core_group_id),
143 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500144 try:
145 for event in iterator:
146 reactor.callFromThread(self.change_event_queue.put, event)
Richard Jankowskidec93172019-02-12 14:59:28 -0500147 log.debug('enqueued-change-event',
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500148 change_event=event,
149 queue_len=len(self.change_event_queue.pending))
150 except _Rendezvous, e:
151 log.error('grpc-exception', status=e.code())
152 if e.code() == StatusCode.UNAVAILABLE:
153 os.system("kill -15 {}".format(os.getpid()))
154
155 reactor.callInThread(receive_change_events)
156
157 @inlineCallbacks
158 def change_event_processing_loop(self):
159 while True:
160 try:
161 event = yield self.change_event_queue.get()
162 device_id = event.id
163 self.connection_manager.forward_change_event(device_id, event)
164 except Exception, e:
165 log.exception('failed-in-packet-in-handler', e=e)
166 if self.stopped:
167 break
168
169 @inlineCallbacks
170 def packet_in_forwarder_loop(self):
171 while True:
172 packet_in = yield self.packet_in_queue.get()
173 device_id = packet_in.id
174 ofp_packet_in = packet_in.packet_in
Daniele Rossi9ab99b72019-07-12 08:51:55 +0000175 self.log.debug('grpc client to send packet-in')
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500176 self.connection_manager.forward_packet_in(device_id, ofp_packet_in)
177 if self.stopped:
178 break
179
180 def send_packet_out(self, device_id, packet_out):
Daniele Rossi9ab99b72019-07-12 08:51:55 +0000181 self.log.debug('grpc client to send packet-out')
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500182 packet_out = PacketOut(id=device_id, packet_out=packet_out)
183 self.packet_out_queue.put(packet_out)
184
185 @inlineCallbacks
186 def get_port(self, device_id, port_id):
187 req = LogicalPortId(id=device_id, port_id=port_id)
188 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400189 self.grpc_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
190 metadata=((self.core_group_id_key, self.core_group_id),
191 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500192 returnValue(res)
193
194 @inlineCallbacks
195 def get_port_list(self, device_id):
196 req = ID(id=device_id)
197 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400198 self.grpc_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
199 metadata=((self.core_group_id_key, self.core_group_id),
200 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500201 returnValue(res.items)
202
203 @inlineCallbacks
204 def enable_port(self, device_id, port_id):
205 req = LogicalPortId(
206 id=device_id,
207 port_id=port_id
208 )
209 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400210 self.grpc_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
211 metadata=((self.core_group_id_key, self.core_group_id),
212 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500213 returnValue(res)
214
215 @inlineCallbacks
216 def disable_port(self, device_id, port_id):
217 req = LogicalPortId(
218 id=device_id,
219 port_id=port_id
220 )
221 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400222 self.grpc_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
223 metadata=((self.core_group_id_key, self.core_group_id),
224 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500225 returnValue(res)
226
227 @inlineCallbacks
228 def get_device_info(self, device_id):
229 req = ID(id=device_id)
230 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400231 self.grpc_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
232 metadata=((self.core_group_id_key, self.core_group_id),
233 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500234 returnValue(res)
235
236 @inlineCallbacks
237 def update_flow_table(self, device_id, flow_mod):
238 req = FlowTableUpdate(
239 id=device_id,
240 flow_mod=flow_mod
241 )
242 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400243 self.grpc_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
244 metadata=((self.core_group_id_key, self.core_group_id),
245 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500246 returnValue(res)
247
248 @inlineCallbacks
249 def update_group_table(self, device_id, group_mod):
250 req = FlowGroupTableUpdate(
251 id=device_id,
252 group_mod=group_mod
253 )
254 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400255 self.grpc_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
256 metadata=((self.core_group_id_key, self.core_group_id),
257 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500258 returnValue(res)
259
260 @inlineCallbacks
261 def list_flows(self, device_id):
262 req = ID(id=device_id)
263 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400264 self.grpc_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
265 metadata=((self.core_group_id_key, self.core_group_id),
266 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500267 returnValue(res.items)
268
269 @inlineCallbacks
270 def list_groups(self, device_id):
271 req = ID(id=device_id)
272 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400273 self.grpc_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
274 metadata=((self.core_group_id_key, self.core_group_id),
275 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500276 returnValue(res.items)
277
278 @inlineCallbacks
279 def list_ports(self, device_id):
280 req = ID(id=device_id)
281 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400282 self.grpc_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
283 metadata=((self.core_group_id_key, self.core_group_id),
284 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500285 returnValue(res.items)
286
287 @inlineCallbacks
288 def list_logical_devices(self):
289 res = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400290 self.grpc_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
291 metadata=((self.core_group_id_key, self.core_group_id),
292 self.get_core_transaction_metadata(),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500293 returnValue(res.items)
294
295 @inlineCallbacks
296 def subscribe(self, subscriber):
Richard Jankowski2755adf2019-01-17 17:16:48 -0500297 res, call = yield threads.deferToThread(
khenaidoo43aa6bd2019-05-29 13:35:13 -0400298 self.grpc_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
299 metadata=((self.core_group_id_key, self.core_group_id),
300 self.get_core_transaction_metadata(),))
Richard Jankowski2755adf2019-01-17 17:16:48 -0500301 returned_metadata = call.initial_metadata()
302
303 # Update the core_group_id if present in the returned metadata
304 if returned_metadata is None:
305 log.debug('header-metadata-missing')
306 else:
307 log.debug('metadata-returned', metadata=returned_metadata)
308 for pair in returned_metadata:
Richard Jankowski46464e92019-03-05 11:53:55 -0500309 if pair[0] == self.core_group_id_key:
Richard Jankowski2755adf2019-01-17 17:16:48 -0500310 self.core_group_id = pair[1]
Richard Jankowski46464e92019-03-05 11:53:55 -0500311 log.debug('core-binding', core_group=self.core_group_id)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500312 returnValue(res)