#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
The gRPC client layer for the OpenFlow agent
"""
from Queue import Queue, Empty
import os
import signal

from grpc import StatusCode
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue

from protos.voltha_pb2 import ID, VolthaServiceStub, FlowTableUpdate, \
    FlowGroupTableUpdate, PacketOut
from protos.logical_device_pb2 import LogicalPortId
from google.protobuf import empty_pb2
34
35
# Module-level structured logger shared by everything in this file.
log = get_logger()
37
38
class GrpcClient(object):
    """gRPC client used by the OpenFlow agent to talk to the VOLTHA service.

    The client owns three long-lived streaming calls, each of which runs in a
    reactor thread-pool thread:

    * packet-out: drains ``packet_out_queue`` into ``StreamPacketsOut``;
    * packet-in: feeds ``ReceivePacketsIn`` results into ``packet_in_queue``;
    * change events: feeds ``ReceiveChangeEvents`` results into
      ``change_event_queue``.

    Two reactor-side loops consume the inbound queues and hand every item to
    the connection manager. All other methods are unary RPC wrappers that run
    the blocking stub call in a thread and return a Deferred.
    """

    def __init__(self, connection_manager, channel, grpc_timeout):
        """Store the collaborators and create the stub and the queues.

        :param connection_manager: object receiving inbound traffic through
            ``forward_packet_in`` and ``forward_change_event``.
        :param channel: gRPC channel the service stub is built on.
        :param grpc_timeout: timeout passed to every unary RPC.
        """
        self.connection_manager = connection_manager
        self.channel = channel
        self.grpc_timeout = grpc_timeout
        self.local_stub = VolthaServiceStub(channel)

        # This is the vcore group to which an OFAgent is bound.
        # It is the affinity router that forwards all OFAgent
        # requests to the primary vcore in the group.
        self.core_group_id = ''
        # Metadata key under which the core group id is sent and received.
        self.CORE_GROUP_ID = 'voltha_backend_name'

        self.stopped = False

        self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
        self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
        self.change_event_queue = DeferredQueue()  # queue change events

    def start(self):
        """Start the three streams and both forwarding loops.

        :return: this client, so the call can be chained.
        """
        log.debug('starting', grpc_timeout=self.grpc_timeout)
        self.start_packet_out_stream()
        self.start_packet_in_stream()
        self.start_change_event_in_stream()
        reactor.callLater(0, self.packet_in_forwarder_loop)
        reactor.callLater(0, self.change_event_processing_loop)
        log.info('started')
        return self

    def stop(self):
        """Flag the client as stopped.

        The packet-out generator and the forwarding loops poll this flag;
        nothing is cancelled or closed here.
        """
        log.debug('stopping')
        self.stopped = True
        log.info('stopped')

    def _metadata(self):
        """Return the call metadata carrying the current core group id."""
        return ((self.CORE_GROUP_ID, self.core_group_id),)

    def _handle_grpc_error(self, e):
        """Log a failed streaming call and terminate when the peer is gone.

        :param e: the ``_Rendezvous`` raised by the streaming call.
        """
        status = e.code()
        log.error('grpc-exception', status=status)
        if status == StatusCode.UNAVAILABLE:
            # Send SIGTERM to this very process. Signalling directly avoids
            # forking a shell from a worker thread just to run ``kill``.
            os.kill(os.getpid(), signal.SIGTERM)

    def _unary_call(self, rpc, request):
        """Run a blocking unary RPC in a thread.

        :param rpc: callable stub method to invoke.
        :param request: request message passed to ``rpc``.
        :return: Deferred firing with whatever ``rpc`` returns.
        """
        return threads.deferToThread(
            rpc, request, timeout=self.grpc_timeout,
            metadata=self._metadata())

    def start_packet_out_stream(self):
        """Start the thread that streams queued PacketOut messages out."""

        def packet_generator():
            # Poll with a timeout so that the stopped flag is noticed even
            # when no packet is ever queued.
            while 1:
                try:
                    packet = self.packet_out_queue.get(block=True, timeout=1.0)
                except Empty:
                    if self.stopped:
                        return
                else:
                    yield packet

        def stream_packets_out():
            generator = packet_generator()
            try:
                self.local_stub.StreamPacketsOut(
                    generator, metadata=self._metadata())
            except _Rendezvous as e:
                self._handle_grpc_error(e)

        reactor.callInThread(stream_packets_out)

    def start_packet_in_stream(self):
        """Start the thread that receives PacketIn messages."""

        def receive_packet_in_stream():
            streaming_rpc_method = self.local_stub.ReceivePacketsIn
            iterator = streaming_rpc_method(empty_pb2.Empty(),
                                            metadata=self._metadata())
            try:
                for packet_in in iterator:
                    # The DeferredQueue is touched from the reactor thread
                    # only, hence the hand-over through callFromThread.
                    reactor.callFromThread(self.packet_in_queue.put,
                                           packet_in)
                    log.debug('enqueued-packet-in',
                              packet_in=packet_in,
                              queue_len=len(self.packet_in_queue.pending))
            except _Rendezvous as e:
                self._handle_grpc_error(e)

        reactor.callInThread(receive_packet_in_stream)

    def start_change_event_in_stream(self):
        """Start the thread that receives change events."""

        def receive_change_events():
            streaming_rpc_method = self.local_stub.ReceiveChangeEvents
            iterator = streaming_rpc_method(empty_pb2.Empty(),
                                            metadata=self._metadata())
            try:
                for event in iterator:
                    reactor.callFromThread(self.change_event_queue.put, event)
                    log.debug('enqueued-change-event',
                              change_event=event,
                              queue_len=len(self.change_event_queue.pending))
            except _Rendezvous as e:
                self._handle_grpc_error(e)

        reactor.callInThread(receive_change_events)

    @inlineCallbacks
    def change_event_processing_loop(self):
        """Forward queued change events to the connection manager.

        A failure while handling one event is logged and the loop carries on.
        The stopped flag is checked after each event.
        """
        while True:
            try:
                event = yield self.change_event_queue.get()
                device_id = event.id
                self.connection_manager.forward_change_event(device_id, event)
            except Exception as e:
                log.exception('failed-in-change-event-handler', e=e)
            if self.stopped:
                break

    @inlineCallbacks
    def packet_in_forwarder_loop(self):
        """Forward queued PacketIn messages to the connection manager.

        A failure while handling one packet is logged and the loop carries
        on, so that one bad packet cannot end packet-in forwarding for good.
        The stopped flag is checked after each packet.
        """
        while True:
            try:
                packet_in = yield self.packet_in_queue.get()
                device_id = packet_in.id
                ofp_packet_in = packet_in.packet_in
                self.connection_manager.forward_packet_in(device_id,
                                                          ofp_packet_in)
            except Exception as e:
                log.exception('failed-in-packet-in-handler', e=e)
            if self.stopped:
                break

    def send_packet_out(self, device_id, packet_out):
        """Queue an OpenFlow packet-out for the given device."""
        packet_out = PacketOut(id=device_id, packet_out=packet_out)
        self.packet_out_queue.put(packet_out)

    @inlineCallbacks
    def get_port(self, device_id, port_id):
        """Call GetLogicalDevicePort; the Deferred fires with the response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._unary_call(
            self.local_stub.GetLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_port_list(self, device_id):
        """Call ListLogicalDevicePorts; fires with the response ``items``."""
        req = ID(id=device_id)
        res = yield self._unary_call(
            self.local_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def enable_port(self, device_id, port_id):
        """Call EnableLogicalDevicePort; fires with the response."""
        req = LogicalPortId(
            id=device_id,
            port_id=port_id
        )
        res = yield self._unary_call(
            self.local_stub.EnableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def disable_port(self, device_id, port_id):
        """Call DisableLogicalDevicePort; fires with the response."""
        req = LogicalPortId(
            id=device_id,
            port_id=port_id
        )
        res = yield self._unary_call(
            self.local_stub.DisableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_device_info(self, device_id):
        """Call GetLogicalDevice; fires with the response."""
        req = ID(id=device_id)
        res = yield self._unary_call(
            self.local_stub.GetLogicalDevice, req)
        returnValue(res)

    @inlineCallbacks
    def update_flow_table(self, device_id, flow_mod):
        """Call UpdateLogicalDeviceFlowTable; fires with the response."""
        req = FlowTableUpdate(
            id=device_id,
            flow_mod=flow_mod
        )
        res = yield self._unary_call(
            self.local_stub.UpdateLogicalDeviceFlowTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_group_table(self, device_id, group_mod):
        """Call UpdateLogicalDeviceFlowGroupTable; fires with the response."""
        req = FlowGroupTableUpdate(
            id=device_id,
            group_mod=group_mod
        )
        res = yield self._unary_call(
            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req)
        returnValue(res)

    @inlineCallbacks
    def list_flows(self, device_id):
        """Call ListLogicalDeviceFlows; fires with the response ``items``."""
        req = ID(id=device_id)
        res = yield self._unary_call(
            self.local_stub.ListLogicalDeviceFlows, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_groups(self, device_id):
        """Call ListLogicalDeviceFlowGroups; fires with response ``items``."""
        req = ID(id=device_id)
        res = yield self._unary_call(
            self.local_stub.ListLogicalDeviceFlowGroups, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_ports(self, device_id):
        """Call ListLogicalDevicePorts; fires with the response ``items``."""
        req = ID(id=device_id)
        res = yield self._unary_call(
            self.local_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_logical_devices(self):
        """Call ListLogicalDevices; fires with the response ``items``."""
        res = yield self._unary_call(
            self.local_stub.ListLogicalDevices, empty_pb2.Empty())
        returnValue(res.items)

    @inlineCallbacks
    def subscribe(self, subscriber):
        """Call Subscribe and adopt the core group id the reply carries.

        The call is made through ``with_call`` so that the initial metadata
        of the reply can be inspected; when it contains the core group key,
        ``core_group_id`` is updated and used for all later calls.

        :param subscriber: request message passed to Subscribe.
        :return: Deferred firing with the Subscribe response.
        """
        res, call = yield self._unary_call(
            self.local_stub.Subscribe.with_call, subscriber)
        returned_metadata = call.initial_metadata()

        # Update the core_group_id if present in the returned metadata
        if returned_metadata is None:
            log.debug('header-metadata-missing')
        else:
            log.debug('metadata-returned', metadata=returned_metadata)
            for pair in returned_metadata:
                if pair[0] == self.CORE_GROUP_ID:
                    self.core_group_id = pair[1]
                    log.debug('received-core-group-id',
                              vcore_group=self.core_group_id)
        returnValue(res)