#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
"""
The gRPC client layer for the OpenFlow agent
"""
import os
import signal
from Queue import Queue, Empty

from google.protobuf import empty_pb2
from grpc import StatusCode
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue

from voltha_protos.voltha_pb2_grpc import VolthaServiceStub
from voltha_protos.voltha_pb2 import ID, FlowTableUpdate, \
    FlowGroupTableUpdate, PacketOut
from voltha_protos.logical_device_pb2 import LogicalPortId
35
36
# Module-level structlog logger used by GrpcClient for all of its events.
log = get_logger()
38
39
class GrpcClient(object):
    """
    gRPC client used by the OpenFlow agent to talk to the VOLTHA core.

    The client keeps three long-lived streams open (packet-out, packet-in
    and change events), each driven from a thread started with
    ``reactor.callInThread``, and exposes the unary RPCs as methods that
    return a Deferred (the blocking stub call is run via ``deferToThread``).

    Every RPC carries one metadata pair,
    ``(core_group_id_key, core_group_id)``.  The value starts out empty and
    is filled in by ``subscribe()`` when the core returns it.
    """

    def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key):
        """
        :param connection_manager: object receiving inbound traffic through
            ``forward_packet_in`` and ``forward_change_event``
        :param channel: gRPC channel the ``VolthaServiceStub`` is built on
        :param grpc_timeout: timeout passed to every unary RPC
        :param core_binding_key: metadata key under which the core group id
            is sent to, and looked up in the reply from, the core
        """
        self.connection_manager = connection_manager
        self.channel = channel
        self.grpc_timeout = grpc_timeout
        self.local_stub = VolthaServiceStub(channel)

        # This is the rw-core cluster to which an OFAgent is bound.
        # It is the affinity router that forwards all OFAgent
        # requests to a specific rw-core in this back-end cluster.
        self.core_group_id = ''
        self.core_group_id_key = core_binding_key

        self.stopped = False

        self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
        self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
        self.change_event_queue = DeferredQueue()  # queue change events

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ private helpers ~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def _metadata(self):
        """Return the metadata tuple attached to every RPC.

        Built on each call so that a core_group_id learned later through
        ``subscribe()`` is picked up by subsequent requests.
        """
        return ((self.core_group_id_key, self.core_group_id),)

    def _call_in_thread(self, rpc, request):
        """Run the blocking unary ``rpc`` off the reactor thread.

        :return: Deferred firing with whatever ``rpc`` returns
        """
        return threads.deferToThread(
            rpc, request, timeout=self.grpc_timeout,
            metadata=self._metadata())

    @staticmethod
    def _handle_stream_error(e):
        """Log a failed stream; terminate the process if the core is gone.

        On ``StatusCode.UNAVAILABLE`` SIGTERM is sent to this very process.
        ``os.kill`` is used rather than shelling out to ``kill`` so that no
        child process has to be spawned from a gRPC worker thread.
        """
        status = e.code()
        log.error('grpc-exception', status=status)
        if status == StatusCode.UNAVAILABLE:
            os.kill(os.getpid(), signal.SIGTERM)

    def _start_receive_stream(self, rpc, queue, log_event, item_key):
        """Consume the server-streaming ``rpc`` on a separate thread.

        Each received item is handed to ``queue`` on the reactor thread and
        a debug event named ``log_event`` is emitted with the item logged
        under the keyword ``item_key``.
        """

        def receive():
            iterator = rpc(empty_pb2.Empty(), metadata=self._metadata())
            try:
                for item in iterator:
                    # DeferredQueue must only be touched from the reactor
                    # thread, hence callFromThread.
                    reactor.callFromThread(queue.put, item)
                    log.debug(log_event,
                              queue_len=len(queue.pending),
                              **{item_key: item})
            except _Rendezvous as e:
                self._handle_stream_error(e)

        reactor.callInThread(receive)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ life cycle ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def start(self):
        """Open the three streams and schedule the two forwarding loops.

        :return: self, so the call can be chained
        """
        log.debug('starting', grpc_timeout=self.grpc_timeout,
                  core_binding_key=self.core_group_id_key)
        self.start_packet_out_stream()
        self.start_packet_in_stream()
        self.start_change_event_in_stream()
        reactor.callLater(0, self.packet_in_forwarder_loop)
        reactor.callLater(0, self.change_event_processing_loop)
        log.info('started')
        return self

    def stop(self):
        """Flag the client as stopped.

        Only the flag is set here: the forwarding loops check it after
        handling an item, and the packet-out generator checks it whenever
        its one second queue wait times out.
        """
        log.debug('stopping')
        self.stopped = True
        log.info('stopped')

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ streams ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def start_packet_out_stream(self):
        """Start the thread that streams queued PacketOut messages out."""

        def packet_generator():
            while True:
                try:
                    # Bounded wait so the stopped flag is re-examined
                    # regularly even when there is nothing to send.
                    packet = self.packet_out_queue.get(block=True,
                                                       timeout=1.0)
                except Empty:
                    if self.stopped:
                        return
                else:
                    yield packet

        def stream_packets_out():
            generator = packet_generator()
            try:
                self.local_stub.StreamPacketsOut(generator,
                                                 metadata=self._metadata())
            except _Rendezvous as e:
                self._handle_stream_error(e)

        reactor.callInThread(stream_packets_out)

    def start_packet_in_stream(self):
        """Start the thread feeding received PacketIn messages into
        ``packet_in_queue``."""
        self._start_receive_stream(self.local_stub.ReceivePacketsIn,
                                   self.packet_in_queue,
                                   'enqueued-packet-in',
                                   'packet_in')

    def start_change_event_in_stream(self):
        """Start the thread feeding received change events into
        ``change_event_queue``."""
        self._start_receive_stream(self.local_stub.ReceiveChangeEvents,
                                   self.change_event_queue,
                                   'enqueued-change-event',
                                   'change_event')

    @inlineCallbacks
    def change_event_processing_loop(self):
        """Forward queued change events to the connection manager.

        A failure while handling one event is logged and the loop carries
        on; the loop ends once ``stopped`` is set.
        """
        while True:
            try:
                event = yield self.change_event_queue.get()
                device_id = event.id
                self.connection_manager.forward_change_event(device_id, event)
            except Exception as e:
                log.exception('failed-in-change-event-handler', e=e)
            if self.stopped:
                break

    @inlineCallbacks
    def packet_in_forwarder_loop(self):
        """Forward queued PacketIn messages to the connection manager.

        A failure while handling one packet is logged and the loop carries
        on, so a single bad packet cannot end packet-in forwarding; the
        loop ends once ``stopped`` is set.
        """
        while True:
            try:
                packet_in = yield self.packet_in_queue.get()
                device_id = packet_in.id
                ofp_packet_in = packet_in.packet_in
                self.connection_manager.forward_packet_in(device_id,
                                                          ofp_packet_in)
            except Exception as e:
                log.exception('failed-in-packet-in-handler', e=e)
            if self.stopped:
                break

    def send_packet_out(self, device_id, packet_out):
        """Wrap ``packet_out`` in a PacketOut for ``device_id`` and queue it
        for the packet-out stream."""
        packet_out = PacketOut(id=device_id, packet_out=packet_out)
        self.packet_out_queue.put(packet_out)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ unary RPCs ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Each method below returns a Deferred.

    @inlineCallbacks
    def get_port(self, device_id, port_id):
        """Call GetLogicalDevicePort; fires with the RPC response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._call_in_thread(
            self.local_stub.GetLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_port_list(self, device_id):
        """Call ListLogicalDevicePorts; fires with the response's items."""
        req = ID(id=device_id)
        res = yield self._call_in_thread(
            self.local_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def enable_port(self, device_id, port_id):
        """Call EnableLogicalDevicePort; fires with the RPC response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._call_in_thread(
            self.local_stub.EnableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def disable_port(self, device_id, port_id):
        """Call DisableLogicalDevicePort; fires with the RPC response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._call_in_thread(
            self.local_stub.DisableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_device_info(self, device_id):
        """Call GetLogicalDevice; fires with the RPC response."""
        req = ID(id=device_id)
        res = yield self._call_in_thread(
            self.local_stub.GetLogicalDevice, req)
        returnValue(res)

    @inlineCallbacks
    def update_flow_table(self, device_id, flow_mod):
        """Call UpdateLogicalDeviceFlowTable with ``flow_mod``; fires with
        the RPC response."""
        req = FlowTableUpdate(id=device_id, flow_mod=flow_mod)
        res = yield self._call_in_thread(
            self.local_stub.UpdateLogicalDeviceFlowTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_group_table(self, device_id, group_mod):
        """Call UpdateLogicalDeviceFlowGroupTable with ``group_mod``; fires
        with the RPC response."""
        req = FlowGroupTableUpdate(id=device_id, group_mod=group_mod)
        res = yield self._call_in_thread(
            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req)
        returnValue(res)

    @inlineCallbacks
    def list_flows(self, device_id):
        """Call ListLogicalDeviceFlows; fires with the response's items."""
        req = ID(id=device_id)
        res = yield self._call_in_thread(
            self.local_stub.ListLogicalDeviceFlows, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_groups(self, device_id):
        """Call ListLogicalDeviceFlowGroups; fires with the response's
        items."""
        req = ID(id=device_id)
        res = yield self._call_in_thread(
            self.local_stub.ListLogicalDeviceFlowGroups, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_ports(self, device_id):
        """Call ListLogicalDevicePorts; fires with the response's items."""
        req = ID(id=device_id)
        res = yield self._call_in_thread(
            self.local_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_logical_devices(self):
        """Call ListLogicalDevices; fires with the response's items."""
        res = yield self._call_in_thread(
            self.local_stub.ListLogicalDevices, empty_pb2.Empty())
        returnValue(res.items)

    @inlineCallbacks
    def subscribe(self, subscriber):
        """Call Subscribe and record the core binding from the reply.

        The RPC is issued through ``with_call`` so the initial metadata of
        the reply can be inspected; when it contains ``core_group_id_key``
        the associated value becomes the new ``core_group_id``.

        :return: Deferred firing with the Subscribe response
        """
        res, call = yield self._call_in_thread(
            self.local_stub.Subscribe.with_call, subscriber)
        returned_metadata = call.initial_metadata()

        # Update the core_group_id if present in the returned metadata
        if returned_metadata is None:
            log.debug('header-metadata-missing')
        else:
            log.debug('metadata-returned', metadata=returned_metadata)
            for pair in returned_metadata:
                if pair[0] == self.core_group_id_key:
                    self.core_group_id = pair[1]
                    log.debug('core-binding', core_group=self.core_group_id)
        returnValue(res)