blob: ab9417b3188eb7b601ba4eded75a055b027471c4 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
The gRPC client layer for the OpenFlow agent
"""
import os
import signal
from Queue import Queue, Empty

from google.protobuf import empty_pb2
from grpc import StatusCode
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue

from protos.voltha_pb2 import ID, VolthaLocalServiceStub, FlowTableUpdate, \
    FlowGroupTableUpdate, PacketOut
from protos.logical_device_pb2 import LogicalPortId

# Module-level structlog logger used by every GrpcClient method below.
log = get_logger()


class GrpcClient(object):
    """
    gRPC client used by the OpenFlow agent, built on VolthaLocalServiceStub.

    Three streaming RPCs (packets out, packets in, change events) run on
    reactor pool threads; received messages are handed to the reactor thread
    through DeferredQueues and forwarded to the connection manager. The
    remaining methods are unary RPCs executed via deferToThread, each
    returning a Deferred and bounded by ``grpc_timeout``.
    """

    def __init__(self, connection_manager, channel, grpc_timeout):
        """
        :param connection_manager: object receiving forward_packet_in() and
            forward_change_event() calls
        :param channel: gRPC channel the service stub is created on
        :param grpc_timeout: timeout passed to every unary RPC
        """
        self.connection_manager = connection_manager
        self.channel = channel
        self.grpc_timeout = grpc_timeout
        self.local_stub = VolthaLocalServiceStub(channel)

        self.stopped = False

        self.packet_out_queue = Queue()  # queue to send out PacketOut msgs
        self.packet_in_queue = DeferredQueue()  # queue to receive PacketIn
        self.change_event_queue = DeferredQueue()  # queue change events

    def start(self):
        """Start the three streams and both forwarding loops; return self."""
        log.debug('starting', grpc_timeout=self.grpc_timeout)
        self.start_packet_out_stream()
        self.start_packet_in_stream()
        self.start_change_event_in_stream()
        reactor.callLater(0, self.packet_in_forwarder_loop)
        reactor.callLater(0, self.change_event_processing_loop)
        log.info('started')
        return self

    def stop(self):
        """
        Flag the client as stopped.

        NOTE: this only sets a flag. The packet-out generator notices it
        after its next 1s queue timeout; the two forwarding loops notice it
        only after handling their next queued item.
        """
        log.debug('stopping')
        self.stopped = True
        log.info('stopped')

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~ internal helpers ~~~~~~~~~~~~~~~~~~~~~~~~~~~

    @staticmethod
    def _handle_grpc_error(e):
        """
        Log a failed streaming RPC; if the peer is UNAVAILABLE, send SIGTERM
        to this process (signalled directly rather than via a `kill` shell
        command).
        """
        status = e.code()
        log.error('grpc-exception', status=status)
        if status == StatusCode.UNAVAILABLE:
            os.kill(os.getpid(), signal.SIGTERM)

    def _receive_stream(self, rpc, queue, log_event, item_key):
        """
        Consume a server-streaming RPC (runs on a pool thread), handing every
        received message to `queue` on the reactor thread.

        :param rpc: server-streaming stub method, called with Empty()
        :param queue: DeferredQueue receiving the messages
        :param log_event: event name for the per-message debug log
        :param item_key: keyword under which the message is logged
        """
        iterator = rpc(empty_pb2.Empty())
        try:
            for item in iterator:
                # DeferredQueue must only be touched from the reactor thread
                reactor.callFromThread(queue.put, item)
                log.debug(log_event,
                          queue_len=len(queue.pending),
                          **{item_key: item})
        except _Rendezvous as e:
            self._handle_grpc_error(e)

    def _call(self, rpc, request):
        """Run a unary RPC on a pool thread with the configured timeout."""
        return threads.deferToThread(rpc, request, timeout=self.grpc_timeout)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ streams ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def start_packet_out_stream(self):
        """Stream queued PacketOut messages to the server from a pool thread."""

        def packet_generator():
            while True:
                try:
                    packet = self.packet_out_queue.get(block=True, timeout=1.0)
                except Empty:
                    # the 1s timeout exists so a stop request is noticed
                    if self.stopped:
                        return
                else:
                    yield packet

        def stream_packets_out():
            try:
                self.local_stub.StreamPacketsOut(packet_generator())
            except _Rendezvous as e:
                self._handle_grpc_error(e)

        reactor.callInThread(stream_packets_out)

    def start_packet_in_stream(self):
        """Receive PacketIn messages on a pool thread into packet_in_queue."""

        def receive_packet_in_stream():
            self._receive_stream(self.local_stub.ReceivePacketsIn,
                                 self.packet_in_queue,
                                 'enqued-packet-in',
                                 'packet_in')

        reactor.callInThread(receive_packet_in_stream)

    def start_change_event_in_stream(self):
        """Receive change events on a pool thread into change_event_queue."""

        def receive_change_events():
            self._receive_stream(self.local_stub.ReceiveChangeEvents,
                                 self.change_event_queue,
                                 'enqued-change-event',
                                 'change_event')

        reactor.callInThread(receive_change_events)

    @inlineCallbacks
    def change_event_processing_loop(self):
        """Forward queued change events to the connection manager."""
        while True:
            try:
                event = yield self.change_event_queue.get()
                device_id = event.id
                self.connection_manager.forward_change_event(device_id, event)
            except Exception as e:
                # keep the loop alive; one bad event must not end processing
                log.exception('failed-in-change-event-handler', e=e)
            if self.stopped:
                break

    @inlineCallbacks
    def packet_in_forwarder_loop(self):
        """Forward queued PacketIn messages to the connection manager."""
        while True:
            try:
                packet_in = yield self.packet_in_queue.get()
                device_id = packet_in.id
                ofp_packet_in = packet_in.packet_in
                self.connection_manager.forward_packet_in(device_id,
                                                          ofp_packet_in)
            except Exception as e:
                # same guard as the change-event loop: log and carry on
                log.exception('failed-in-packet-in-handler', e=e)
            if self.stopped:
                break

    def send_packet_out(self, device_id, packet_out):
        """Wrap `packet_out` in a PacketOut for `device_id` and queue it."""
        packet_out = PacketOut(id=device_id, packet_out=packet_out)
        self.packet_out_queue.put(packet_out)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ unary RPCs ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    @inlineCallbacks
    def get_port(self, device_id, port_id):
        """Deferred firing with the GetLogicalDevicePort response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._call(self.local_stub.GetLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_port_list(self, device_id):
        """Deferred firing with the `items` of ListLogicalDevicePorts."""
        req = ID(id=device_id)
        res = yield self._call(self.local_stub.ListLogicalDevicePorts, req)
        returnValue(res.items)

    @inlineCallbacks
    def enable_port(self, device_id, port_id):
        """Deferred firing with the EnableLogicalDevicePort response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._call(self.local_stub.EnableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def disable_port(self, device_id, port_id):
        """Deferred firing with the DisableLogicalDevicePort response."""
        req = LogicalPortId(id=device_id, port_id=port_id)
        res = yield self._call(self.local_stub.DisableLogicalDevicePort, req)
        returnValue(res)

    @inlineCallbacks
    def get_device_info(self, device_id):
        """Deferred firing with the GetLogicalDevice response."""
        req = ID(id=device_id)
        res = yield self._call(self.local_stub.GetLogicalDevice, req)
        returnValue(res)

    @inlineCallbacks
    def update_flow_table(self, device_id, flow_mod):
        """Deferred firing with the UpdateLogicalDeviceFlowTable response."""
        req = FlowTableUpdate(id=device_id, flow_mod=flow_mod)
        res = yield self._call(
            self.local_stub.UpdateLogicalDeviceFlowTable, req)
        returnValue(res)

    @inlineCallbacks
    def update_group_table(self, device_id, group_mod):
        """Deferred firing with the UpdateLogicalDeviceFlowGroupTable response."""
        req = FlowGroupTableUpdate(id=device_id, group_mod=group_mod)
        res = yield self._call(
            self.local_stub.UpdateLogicalDeviceFlowGroupTable, req)
        returnValue(res)

    @inlineCallbacks
    def list_flows(self, device_id):
        """Deferred firing with the `items` of ListLogicalDeviceFlows."""
        req = ID(id=device_id)
        res = yield self._call(self.local_stub.ListLogicalDeviceFlows, req)
        returnValue(res.items)

    @inlineCallbacks
    def list_groups(self, device_id):
        """Deferred firing with the `items` of ListLogicalDeviceFlowGroups."""
        req = ID(id=device_id)
        res = yield self._call(
            self.local_stub.ListLogicalDeviceFlowGroups, req)
        returnValue(res.items)

    def list_ports(self, device_id):
        """Same RPC and result as get_port_list(); kept as a separate name."""
        return self.get_port_list(device_id)

    @inlineCallbacks
    def list_logical_devices(self):
        """Deferred firing with the `items` of ListLogicalDevices."""
        res = yield self._call(
            self.local_stub.ListLogicalDevices, empty_pb2.Empty())
        returnValue(res.items)

    @inlineCallbacks
    def list_reachable_logical_devices(self):
        """Deferred firing with the `items` of ListReachableLogicalDevices."""
        res = yield self._call(
            self.local_stub.ListReachableLogicalDevices, empty_pb2.Empty())
        returnValue(res.items)

    @inlineCallbacks
    def subscribe(self, subscriber):
        """Deferred firing with the Subscribe response for `subscriber`."""
        res = yield self._call(self.local_stub.Subscribe, subscriber)
        returnValue(res)