blob: 4a6d274daccff05741674a63e9d27366f3812252 [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18The gRPC client layer for the OpenFlow agent
19"""
import os
import signal
from Queue import Queue, Empty

from google.protobuf import empty_pb2
from grpc import StatusCode
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue

from protos.voltha_pb2 import ID, VolthaServiceStub, FlowTableUpdate, \
    FlowGroupTableUpdate, PacketOut
from protos.logical_device_pb2 import LogicalPortId
34
35
36log = get_logger()
37
38
class GrpcClient(object):
    """gRPC client layer used by the OpenFlow agent.

    Wraps a ``VolthaServiceStub`` built on the supplied channel and exposes:

    * three long-running streams (packet-out, packet-in, change events),
      each serviced from a reactor thread-pool thread;
    * two reactor-side loops that drain the packet-in / change-event
      queues and hand the items to the connection manager;
    * a set of unary calls (ports, flows, groups, devices, subscribe), each
      executed via ``deferToThread`` and returned as a Deferred.

    Every call carries a single metadata pair
    ``(core_group_id_key, core_group_id)``.
    """

    def __init__(self, connection_manager, channel, grpc_timeout, core_binding_key):
        """Store collaborators and create the stub and the message queues.

        :param connection_manager: object receiving ``forward_packet_in`` and
            ``forward_change_event`` calls.
        :param channel: gRPC channel passed to ``VolthaServiceStub``.
        :param grpc_timeout: timeout passed to every unary call.
        :param core_binding_key: metadata key under which the core group id
            is sent, and under which it is looked up in ``subscribe``.
        """
        self.connection_manager = connection_manager
        self.channel = channel
        self.grpc_timeout = grpc_timeout
        self.local_stub = VolthaServiceStub(channel)

        # This is the rw-core cluster to which an OFAgent is bound.
        # It is the affinity router that forwards all OFAgent
        # requests to a specific rw-core in this back-end cluster.
        self.core_group_id = ''
        self.core_group_id_key = core_binding_key

        self.stopped = False

        self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
        self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
        self.change_event_queue = DeferredQueue()  # queue change events

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~ private helpers ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def _metadata(self):
        """Return the metadata tuple attached to every gRPC call.

        Built on each call so that a ``core_group_id`` updated by
        ``subscribe`` is picked up by subsequent requests.
        """
        return ((self.core_group_id_key, self.core_group_id),)

    @staticmethod
    def _handle_stream_error(error):
        """Log a gRPC stream failure; SIGTERM this process if UNAVAILABLE.

        :param error: the caught ``_Rendezvous`` exception.
        """
        status = error.code()
        log.error('grpc-exception', status=status)
        if status == StatusCode.UNAVAILABLE:
            # Signal ourselves directly rather than spawning a shell to run
            # ``kill -15 <pid>``.
            os.kill(os.getpid(), signal.SIGTERM)

    def _invoke(self, rpc_method, request):
        """Run a blocking unary stub call in a thread; return its Deferred.

        :param rpc_method: stub callable (e.g. ``self.local_stub.GetLogicalDevice``).
        :param request: request message passed as the first argument.
        """
        return threads.deferToThread(
            rpc_method, request, timeout=self.grpc_timeout,
            metadata=self._metadata())

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ life cycle ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def start(self):
        """Start the three streams and schedule both queue-draining loops.

        :return: ``self``.
        """
        log.debug('starting', grpc_timeout=self.grpc_timeout,
                  core_binding_key=self.core_group_id_key)
        self.start_packet_out_stream()
        self.start_packet_in_stream()
        self.start_change_event_in_stream()
        reactor.callLater(0, self.packet_in_forwarder_loop)
        reactor.callLater(0, self.change_event_processing_loop)
        log.info('started')
        return self

    def stop(self):
        """Set the ``stopped`` flag that the generator and loops check."""
        log.debug('stopping')
        self.stopped = True
        log.info('stopped')

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ streams ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def start_packet_out_stream(self):
        """Stream queued ``PacketOut`` messages to the core from a thread."""

        def packet_generator():
            while True:
                try:
                    # Wake up every second so the stop flag gets re-checked.
                    packet = self.packet_out_queue.get(block=True, timeout=1.0)
                except Empty:
                    if self.stopped:
                        return
                else:
                    yield packet

        def stream_packets_out():
            generator = packet_generator()
            try:
                self.local_stub.StreamPacketsOut(
                    generator, metadata=self._metadata())
            except _Rendezvous as e:
                self._handle_stream_error(e)

        reactor.callInThread(stream_packets_out)

    def start_packet_in_stream(self):
        """Receive packet-in messages in a thread and enqueue them."""

        def receive_packet_in_stream():
            streaming_rpc_method = self.local_stub.ReceivePacketsIn
            iterator = streaming_rpc_method(empty_pb2.Empty(),
                                            metadata=self._metadata())
            try:
                for packet_in in iterator:
                    # The DeferredQueue is only touched from the reactor
                    # thread, hence callFromThread.
                    reactor.callFromThread(self.packet_in_queue.put,
                                           packet_in)
                    log.debug('enqueued-packet-in',
                              packet_in=packet_in,
                              queue_len=len(self.packet_in_queue.pending))
            except _Rendezvous as e:
                self._handle_stream_error(e)

        reactor.callInThread(receive_packet_in_stream)

    def start_change_event_in_stream(self):
        """Receive change events in a thread and enqueue them."""

        def receive_change_events():
            streaming_rpc_method = self.local_stub.ReceiveChangeEvents
            iterator = streaming_rpc_method(empty_pb2.Empty(),
                                            metadata=self._metadata())
            try:
                for event in iterator:
                    reactor.callFromThread(self.change_event_queue.put, event)
                    log.debug('enqueued-change-event',
                              change_event=event,
                              queue_len=len(self.change_event_queue.pending))
            except _Rendezvous as e:
                self._handle_stream_error(e)

        reactor.callInThread(receive_change_events)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~ reactor-side loops ~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    @inlineCallbacks
    def change_event_processing_loop(self):
        """Forward queued change events to the connection manager.

        A failure while handling one event is logged and the loop carries
        on; the loop exits after an iteration once ``stopped`` is set.
        """
        while True:
            try:
                event = yield self.change_event_queue.get()
                device_id = event.id
                self.connection_manager.forward_change_event(device_id, event)
            except Exception as e:
                log.exception('failed-in-change-event-handler', e=e)
            if self.stopped:
                break

    @inlineCallbacks
    def packet_in_forwarder_loop(self):
        """Forward queued packet-in messages to the connection manager.

        A failure while handling one packet is logged and the loop carries
        on, so a single bad packet cannot end packet-in forwarding; the loop
        exits after an iteration once ``stopped`` is set.
        """
        while True:
            try:
                packet_in = yield self.packet_in_queue.get()
                device_id = packet_in.id
                ofp_packet_in = packet_in.packet_in
                self.connection_manager.forward_packet_in(device_id,
                                                          ofp_packet_in)
            except Exception as e:
                log.exception('failed-in-packet-in-handler', e=e)
            if self.stopped:
                break

    def send_packet_out(self, device_id, packet_out):
        """Wrap ``packet_out`` in a ``PacketOut`` and queue it for streaming."""
        packet_out = PacketOut(id=device_id, packet_out=packet_out)
        self.packet_out_queue.put(packet_out)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ unary calls ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    @inlineCallbacks
    def get_port(self, device_id, port_id):
        """Call ``GetLogicalDevicePort``; fires with the response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._invoke(self.local_stub.GetLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_port_list(self, device_id):
        """Call ``ListLogicalDevicePorts``; fires with ``response.items``."""
        req = ID(id=device_id)
        res = yield self._invoke(self.local_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def enable_port(self, device_id, port_id):
        """Call ``EnableLogicalDevicePort``; fires with the response."""
        req = LogicalPortId(
            id=device_id,
            port_id=port_id
        )
        res = yield self._invoke(self.local_stub.EnableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def disable_port(self, device_id, port_id):
        """Call ``DisableLogicalDevicePort``; fires with the response."""
        req = LogicalPortId(
            id=device_id,
            port_id=port_id
        )
        res = yield self._invoke(self.local_stub.DisableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_device_info(self, device_id):
        """Call ``GetLogicalDevice``; fires with the response."""
        req = ID(id=device_id)
        res = yield self._invoke(self.local_stub.GetLogicalDevice, req)
        returnValue(res)

    @inlineCallbacks
    def update_flow_table(self, device_id, flow_mod):
        """Call ``UpdateLogicalDeviceFlowTable``; fires with the response."""
        req = FlowTableUpdate(
            id=device_id,
            flow_mod=flow_mod
        )
        res = yield self._invoke(
            self.local_stub.UpdateLogicalDeviceFlowTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_group_table(self, device_id, group_mod):
        """Call ``UpdateLogicalDeviceFlowGroupTable``; fires with the response."""
        req = FlowGroupTableUpdate(
            id=device_id,
            group_mod=group_mod
        )
        res = yield self._invoke(
            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req)
        returnValue(res)

    @inlineCallbacks
    def list_flows(self, device_id):
        """Call ``ListLogicalDeviceFlows``; fires with ``response.items``."""
        req = ID(id=device_id)
        res = yield self._invoke(self.local_stub.ListLogicalDeviceFlows, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_groups(self, device_id):
        """Call ``ListLogicalDeviceFlowGroups``; fires with ``response.items``."""
        req = ID(id=device_id)
        res = yield self._invoke(
            self.local_stub.ListLogicalDeviceFlowGroups, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_ports(self, device_id):
        """Call ``ListLogicalDevicePorts``; fires with ``response.items``."""
        req = ID(id=device_id)
        res = yield self._invoke(self.local_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_logical_devices(self):
        """Call ``ListLogicalDevices``; fires with ``response.items``."""
        res = yield self._invoke(
            self.local_stub.ListLogicalDevices, empty_pb2.Empty())
        returnValue(res.items)

    @inlineCallbacks
    def subscribe(self, subscriber):
        """Call ``Subscribe`` and adopt the core group id it returns.

        Uses ``with_call`` so the call's initial metadata can be inspected;
        if it contains an entry keyed by ``core_group_id_key``, that value
        becomes ``core_group_id`` for subsequent requests.

        :return: Deferred firing with the ``Subscribe`` response.
        """
        res, call = yield self._invoke(
            self.local_stub.Subscribe.with_call, subscriber)
        returned_metadata = call.initial_metadata()

        # Update the core_group_id if present in the returned metadata
        if returned_metadata is None:
            log.debug('header-metadata-missing')
        else:
            log.debug('metadata-returned', metadata=returned_metadata)
            for pair in returned_metadata:
                if pair[0] == self.core_group_id_key:
                    self.core_group_id = pair[1]
                    log.debug('core-binding', core_group=self.core_group_id)
        returnValue(res)