blob: b58612f2eb4d50c600b7b321746c4319ad62b592 [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18The gRPC client layer for the OpenFlow agent
19"""
20from Queue import Queue, Empty
21import os
22
23from grpc import StatusCode
24from grpc._channel import _Rendezvous
25from structlog import get_logger
26from twisted.internet import reactor
27from twisted.internet import threads
28from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
29
30from protos.voltha_pb2 import ID, VolthaServiceStub, FlowTableUpdate, \
31 FlowGroupTableUpdate, PacketOut
32from protos.logical_device_pb2 import LogicalPortId
33from google.protobuf import empty_pb2
34
35
36log = get_logger()
37
38
39class GrpcClient(object):
40
41 def __init__(self, connection_manager, channel, grpc_timeout):
42
43 self.connection_manager = connection_manager
44 self.channel = channel
45 self.grpc_timeout = grpc_timeout
46 self.local_stub = VolthaServiceStub(channel)
47
Richard Jankowski2755adf2019-01-17 17:16:48 -050048 # This is the vcore group to which an OFAgent is bound.
49 # It is the affinity router that forwards all OFAgent
50 # requests to the primary vcore in the group.
51 self.core_group_id = ''
52 self.CORE_GROUP_ID = 'voltha_backend_name'
53
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050054 self.stopped = False
55
56 self.packet_out_queue = Queue() # queue to send out PacketOut msgs
57 self.packet_in_queue = DeferredQueue() # queue to receive PacketIn
58 self.change_event_queue = DeferredQueue() # queue change events
59
60 def start(self):
61 log.debug('starting', grpc_timeout=self.grpc_timeout)
62 self.start_packet_out_stream()
63 self.start_packet_in_stream()
64 self.start_change_event_in_stream()
65 reactor.callLater(0, self.packet_in_forwarder_loop)
66 reactor.callLater(0, self.change_event_processing_loop)
67 log.info('started')
68 return self
69
70 def stop(self):
71 log.debug('stopping')
72 self.stopped = True
73 log.info('stopped')
74
75 def start_packet_out_stream(self):
76
77 def packet_generator():
78 while 1:
79 try:
80 packet = self.packet_out_queue.get(block=True, timeout=1.0)
81 except Empty:
82 if self.stopped:
83 return
84 else:
85 yield packet
86
87 def stream_packets_out():
88 generator = packet_generator()
89 try:
Richard Jankowski2755adf2019-01-17 17:16:48 -050090 self.local_stub.StreamPacketsOut(generator,
91 metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050092 except _Rendezvous, e:
93 log.error('grpc-exception', status=e.code())
94 if e.code() == StatusCode.UNAVAILABLE:
95 os.system("kill -15 {}".format(os.getpid()))
96
97 reactor.callInThread(stream_packets_out)
98
99 def start_packet_in_stream(self):
100
101 def receive_packet_in_stream():
102 streaming_rpc_method = self.local_stub.ReceivePacketsIn
103 iterator = streaming_rpc_method(empty_pb2.Empty())
104 try:
105 for packet_in in iterator:
106 reactor.callFromThread(self.packet_in_queue.put,
107 packet_in)
108 log.debug('enqued-packet-in',
109 packet_in=packet_in,
110 queue_len=len(self.packet_in_queue.pending))
111 except _Rendezvous, e:
112 log.error('grpc-exception', status=e.code())
113 if e.code() == StatusCode.UNAVAILABLE:
114 os.system("kill -15 {}".format(os.getpid()))
115
116 reactor.callInThread(receive_packet_in_stream)
117
118 def start_change_event_in_stream(self):
119
120 def receive_change_events():
121 streaming_rpc_method = self.local_stub.ReceiveChangeEvents
122 iterator = streaming_rpc_method(empty_pb2.Empty())
123 try:
124 for event in iterator:
125 reactor.callFromThread(self.change_event_queue.put, event)
126 log.debug('enqued-change-event',
127 change_event=event,
128 queue_len=len(self.change_event_queue.pending))
129 except _Rendezvous, e:
130 log.error('grpc-exception', status=e.code())
131 if e.code() == StatusCode.UNAVAILABLE:
132 os.system("kill -15 {}".format(os.getpid()))
133
134 reactor.callInThread(receive_change_events)
135
136 @inlineCallbacks
137 def change_event_processing_loop(self):
138 while True:
139 try:
140 event = yield self.change_event_queue.get()
141 device_id = event.id
142 self.connection_manager.forward_change_event(device_id, event)
143 except Exception, e:
144 log.exception('failed-in-packet-in-handler', e=e)
145 if self.stopped:
146 break
147
148 @inlineCallbacks
149 def packet_in_forwarder_loop(self):
150 while True:
151 packet_in = yield self.packet_in_queue.get()
152 device_id = packet_in.id
153 ofp_packet_in = packet_in.packet_in
154 self.connection_manager.forward_packet_in(device_id, ofp_packet_in)
155 if self.stopped:
156 break
157
158 def send_packet_out(self, device_id, packet_out):
159 packet_out = PacketOut(id=device_id, packet_out=packet_out)
160 self.packet_out_queue.put(packet_out)
161
162 @inlineCallbacks
163 def get_port(self, device_id, port_id):
164 req = LogicalPortId(id=device_id, port_id=port_id)
165 res = yield threads.deferToThread(
Richard Jankowski2755adf2019-01-17 17:16:48 -0500166 self.local_stub.GetLogicalDevicePort, req, timeout=self.grpc_timeout,
167 metadata=((self.CORE_GROUP_ID, self.core_group_id),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500168 returnValue(res)
169
170 @inlineCallbacks
171 def get_port_list(self, device_id):
172 req = ID(id=device_id)
173 res = yield threads.deferToThread(
Richard Jankowski2755adf2019-01-17 17:16:48 -0500174 self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
175 metadata=((self.CORE_GROUP_ID, self.core_group_id),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500176 returnValue(res.items)
177
178 @inlineCallbacks
179 def enable_port(self, device_id, port_id):
180 req = LogicalPortId(
181 id=device_id,
182 port_id=port_id
183 )
184 res = yield threads.deferToThread(
Richard Jankowski2755adf2019-01-17 17:16:48 -0500185 self.local_stub.EnableLogicalDevicePort, req, timeout=self.grpc_timeout,
186 metadata=((self.CORE_GROUP_ID, self.core_group_id),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500187 returnValue(res)
188
189 @inlineCallbacks
190 def disable_port(self, device_id, port_id):
191 req = LogicalPortId(
192 id=device_id,
193 port_id=port_id
194 )
195 res = yield threads.deferToThread(
Richard Jankowski2755adf2019-01-17 17:16:48 -0500196 self.local_stub.DisableLogicalDevicePort, req, timeout=self.grpc_timeout,
197 metadata=((self.CORE_GROUP_ID, self.core_group_id),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500198 returnValue(res)
199
200 @inlineCallbacks
201 def get_device_info(self, device_id):
202 req = ID(id=device_id)
203 res = yield threads.deferToThread(
Richard Jankowski2755adf2019-01-17 17:16:48 -0500204 self.local_stub.GetLogicalDevice, req, timeout=self.grpc_timeout,
205 metadata=((self.CORE_GROUP_ID, self.core_group_id),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500206 returnValue(res)
207
208 @inlineCallbacks
209 def update_flow_table(self, device_id, flow_mod):
210 req = FlowTableUpdate(
211 id=device_id,
212 flow_mod=flow_mod
213 )
214 res = yield threads.deferToThread(
Richard Jankowski2755adf2019-01-17 17:16:48 -0500215 self.local_stub.UpdateLogicalDeviceFlowTable, req, timeout=self.grpc_timeout,
216 metadata=((self.CORE_GROUP_ID, self.core_group_id),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500217 returnValue(res)
218
219 @inlineCallbacks
220 def update_group_table(self, device_id, group_mod):
221 req = FlowGroupTableUpdate(
222 id=device_id,
223 group_mod=group_mod
224 )
225 res = yield threads.deferToThread(
Richard Jankowski2755adf2019-01-17 17:16:48 -0500226 self.local_stub.UpdateLogicalDeviceFlowGroupTable, req, timeout=self.grpc_timeout,
227 metadata=((self.CORE_GROUP_ID, self.core_group_id),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500228 returnValue(res)
229
230 @inlineCallbacks
231 def list_flows(self, device_id):
232 req = ID(id=device_id)
233 res = yield threads.deferToThread(
Richard Jankowski2755adf2019-01-17 17:16:48 -0500234 self.local_stub.ListLogicalDeviceFlows, req, timeout=self.grpc_timeout,
235 metadata=((self.CORE_GROUP_ID, self.core_group_id),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500236 returnValue(res.items)
237
238 @inlineCallbacks
239 def list_groups(self, device_id):
240 req = ID(id=device_id)
241 res = yield threads.deferToThread(
Richard Jankowski2755adf2019-01-17 17:16:48 -0500242 self.local_stub.ListLogicalDeviceFlowGroups, req, timeout=self.grpc_timeout,
243 metadata=((self.CORE_GROUP_ID, self.core_group_id),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500244 returnValue(res.items)
245
246 @inlineCallbacks
247 def list_ports(self, device_id):
248 req = ID(id=device_id)
249 res = yield threads.deferToThread(
Richard Jankowski2755adf2019-01-17 17:16:48 -0500250 self.local_stub.ListLogicalDevicePorts, req, timeout=self.grpc_timeout,
251 metadata=((self.CORE_GROUP_ID, self.core_group_id),))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500252 returnValue(res.items)
253
254 @inlineCallbacks
255 def list_logical_devices(self):
256 res = yield threads.deferToThread(
Richard Jankowski2755adf2019-01-17 17:16:48 -0500257 self.local_stub.ListLogicalDevices, empty_pb2.Empty(), timeout=self.grpc_timeout,
258 metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500259 returnValue(res.items)
260
261 @inlineCallbacks
262 def subscribe(self, subscriber):
Richard Jankowski2755adf2019-01-17 17:16:48 -0500263 res, call = yield threads.deferToThread(
264 self.local_stub.Subscribe.with_call, subscriber, timeout=self.grpc_timeout,
265 metadata=((self.CORE_GROUP_ID, self.core_group_id), ))
266 returned_metadata = call.initial_metadata()
267
268 # Update the core_group_id if present in the returned metadata
269 if returned_metadata is None:
270 log.debug('header-metadata-missing')
271 else:
272 log.debug('metadata-returned', metadata=returned_metadata)
273 for pair in returned_metadata:
274 if pair[0] == self.CORE_GROUP_ID:
275 self.core_group_id = pair[1]
276 log.debug('received-core-group-id', vcore_group=self.core_group_id)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500277 returnValue(res)